C#实现异步消息队列

拿到新书《.net框架设计》,到手之后迅速读了好多,虽然这本书不像很多教程一样从头到尾系统的讲明一些知识,但是从项目实战角度告诉我们如何使用我们的知识,从这本书中提炼了一篇,正好符合我前几篇的“数据驱动框架”设计的问题;

消息队列

消息队列英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件贮列用来处理一系列的输入,通常是来自使用者。消息队列提供了异步通信协议,每一个贮列中的纪录包含详细说明的资料,包含发生的时间,输入装置的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。

简单的说队列就是贮存了我们需要处理的Command但是并不是及时的拿到其处理结果;

实现

实际上,消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。

目前,有很多消息队列有很多开源的实现,包括JBoss MessagingJORAMApache ActiveMQSun Open Message QueueApache Qpid和HTTPSQS。

优点,缺点

消息队列本身是异步的,它允许接收者在消息发送很长时间后再取回消息,这和大多数通信协议是不同的。例如WWW中使用的HTTP协议是同步的,因为客户端在发出请求后必须等待服务器回应。然而,很多情况下我们需要异步的通信协议。比如,一个进程通知另一个进程发生了一个事件,但不需要等待回应。但消息队列的异步特点,也造成了一个缺点,就是接收者必须轮询消息队列,才能收到最近的消息。

信号相比,消息队列能够传递更多的信息。与管道相比,消息队列提供了有格式的数据,这可以减少开发人员的工作量。但消息队列仍然有大小限制。

读取队列消息

主要有两种(1)服务端的推;(2)客户端的拉;

拉:主要是客户端定时轮询拿走消息处理;

推:通过事件订阅方式主动通知订阅者进行处理;

消息的贮存

简单的是通过内存链表实现贮存;也可以借助DB,比如Redis;还可以持久到本地文件中;

如何保证异步处理的一致性

尽管队列主要目的是实现消息贮存,同时将调用与实现异步化。但是如果想达到处理消息一致性,好的方式是区别业务处理顺序,比如操作主从DB,主负责写,从负责读,我们没有机会在写之后立马从读数据库拿到你想要的结果;同时我们需要借助中间状态,当多个中间状态同时符合调用结果才到到业务时间被处理,否则将“异常消息”持久化,待下次操作;

上代码

建立消息对立核心队列

{
    public delegate void MessageQueueEventNotifyHandler(Message.BaseMessage message);

    public class MessageQueue:Queue<BaseMessage>
    {
        public static MessageQueue GlobalQueue = new MessageQueue();

        private Timer timer = new Timer();
        public MessageQueue() {
            this.timer.Interval = 5000;
            this.timer.Elapsed += Notify;
            this.timer.Enabled = true;
        }
        private void Notify(object sender, ElapsedEventArgs e) {
            lock (this) {
                if (this.Count > 0) {
                    //this.messageNotifyEvent.GetInvocationList()[0].DynamicInvoke(this.Dequeue());
                    var message = this.Dequeue();
                    this.messageNotifyEvent(message);
                }
            }
        }

        private MessageQueueEventNotifyHandler messageNotifyEvent;
        public event MessageQueueEventNotifyHandler MessageNotifyEvent {
            add {
                this.messageNotifyEvent += value;
            }

            remove {
                if (this.messageNotifyEvent != null) {
                    this.messageNotifyEvent -= value;
                }
            }
        }
    }
}

事件处理

public const string OrderCodePrefix = "P";
        public void Submit(Message.BaseMessage message)
        {
            Order order = message.Body as Order;

            if (order.OrderCode.StartsWith(OrderCodePrefix))
            {
                System.Console.WriteLine("这个是个正确的以({0})开头的订单:{1}", OrderCodePrefix,order.OrderCode);
            }
            else {
                System.Console.WriteLine("这个是个错误的订单,没有以({0})开头:{1}",OrderCodePrefix,order.OrderCode);
            }
        }

可依据具体业务进行个性化处理;

通过Proxy向队列追加消息

public class OrderServiceProxy:IOrderService
    {
        public void Submit(Message.BaseMessage message)
        {
            MessageQueue.MessageQueue.GlobalQueue.Enqueue(message);
        }
    }

客户端调用

OrderService orderService = new OrderService();
            MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent += orderService.Submit;

            var orders = new List<Order>() {
                new Order(){OrderCode="P001"},
                new Order(){OrderCode="P002"},
                new Order(){OrderCode="B003"}
            };

            OrderServiceProxy proxy = new OrderServiceProxy();
            orders.ForEach(order => proxy.Submit(new Message.BaseMessage() { Body=order}));

            Console.ReadLine();

这样就满足了事件的绑定与触发个性化处理,同时达到了消息异步化的目的,希望更细致的拓展用到后期的项目中。

时间: 2024-12-29 12:47:36

C#实现异步消息队列的相关文章

微信后台异步消息队列的优化升级实践分享

1.引言 MQ 异步消息队列是微信后台自研的重要组件,广泛应用在各种业务场景中,为业务提供解耦.缓冲.异步化等能力.本文分享了该组件2.0版本的功能特点及优化实践,希望能为类似业务(比如移动端IM系统等)的消息队列设计提供一定的参考. 2.关于分享者 廖文鑫,2013年加入腾讯,从事微信后台基础功能及架构的开发和运营,先后参与了消息通知推送系统.任务队列组件.春晚摇红包活动等项目,在海量分布式高性能系统方面有丰富的经验. 3.背景介绍 微信后台给件 MQ 1.0 发布之初,基本满足了一般业务场景

使用Redis实现异步消息队列

工作中涉及到redis异步消费,查阅资料,记录下~ 使用Redis实现异步消息队列 https://blog.csdn.net/b540969928/article/details/78406791 异步消息队列 https://blog.csdn.net/qq_33589510/article/details/77126852 原文地址:https://www.cnblogs.com/meixiaoqiu/p/10238227.html

C#后台异步消息队列实现

简介 基于生产者消费者模式,我们可以开发出线程安全的异步消息队列. 知识储备 什么是生产者消费者模式? 为了方便理解,我们暂时将它理解为垃圾的产生到结束的过程. 简单来说,多住户产生垃圾(生产者)将垃圾投递到全小区唯一一个垃圾桶(单队列),环卫将垃圾桶中的垃圾进行处理(消费者).就是一个生产者消费者模式. 这种模式的好处,就不在这里叙述了,毕竟这篇文章不是在讲设计模式.有兴趣的小伙伴可以自行了解一下. 应用场景 很多时候,我们有一些不紧急但却对操作顺序有强依赖的需求. 比如,12306候补抢票.

消息队列库——ZeroMQ

消息队列库--ZeroMQ ZeroMQ(简称ZMQ)是一个基于消息队列的多线程网络库,其对套接字类型.连接处理.帧.甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字. ZMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间. ZMQ不是单独的服务,而是一个嵌入式库,它封装了网络通信.消息队列.线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信. 主线程与I/O线程: I

关于windows操作系统之消息和消息队列

关于消息和消息队列 不像基于MS-DOS的应用程序,基于Windows的程序是事件驱动的.他们不做任何显示调用来获取输入.而是通过等待系统传递给他们. 系统为应用程序传递所有输入到程序中的不同窗口.每个窗口都有一个称为窗口过程的函数,用于处理所有到该窗口的输入.窗口处理过程处理输入,并将控制返回给系统. 如果一个顶层窗口停止响应消息超过两秒,系统将会认为该窗口为非响应状态.在这种情况下,系统将隐藏该窗口并用拥有同样Z顺序,位置,尺寸和可视化属性的ghost窗口替代该窗口.这种情况下,允许用户移动

RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ基础知识详解,RabbitMQ布曙

消息队列及常见消息队列介绍 2017-10-10 09:35操作系统/客户端/人脸识别 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaMq等,而部分数据库如Re

消息队列及常见消息队列介绍

原文链接:https://cloud.tencent.com/community/article/129032 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaM

消息队列概念与认知

本文是-消息队列学习的概念与介绍篇.目的是能够对消息队列能够有一个简单的了解和大体的认知. 参考/学习资料整理(好东西要学会分享 ) B站上的黑马ActiveMQ的视频教程 Hollis公众号上的消息队列文章 架构之家公众号上的消息队列文章 JavaGuide(一份涵盖大部分Java程序员所需要掌握的核心知识的文档类项目) CS-Notes(技术面试必备基础知识) JCSprout(处于萌芽阶段的 Java 核心知识库) 一个在线绘图的工具 一.消息队列简介 消息队列 MQ(message qu

关于消息队列的技术选型

https://cloud.tencent.com/developer/article/1006035 导语 : 消息队列是分布式系统中重要的组件,在很多生产环境如商品抢购等需要控制并发量的场景下都需要用到.最近组内需要做流水server的选型升级,这里对消息队列及常见的消息队列进行了一次调研,整理了相关资料,分享给大家. 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制