/// <summary>
/// Base class for handlers that read messages from a message queue and run
/// <see cref="MainProcess"/> for each one on the thread pool, with at most
/// <c>maxConcurrency</c> messages being processed at the same time.
/// </summary>
/// <typeparam name="T">Type of the message body handed to <see cref="MainProcess"/>.</typeparam>
public abstract class MessageQueueConcurrentHandlerBase<T> : IMessageQueueHandler
{
    /// <summary>
    /// Creates a handler bound to an existing queue.
    /// </summary>
    /// <param name="queueName">Path of the queue to read from; the queue must already exist.</param>
    /// <param name="maxConcurrency">Maximum number of messages processed concurrently; must be at least 1.</param>
    /// <exception cref="ArgumentException">The queue named by <paramref name="queueName"/> does not exist.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrency"/> is less than 1.</exception>
    public MessageQueueConcurrentHandlerBase(string queueName, int maxConcurrency)
    {
        if (!MessageQueue.Exists(queueName))
        {
            throw new ArgumentException(string.Format("No such a queue: {0}", queueName), "queueName");
        }

        if (maxConcurrency < 1)
        {
            throw new ArgumentOutOfRangeException("maxConcurrency");
        }

        this._queueName = queueName;

        // All slots start out free. An initial count of 0 would make the very
        // first WaitOne() in Produce block forever.
        this._pool = new Semaphore(maxConcurrency, maxConcurrency);
    }

    /// <summary>
    /// Opens the queue and starts the asynchronous peek that drives message processing.
    /// </summary>
    public void StartRead()
    {
        this._queue = new MessageQueue(this._queueName)
        {
            // The body is cast to T in Produce, so T is the type the formatter must be able to read.
            Formatter = new XmlMessageFormatter(new Type[] { typeof(T) })
        };
        this._queue.PeekCompleted += new PeekCompletedEventHandler(Produce);
        this._queue.BeginPeek();
    }

    /// <summary>
    /// Returns the queue name and the process name joined by an underscore.
    /// </summary>
    public override string ToString()
    {
        return string.Format("{0}_{1}", this._queueName, this.ProcessName);
    }

    /// <summary>
    /// Number of messages currently inside <see cref="MainProcess"/> (or its surrounding logging).
    /// </summary>
    public int WorkerCount
    {
        get { return Thread.VolatileRead(ref this._workerCount); }
    }

    /// <summary>
    /// Name of the concrete process; used in log lines and in <see cref="ToString"/>.
    /// </summary>
    protected abstract string ProcessName { get; }

    /// <summary>
    /// Processes a single message body. Runs on a thread-pool thread; exceptions it throws are logged.
    /// </summary>
    /// <param name="backThreadId">Body of the message taken from the queue.</param>
    protected abstract void MainProcess(T backThreadId);

    /// <summary>
    /// Writes <paramref name="msg"/> to the log file through <c>EntLibLogger</c>.
    /// </summary>
    /// <param name="msg">Text to log.</param>
    protected void LogInfo(string msg)
    {
        EntLibLogger.WriteLogFile(msg);
    }

    #region private

    /// <summary>
    /// PeekCompleted callback: waits for a free worker slot, removes the message
    /// from the queue, hands its body to a thread-pool worker and re-arms the peek.
    /// </summary>
    private void Produce(object sender, PeekCompletedEventArgs e)
    {
        // Block until a worker slot is free. No peek is outstanding while we
        // wait, so only one Produce call is active at a time.
        this._pool.WaitOne();

        bool dispatched = false;
        try
        {
            // Each IAsyncResult may be completed exactly once, so this callback
            // handles one message and then starts a new peek instead of looping.
            var message = this._queue.EndPeek(e.AsyncResult);
            T backThreadId = (T)message.Body;

            // Remove the message before dispatching it so that a failed Receive
            // cannot leave an already-dispatched message in the queue.
            // NOTE(review): assumes this handler is the only reader of the queue,
            // so the message received is the one that was peeked - confirm.
            this._queue.Receive();

            ThreadPool.QueueUserWorkItem(new WaitCallback(Consume), backThreadId);
            dispatched = true;
        }
        finally
        {
            // The slot is normally released by Consume; give it back here only
            // when no worker was started.
            if (!dispatched)
            {
                this._pool.Release();
            }
        }

        this._queue.BeginPeek();
    }

    /// <summary>
    /// Thread-pool entry point: runs <see cref="MainProcess"/> for one message body,
    /// logs start, end and any exception, and frees the worker slot when done.
    /// </summary>
    /// <param name="stateInfo">The message body (of type <typeparamref name="T"/>) queued by <see cref="Produce"/>.</param>
    private void Consume(object stateInfo)
    {
        try
        {
            T messageItem = (T)stateInfo;
            this.LogInfo(string.Format("{0} - Received a message, MessageItem = {1}", this.ProcessName, messageItem));
            Interlocked.Increment(ref this._workerCount);
            try
            {
                this.LogInfo(string.Format("{0} - Running - {1}, WorkerCount = {2}", this.ProcessName, messageItem, this.WorkerCount));
                MainProcess(messageItem);
            }
            catch (Exception ex)
            {
                this.HandleException(ex, messageItem);
            }
            finally
            {
                Interlocked.Decrement(ref this._workerCount);
                this.LogInfo(string.Format("{0} - Over - {1}, WorkerCount = {2}", this.ProcessName, messageItem, this.WorkerCount));
            }
        }
        finally
        {
            // Free the slot taken in Produce only once the work has really
            // finished; this is what enforces the concurrency limit.
            this._pool.Release();
        }
    }

    /// <summary>
    /// Logs the details of an exception thrown while processing <paramref name="messageItem"/>.
    /// </summary>
    private void HandleException(Exception ex, T messageItem)
    {
        this.LogInfo(string.Format("Exception in {0}:[Message]={1},[StackTrace]={2},[Type]={3},[_workerCount]={4},[backThreadId]={5}", this.ProcessName, ex.Message, ex.StackTrace, ex.GetType(), this.WorkerCount, messageItem));
    }

    private readonly string _queueName;
    private MessageQueue _queue;
    private int _workerCount;
    private readonly Semaphore _pool;

    #endregion
}
时间: 2024-10-28 17:01:58