现在的位置: 首页 > 综合 > 正文

ConcurrentAsyncQueue 2012-02-23

2012年01月14日 ⁄ 综合 ⁄ 共 14595字 ⁄ 字号 评论关闭


//#define c4 //C# 4.0+
#define c4
namespace Microshaoft
{
using System;
using System.Threading;
using System.Diagnostics;
using System.Collections.Generic;
#if c4
using System.Collections.Concurrent;
#endif
using Microshaoft;
public class ConcurrentAsyncQueue<T>
#if c2
where T : class
#endif
{
public delegate void QueueEventHandler(T element);
public event QueueEventHandler OnDequeue;
public delegate void QueueLogEventHandler(string logMessage);
//public event QueueLogEventHandler OnQueueLog;
public event QueueLogEventHandler OnQueueRunningThreadStart;
public event QueueLogEventHandler OnQueueRunningThreadEnd;
public event QueueLogEventHandler OnDequeueThreadStart;
public event QueueLogEventHandler OnDequeueThreadEnd;
public event QueueLogEventHandler OnDequeueAllThreadsEnd;
public delegate void ExceptionEventHandler(Exception exception);
public event ExceptionEventHandler OnException;
#if c2
private Queue<T> _queue = new Queue<T>();
private object _syncQueueLockObject = new object();
#elif c4
private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
#endif
//private object _syncQueueRunningLockObject = new object();
private long _isQueueRunning = 0;
private long _concurrentDequeueThreadsCount = 0; //Microshaoft 用于控制并发线程数
private PerformanceCounter _enqueuePerformanceCounter;
private PerformanceCounter _dequeuePerformanceCounter;
private PerformanceCounter _dequeueProcessedPerformanceCounter;
private PerformanceCounter _queueLengthPerformanceCounter;
private PerformanceCounter _dequeueThreadStartPerformanceCounter;
private PerformanceCounter _dequeueThreadEndPerformanceCounter;
private PerformanceCounter _dequeueThreadsCountPerformanceCounter;
private PerformanceCounter _queueRunningThreadStartPerformanceCounter;
private PerformanceCounter _queueRunningThreadEndPerformanceCounter;
private PerformanceCounter _queueRunningThreadsCountPerformanceCounter;
private bool _isAttachedPerformanceCounters = false;
public void AttachPerformanceCounters(string instanceNamePrefix)
{
string category = "Microshaoft AsyncConurrentQueue Counters";
string counter = string.Empty;
Process process = Process.GetCurrentProcess();
//int processID = 0;//process.Id;
string processName = process.ProcessName;
//string processStartTime = "";//process.StartTime;
string instanceName = string.Empty;
instanceName = string.Format
(
"{0}-{1}"
, instanceNamePrefix
, processName
//, processID
//, processStartTime.ToString("yyyy-MM-dd HH:mm:ss.fff")
);
CounterCreationDataCollection ccdc = new CounterCreationDataCollection();
if (PerformanceCounterCategory.Exists(category))
{
PerformanceCounterCategory.Delete(category);
}
CounterCreationData ccd = null;
counter = "EnqueueCounter";
ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
counter = "DequeueCounter";
ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
counter = "QueueLengthCounter";
ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
counter = "DequeueProcessedCounter";
ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
counter = "DequeueThreadStartCounter";
ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
counter = "DequeueThreadEndCounter";
ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
counter = "DequeueThreadsCountCounter";
ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
counter = "QueueRunningThreadStartCounter";
ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
counter = "QueueRunningThreadEndCounter";
ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
counter = "QueueRunningThreadsCountCounter";
ccd = PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64);
ccdc.Add(PerformanceCounterHelper.GetCounterCreationData(counter, PerformanceCounterType.NumberOfItems64));
PerformanceCounterCategory.Create
(
category,
string.Format("{0} Category Help.", category),
PerformanceCounterCategoryType.MultiInstance,
ccdc
);
counter = "EnqueueCounter";
_enqueuePerformanceCounter = new PerformanceCounter();
_enqueuePerformanceCounter.CategoryName = category;
_enqueuePerformanceCounter.CounterName = counter;
_enqueuePerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
_enqueuePerformanceCounter.InstanceName = instanceName;
_enqueuePerformanceCounter.ReadOnly = false;
_enqueuePerformanceCounter.RawValue = 0;
counter = "DequeueCounter";
_dequeuePerformanceCounter = new PerformanceCounter();
_dequeuePerformanceCounter.CategoryName = category;
_dequeuePerformanceCounter.CounterName = counter;
_dequeuePerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
_dequeuePerformanceCounter.InstanceName = instanceName;
_dequeuePerformanceCounter.ReadOnly = false;
_dequeuePerformanceCounter.RawValue = 0;
counter = "DequeueProcessedCounter";
_dequeueProcessedPerformanceCounter = new PerformanceCounter();
_dequeueProcessedPerformanceCounter.CategoryName = category;
_dequeueProcessedPerformanceCounter.CounterName = counter;
_dequeueProcessedPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
_dequeueProcessedPerformanceCounter.InstanceName = instanceName;
_dequeueProcessedPerformanceCounter.ReadOnly = false;
_dequeueProcessedPerformanceCounter.RawValue = 0;
counter = "QueueLengthCounter";
_queueLengthPerformanceCounter = new PerformanceCounter();
_queueLengthPerformanceCounter.CategoryName = category;
_queueLengthPerformanceCounter.CounterName = counter;
_queueLengthPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
_queueLengthPerformanceCounter.InstanceName = instanceName;
_queueLengthPerformanceCounter.ReadOnly = false;
_queueLengthPerformanceCounter.RawValue = 0;
counter = "DequeueThreadStartCounter";
_dequeueThreadStartPerformanceCounter = new PerformanceCounter();
_dequeueThreadStartPerformanceCounter.CategoryName = category;
_dequeueThreadStartPerformanceCounter.CounterName = counter;
_dequeueThreadStartPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
_dequeueThreadStartPerformanceCounter.InstanceName = instanceName;
_dequeueThreadStartPerformanceCounter.ReadOnly = false;
_dequeueThreadStartPerformanceCounter.RawValue = 0;
counter = "DequeueThreadEndCounter";
_dequeueThreadEndPerformanceCounter = new PerformanceCounter();
_dequeueThreadEndPerformanceCounter.CategoryName = category;
_dequeueThreadEndPerformanceCounter.CounterName = counter;
_dequeueThreadEndPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
_dequeueThreadEndPerformanceCounter.InstanceName = instanceName;
_dequeueThreadEndPerformanceCounter.ReadOnly = false;
_dequeueThreadEndPerformanceCounter.RawValue = 0;
counter = "DequeueThreadsCountCounter";
_dequeueThreadsCountPerformanceCounter = new PerformanceCounter();
_dequeueThreadsCountPerformanceCounter.CategoryName = category;
_dequeueThreadsCountPerformanceCounter.CounterName = counter;
_dequeueThreadsCountPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
_dequeueThreadsCountPerformanceCounter.InstanceName = instanceName;
_dequeueThreadsCountPerformanceCounter.ReadOnly = false;
_dequeueThreadsCountPerformanceCounter.RawValue = 0;
counter = "QueueRunningThreadStartCounter";
_queueRunningThreadStartPerformanceCounter = new PerformanceCounter();
_queueRunningThreadStartPerformanceCounter.CategoryName = category;
_queueRunningThreadStartPerformanceCounter.CounterName = counter;
_queueRunningThreadStartPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
_queueRunningThreadStartPerformanceCounter.InstanceName = instanceName;
_queueRunningThreadStartPerformanceCounter.ReadOnly = false;
_queueRunningThreadStartPerformanceCounter.RawValue = 0;
counter = "QueueRunningThreadEndCounter";
_queueRunningThreadEndPerformanceCounter = new PerformanceCounter();
_queueRunningThreadEndPerformanceCounter.CategoryName = category;
_queueRunningThreadEndPerformanceCounter.CounterName = counter;
_queueRunningThreadEndPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
_queueRunningThreadEndPerformanceCounter.InstanceName = instanceName;
_queueRunningThreadEndPerformanceCounter.ReadOnly = false;
_queueRunningThreadEndPerformanceCounter.RawValue = 0;
counter = "QueueRunningThreadsCountCounter";
_queueRunningThreadsCountPerformanceCounter = new PerformanceCounter();
_queueRunningThreadsCountPerformanceCounter.CategoryName = category;
_queueRunningThreadsCountPerformanceCounter.CounterName = counter;
_queueRunningThreadsCountPerformanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
_queueRunningThreadsCountPerformanceCounter.InstanceName = instanceName;
_queueRunningThreadsCountPerformanceCounter.ReadOnly = false;
_queueRunningThreadsCountPerformanceCounter.RawValue = 0;
_isAttachedPerformanceCounters = true;
}
private int _maxConcurrentDequeueThreadsCount = 1; //Microshaoft 允许并发出列处理线程数为 1
public int MaxConcurrentDequeueThreadsCount
{
set
{
_maxConcurrentDequeueThreadsCount = value;
}
get
{
return _maxConcurrentDequeueThreadsCount;
}
}
//Microshaoft 服务启动后可立即开启新的线程调用此方法(死循环)
private void QueueRun() //Microshaoft ThreadStart
{
if (Interlocked.Read(ref _concurrentDequeueThreadsCount) < _maxConcurrentDequeueThreadsCount)
{
if (Interlocked.CompareExchange(ref _isQueueRunning, 0, 1) == 0)
{
ThreadStart ts = new ThreadStart(QueueRunThreadProcess);
Thread t = new Thread(ts);
t.Name = "QueueRunningThreadProcess";
t.Start();
}
}
}
public int Count
{
get
{
return _queue.Count;
}
}
public long ConcurrentThreadsCount
{
get
{
return _concurrentDequeueThreadsCount;
}
}
private void QueueRunThreadProcess()
{
if (_isAttachedPerformanceCounters)
{
_queueRunningThreadStartPerformanceCounter.Increment();
_queueRunningThreadsCountPerformanceCounter.Increment();
}
if (OnQueueRunningThreadStart != null)
{
OnQueueRunningThreadStart
(
string.Format
(
"{0} Threads Count {1},Queue Count {2},Current Thread: {3}({4}) at {5}"
, "Queue Running Start ..."
, _concurrentDequeueThreadsCount
, _queue.Count
, Thread.CurrentThread.Name
, Thread.CurrentThread.ManagedThreadId
, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
)
);
}
#if c2
while ((_queue.Count > 0)) //Microshaoft 死循环
#elif c4
while (!_queue.IsEmpty) //Microshaoft 死循环
#endif
{
int threadID = -1;
{
int r = (int)Interlocked.Read(ref _concurrentDequeueThreadsCount);
if (r < _maxConcurrentDequeueThreadsCount)
{
//if (_queue.Count > 0)
{
r = (int)Interlocked.Increment(ref _concurrentDequeueThreadsCount);
threadID = (int)_concurrentDequeueThreadsCount;
//ThreadProcessState tps = new ThreadProcessState();
//tps.element = element;
//tps.Sender = this;
Thread t = new Thread(new ThreadStart(DequeueThreadProcess));
t.TrySetApartmentState(ApartmentState.STA);
t.Name = string.Format("ConcurrentDequeueProcessThread[{0}]", threadID);
t.Start();
}
/// else
/// {
/// break;
/// }
}
else
{
break;
}
}
}
//Interlocked.CompareExchange(ref _queueRuning, 0, 1);
if (OnQueueRunningThreadEnd != null)
{
int r = (int)Interlocked.Read(ref _concurrentDequeueThreadsCount);
OnQueueRunningThreadEnd
(
string.Format
(
"{0} Threads Count {1}, Queue Count {2}, Current Thread: {3}({4}) at {5}"
, "Queue Running Stop ..."
, r
, _queue.Count
, Thread.CurrentThread.Name
, Thread.CurrentThread.ManagedThreadId
, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
)
);
}
if (_isAttachedPerformanceCounters)
{
_queueRunningThreadEndPerformanceCounter.Increment();
_queueRunningThreadsCountPerformanceCounter.Decrement();
}
Interlocked.Exchange(ref _isQueueRunning, 0);
}
public void Enqueue(T element)
{
try
{
#if c2
lock (_syncQueueLockObject) //还算并发吗?
#endif
{
_queue.Enqueue(element);
}
if (_isAttachedPerformanceCounters)
{
_enqueuePerformanceCounter.Increment();
_queueLengthPerformanceCounter.Increment();
}
}
catch (Exception e)
{
if (OnException != null)
{
OnException(e);
}
}
//int r = Interlocked.CompareExchange(ref _queueRuning, 1, 0))
//if (r == 1)
//{
QueueRun();
//}
}
private void DequeueThreadProcess()
{
if (_isAttachedPerformanceCounters)
{
_dequeueThreadStartPerformanceCounter.Increment();
_dequeueThreadsCountPerformanceCounter.Increment();
}
if (OnDequeueThreadStart != null)
{
int r = (int)Interlocked.Read(ref _concurrentDequeueThreadsCount);
OnDequeueThreadStart
(
string.Format
(
"{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
, "Threads ++ !"
, r
, _queue.Count
, Thread.CurrentThread.Name
, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
)
);
}
bool queueWasNotEmpty = false;
try
{
#if c2
while (true)
#elif c4
while (!_queue.IsEmpty)
#endif
{
T element
#if c2
= null
#endif
;
#if c2
lock (_syncQueueLockObject)
{
if (_queue.Count > 0)
{
element = _queue.Dequeue();
}
else
{
//避免QueueRun 死循环
break;
}
}
#elif c4
if (_queue.TryDequeue(out element))
{
#elif c2
if (element != null)
{
#endif
if (!queueWasNotEmpty)
{
queueWasNotEmpty = true;
}
if (_isAttachedPerformanceCounters)
{
_dequeuePerformanceCounter.Increment();
_queueLengthPerformanceCounter.Decrement();
}
if (OnDequeue != null)
{
OnDequeue(element);
}
if (_isAttachedPerformanceCounters)
{
_dequeueProcessedPerformanceCounter.Increment();
}
#if c2
}
#elif c4
}
}
#endif
}
catch (Exception e)
{
if (OnException != null)
{
OnException(e);
}
}
finally
{
int r = (int)Interlocked.Decrement(ref _concurrentDequeueThreadsCount);
if (OnDequeueThreadEnd != null)
{
OnDequeueThreadEnd
(
string.Format
(
"{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
, "Threads--"
, r
, _queue.Count
, Thread.CurrentThread.Name
, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
)
);
}
if (r == 0)
{
if (OnDequeueAllThreadsEnd != null)
{
OnDequeueAllThreadsEnd
(
string.Format
(
"{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
, "All Threads End"
, r
, _queue.Count
, Thread.CurrentThread.Name
, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
)
);
}
}
if (_isAttachedPerformanceCounters)
{
_dequeueThreadEndPerformanceCounter.Increment();
_dequeueThreadsCountPerformanceCounter.Decrement();
}
if (queueWasNotEmpty)
{
QueueRun(); //死循环???
}
}
}
}
}
namespace Microshaoft
{
using System;
using System.Diagnostics;
public static class PerformanceCounterHelper
{
public static CounterCreationData GetCounterCreationData(string counterName, PerformanceCounterType performanceCounterType)
{
CounterCreationData ccd = new CounterCreationData();
ccd.CounterName = counterName;

抱歉!评论已关闭.