消息队列处理基类-简化版

实现不考虑限制并发数的情况下对某队列的并发处理,欢迎批评指正:

    public interface IMessageQueueHandler
    {
        void StartRead();        int WorkerCount { get; }
    }

    public abstract class SimplifiedMessageQueueHandlerBase<T> : IMessageQueueHandler
    {
        public SimplifiedMessageQueueHandlerBase(string queueName)
        {
            if (!MessageQueue.Exists(queueName))
                throw new Exception();

            this._queueName = queueName;
        }

        public void StartRead()
        {
            this._queue = new MessageQueue(this._queueName) { Formatter = new XmlMessageFormatter(new Type[] { typeof(long) }) };
            this._queue.PeekCompleted += new PeekCompletedEventHandler(Produce);
            this._queue.BeginPeek();
        }

        public override string ToString()
        {
            return string.Format("{0}_{1}", this._queueName, this.ProcessName);
        }

        public int WorkerCount { get { return Thread.VolatileRead(ref this._workerCount); } }

        protected abstract string ProcessName { get; }

        protected abstract void MainProcess(T backThreadId);

        protected void LogInfo(string msg)
        {
            EntLibLogger.WriteLogFile(msg);
        }

        #region private
        private void Produce(object sender, PeekCompletedEventArgs e)
        {
            var message = this._queue.EndPeek(e.AsyncResult);
            T backThreadId = (T)message.Body;

            ThreadPool.QueueUserWorkItem(new WaitCallback(Consume), backThreadId);

            this._queue.Receive();
            this._queue.BeginPeek();
        }

        private void Consume(object stateInfo)
        {
            T messageItem = (T)stateInfo;
            this.LogInfo(string.Format("{0} - Received a message, MessageItem = {1}", this.ProcessName, messageItem));
            Interlocked.Increment(ref this._workerCount);

            try
            {
                this.LogInfo(string.Format("{0} - Running - {1}, WorkerCount = {2}", this.ProcessName, messageItem, this.WorkerCount));
                MainProcess(messageItem);
            }
            catch (Exception ex)
            {
                this.HandleException(ex, messageItem);
            }
            finally
            {
                Interlocked.Decrement(ref this._workerCount);

                this.LogInfo(string.Format("{0} - Over - {1}, WorkerCount = {2}", this.ProcessName, messageItem, this.WorkerCount));
            }
        }

        private void HandleException(Exception ex, T messageItem)
        {
            this.LogInfo(string.Format("Exception in {0}:[Message]={1},[StackTrace]={2},[Type]={3},[_workerCount]={4},[backThreadId]={5}", this.ProcessName, ex.Message, ex.StackTrace, ex.GetType(), this.WorkerCount, messageItem));
        }

        private readonly string _queueName;
        private MessageQueue _queue;
        private int _workerCount;
        #endregion
    }

觉得写的好的表扬一下啊:)

时间: 2024-10-10 21:39:55

消息队列处理基类-简化版的相关文章

RabbitMqHelper 消息队列帮助类

using Newtonsoft.Json;using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks; namespace RabbitMQ_Send{ class ConfigModel { } public enum ExchangeTypeEnum { /// <summary> /

微信公众号发送消息之发送客服消息基类封装

当用户主动发消息给公众号的时候(包括发送信息.点击自定义菜单.订阅事件.扫描二维码事件.支付成功事件.用户维权),微信将会把消息数据推送给开发者,开发者在一段时间内(目前修改为48小时)可以调用客服消息接口,通过POST一个JSON数据包来发送消息给普通用户,在48小时内不限制发送次数.此接口主要用于客服等有人工消息处理环节的功能,方便开发者为用户提供更加优质的服务. http请求方式: POST https://api.weixin.qq.com/cgi-bin/message/custom/

【翻译】DotNetMQ: 一个.NET版完整的消息队列系统

在一个大型的分布式系统中,消息队列是不可缺少的中间件,能很好的解决异步消息.应用解耦.均衡并发等问题.在.net中,偶然发现一个效率不错.安全可靠.功能齐全的消息组件,忍不住翻译过来,供大家快速预览. 注:原作者用windows服务启动消息队列服务,但是本人在win10上测试出错,可自行改成控制台启动消息队列服务,然后用第三方工具注册服务(如:SrvanyUI) 原文:http://www.codeproject.com/Articles/193611/DotNetMQ-A-Complete-M

Python 接口:从协议到抽象基类

抽象基类的常见用途:实现接口时作为超类使用.然后,说明抽象基类如何检查具体子类是否符合接口定义,以及如何使用注册机制声明一个类实现了某个接口,而不进行子类化操作.最后,说明如何让抽象基类自动“识别”任何符合接口的类——不进行子类化或注册. Python文化中的接口和协议 接口在动态类型语言中是怎么运作的呢?首先,基本的事实是,Python语言没有 interface 关键字,而且除了抽象基类,每个类都有接口:类实现或继承的公开属性(方法或数据属性),包括特殊方法,如__getitem__ 或 _

MSMQ消息队列 用法

引言 接下来的三篇文章是讨论有关企业分布式开发的文章,这三篇文章筹划了很长时间,文章的技术并不算新,但是文章中使用到的技术都是经过笔者研究实践后总结的,正所谓站在巨人的肩膀上,笔者并不是巨人,但也希望这几篇文章能够帮助初涉企业分布式开发的一些童鞋.        三篇文章将会从MessageQueue.Windows Services和WCF着手来讨论企业分布式的开发,MQ是一种消息中间件技术,该篇文章将会详细讨论.Windows Services在分布式开发中同样起着重要的作用,将会在下篇文章

自己动手实现消息队列之JMS

什么是JMS?JMS的诞生史? 在JMS还没有诞生前,每个企业都会有自己的一套内部消息系统,比如项目组A需要调用到项目组B的系统,项目组B也有可能会调用到项目组C的系统.这样每个公司都有自己的一套实现.很不规范,所以Apache基金会,为企业消息产品专门定义了一套规范.我们可以把JMS当作是一系列接口及相关语义的集合,通过这些接口和语义定义了JSM客户端如何去访问消息系统.简单点来说就是JMS主要干了两件事,定义通用的消息格式,和消息传递的模式. 体系结构 JMS由以下元素组成.[1] JMS提

使用java实现阿里云消息队列简单封装

一.前言 最近公司有使用阿里云消息队列的需求,为了更加方便使用,本人用了几天时间将消息队列封装成api调用方式以方便内部系统的调用,现在已经完成,特此记录其中过程和使用到的相关技术,与君共勉. 现在阿里云提供了两种消息服务:mns服务和ons服务,其中我认为mns是简化版的ons,而且mns的消息消费需要自定义轮询策略的,相比之下,ons的发布与订阅模式功能更加强大(比如相对于mns,ons提供了消息追踪.日志.监控等功能),其api使用起来更加方便,而且听闻阿里内部以后不再对mns进行新的开发

【转】MSMQ 微软消息队列 简单 示例

MSMQ它的实现原理是:消息的发送者把自己想要发送的信息放入一个容器中(我们称之为Message),然后把它保存至一个系统公用空间的消息队列(Message Queue)中:本地或者是异地的消息接收程序再从该队列中取出发给它的消息进行处理. 我个人的理解,你可以把他当做一种,把数据打包后,发送到一个地方,程序也可以去取到这个打包的程序,队列的机制就不讲了,并发问题荡然无存.呵呵. 上代码: 首先 using System.Messaging; public class MsmqManagerHe

XSI进程间通信-----消息队列

1. 基本特点 1) 消息队列是一个由系统内核负责存储和管理,并通过消息队列标识引用的数据链表,消息队列 和有名管道fifo的区别在: 后者一次只能放一个包,而前者则可以放很多包,这样就能处理发包快,哪包慢的问题 2) 可以通过msgget函数创建一个新的消息队列, 或获取一个已有的消息队列. 通过msgsnd函数 (send)向消息队列的后端追加消息, 通过msgrcv(receive)函数从消息队列的前端提取消息. 3) 消息队列中的每个消息单元除包含消息数据外,还包含消息类型和数据长度.消