消息队列帮助类

准备工作:

1:安装windows组件(MSMQ)

编写代码:

using System;
using System.Messaging;
using System.Collections.Generic;
using System.Text;

namespace LCL.Bus
{
    public interface IBusMessageQueue
    {
        void Clear();
        List<BusMessage> GetAll();
        BusMessage Receive();
        BusMessage ReceiveById(string id);
        void Publish(IEnumerable<BusMessage> messages);
        void Publish(BusMessage message);
    }
    public class BusMessageQueue : System.IDisposable, IBusMessageQueue
    {
        #region Private Fields
        private readonly Guid id = Guid.NewGuid();
        private readonly MessageQueue messageQueue;
        private readonly object lockObj = new object();
        private readonly BinaryMessageFormatter formatter = new BinaryMessageFormatter();
        #endregion
        public BusMessageQueue()
        {
            try
            {
                CreateMessageQueue();
                string mqName = CreateMessageQueueName(EdoorBusMQ_MessageQueue);
                this.messageQueue = new MessageQueue(mqName);
                this.messageQueue.Label = mqName;
                //Administrators
                this.messageQueue.SetPermissions("Everyone", System.Messaging.MessageQueueAccessRights.FullControl);
                this.messageQueue.Formatter = formatter;
                this.messageQueue.UseJournalQueue = true;
            }
            catch (Exception ex)
            {
                //FileLogger.LogError("BusMessageQueue:", ex);
            }
        }
        public BusMessageQueue(string path)
        {
            try
            {
                if (!string.IsNullOrEmpty(path))
                    EdoorBusMQ_MessageQueue = path;
                CreateMessageQueue();
                string mqName = CreateMessageQueueName(EdoorBusMQ_MessageQueue);
                this.messageQueue = new MessageQueue(mqName);
                this.messageQueue.Label = mqName;
                //Administrators
                this.messageQueue.SetPermissions("Everyone", System.Messaging.MessageQueueAccessRights.FullControl);
                this.messageQueue.Formatter = formatter;
                this.messageQueue.UseJournalQueue = true;
            }
            catch (Exception ex)
            {
                //FileLogger.LogError("BusMessageQueue:", ex);
            }
        }
        private void SendMessage(BusMessage message, MessageQueueTransaction transaction = null)
        {
            try
            {
                Message msmqMessage = new Message();
                msmqMessage.Label = message.Label;
                msmqMessage.Body = message;
                msmqMessage.Formatter = formatter;
                messageQueue.Send(msmqMessage);
            }
            catch (Exception ex)
            {  //FileLogger.LogError("SendMessage:", ex);
            }

        }
        #region IBus Members
        public void Publish(BusMessage message)
        {
            lock (lockObj)
            {
                SendMessage(message);
            }
        }
        public void Publish(IEnumerable<BusMessage> messages)
        {
            lock (lockObj)
            {
                foreach (var item in messages)
                {
                    SendMessage(item);
                }
            }
        }
        public List<BusMessage> GetAll()
        {
            List<BusMessage> list = new List<BusMessage>();
            var message = GetAllMessages();
            foreach (var item in message)
            {
                var msg = formatter.Read(item);
                var model = msg as BusMessage;
                model.ID = item.Id;
                list.Add(model);
            }
            return list;
        }
        public BusMessage Receive()
        {
            var msg = messageQueue.Receive();
            var item = formatter.Read(msg);
            return item as BusMessage;
        }
        public BusMessage ReceiveById(string id)
        {
            var msg = messageQueue.ReceiveById(id);
            var item = formatter.Read(msg);
            return item as BusMessage;
        }
        public void Clear()
        {
            lock (lockObj)
            {
                ClearMessageQueue();
            }
        }
        #endregion
        #region Hepler
        private void CreateMessageQueue()
        {
            string mqName = CreateMessageQueueName(EdoorBusMQ_MessageQueue);
            if (!MessageQueue.Exists(mqName))
                MessageQueue.Create(mqName);
        }
        private void ClearMessageQueue()
        {
            string mqName = CreateMessageQueueName(EdoorBusMQ_MessageQueue);
            if (!MessageQueue.Exists(mqName))
                MessageQueue.Create(mqName);
            else
            {
                using (MessageQueue mq = new MessageQueue(mqName))
                {
                    mq.Purge();
                    mq.Close();
                }
            }
        }
        private string CreateMessageQueueName(string mq)
        {
            return string.Format(@".\private$\{0}", mq);
        }
        private string EdoorBusMQ_MessageQueue = @"LCLBusMQ";
        private Message[] GetAllMessages()
        {
            string mqName = CreateMessageQueueName(EdoorBusMQ_MessageQueue);
            if (!MessageQueue.Exists(mqName))
            {
                MessageQueue.Create(mqName, true);
                return null;
            }
            else
            {
                Message[] ret = null;
                using (MessageQueue mq = new MessageQueue(mqName))
                {
                    ret = mq.GetAllMessages();
                    mq.Close();
                }
                return ret;
            }
        }
        public int GetMessageQueueCount()
        {
            string mqName = CreateMessageQueueName(EdoorBusMQ_MessageQueue);
            if (!MessageQueue.Exists(mqName))
            {
                MessageQueue.Create(mqName, true);
                return 0;
            }
            else
            {
                int ret = 0;
                using (MessageQueue mq = new MessageQueue(mqName))
                {
                    ret = mq.GetAllMessages().Length;
                    mq.Close();
                }
                return ret;
            }
        }
        #endregion
        public void Dispose()
        {
            if (messageQueue != null)
            {
                messageQueue.Close();
                messageQueue.Dispose();
            }
        }
    }
    [Serializable]
    public class BusMessage
    {
        public BusMessage()
        {
            ID = Guid.NewGuid().ToString();
        }
        public string ID { get; set; }
        public string Label { get; set; }
        /// <summary>
        ///  1:pdfToimg
        /// </summary>
        public int Type { get; set; }
        public object Body { get; set; }
    }
}

调用代码:

using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;

namespace LCL.Bus
{
    class Program
    {
        static void Main(string[] args)
        {
            var msgq = new BusMessageQueue();
            msgq.Clear();
            BusMessage msg = new BusMessage();
            msg.Label = "文档转图片";
            msg.Type = 1;
            msg.Body = @"D:\201508\39443.pdf";
            msgq.Publish(msg);

            while (true)
            {
                var mesg = msgq.Receive();
                switch (mesg.Type)
                {
                    case 1:
                        //业务逻辑实现
                        break;
                    default:
                        break;
                }
                Console.WriteLine(mesg.Body);
            }

            Console.ReadKey();
        }
    }
}

获取代码:

http://download.csdn.net/detail/luomingui/9170879

时间: 2024-12-16 05:49:41

消息队列帮助类的相关文章

RabbitMqHelper 消息队列帮助类

using Newtonsoft.Json;using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks; namespace RabbitMQ_Send{ class ConfigModel { } public enum ExchangeTypeEnum { /// <summary> /

第四讲:消息队列处理类

#ifndef __CommonMsgHandler___ #define __CommonMsgHandler___ #include "cocos2d.h" #include <thread> #include <queue> #include <iostream> #include <google/protobuf/message.h> #include <sigslot.h> #include "receiv

MSMQ消息队列

一.引言 Windows Communication Foundation(WCF)是Microsoft为构建面向服务的应用程序而提供的统一编程模型,该服务模型提供了支持松散耦合和版本管理的序列化功能,并提供了与消息队列(MSMQ).COM+.Asp.net Web服务..NET Remoting等微软现有的分布式系统技术.利用WCF平台,开发人员可以很方便地构建面向服务的应用程序(SOA).可以认为,WCF是对之前现有的分布式技术(指的是MSMQ..NET Remoting和Web 服务等技术

跟我一起学WCF(1)——MSMQ消息队列

一.引言 Windows Communication Foundation(WCF)是Microsoft为构建面向服务的应用程序而提供的统一编程模型,该服务模型提供了支持松散耦合和版本管理的序列化功能,并提供了与消息队列(MSMQ).COM+.Asp.net Web服务..NET Remoting等微软现有的分布式系统技术.利用WCF平台,开发人员可以很方便地构建面向服务的应用程序(SOA).可以认为,WCF是对之前现有的分布式技术(指的是MSMQ..NET Remoting和Web 服务等技术

.Net Core 商城微服务项目系列(七):使用消息队列(RabbitMQ)实现服务异步通信

RabbitMQ是什么,怎么使用我就不介绍了,大家可以到园子里搜一下教程.本篇的重点在于实现服务与服务之间的异步通信. 首先说一下为什么要使用消息队列来实现服务通信:1.提高接口并发能力.  2.保证服务各方数据最终一致.  3.解耦. 使用消息队列通信的有点就是直接调用的缺点,比如在直接调用过程中发生未知错误,很可能就会出现数据不一致的问题,这个时候就需要人工修补数据,如果有过这个经历的同学一定是可怜的,人工修补数据简直痛苦!!再比如高并发情况下接口直接挂点,这就更直白了,接口挂了,功能就挂了

【转】MSMQ 微软消息队列 简单 示例

MSMQ它的实现原理是:消息的发送者把自己想要发送的信息放入一个容器中(我们称之为Message),然后把它保存至一个系统公用空间的消息队列(Message Queue)中:本地或者是异地的消息接收程序再从该队列中取出发给它的消息进行处理. 我个人的理解,你可以把他当做一种,把数据打包后,发送到一个地方,程序也可以去取到这个打包的程序,队列的机制就不讲了,并发问题荡然无存.呵呵. 上代码: 首先 using System.Messaging; public class MsmqManagerHe

XSI进程间通信-----消息队列

1. 基本特点 1) 消息队列是一个由系统内核负责存储和管理,并通过消息队列标识引用的数据链表,消息队列 和有名管道fifo的区别在: 后者一次只能放一个包,而前者则可以放很多包,这样就能处理发包快,哪包慢的问题 2) 可以通过msgget函数创建一个新的消息队列, 或获取一个已有的消息队列. 通过msgsnd函数 (send)向消息队列的后端追加消息, 通过msgrcv(receive)函数从消息队列的前端提取消息. 3) 消息队列中的每个消息单元除包含消息数据外,还包含消息类型和数据长度.消

消息队列实现订单异步提交

what MSMQ(Microsoft Message Queue),微软消息队列,用于应用程序之间相互通信的一种异步传输模式.应用程序可以分布在同台机器上,也可以分布于互联的网络中的任意位置.基本原理:消息发送者把要发送的消息放入容器,也就是Message(消息),然后保存到系统公用空间的消息队列中(Message Queue)中,本地或互联位置上的消息接收程序再从队列中取出发给它的消息进行处理.消息类型可以是文本,图像,自定义对象等.消息队列分为公共队列和私有队列. why 一.用于进程间的

Freertos-事件标志组,消息队列,信号量,二值信号量,互斥信号量

任务间的通信和同步机制  在裸机编程时,使用全局变量的确比较方便,但是在加上 RTOS 后就是另一种情况了. 使用全局变量相比事件标志组主要有如下三个问题: 1.使用事件标志组可以让 RTOS 内核有效地管理任务,而全局变量是无法做到的,任务的超时等机制需要用户自己去实现.2.使用了全局变量就要防止多任务的访问冲突,而使用事件标志组则处理好了这个问题,用户无需担心.3.使用事件标志组可以有效地解决中断服务程序和任务之间的同步问题. 事件标志组:事件标志组是实现多任务同步的有效机制之一. 每创建一