使用ZeroMQ(clrzmq)实现异步通信

ZeroMQ是对Socket的封装,通过组合多种类型的结点可以实现复杂的网络通信模式。而且ZeroMQ设计简单,可以有多种平台实现,对于跨平台项目是一个福音。

clrzmq是ZeroMQ的C#语言的实现。当我在使用clrzmq时,发现ZeroMQ的server端,即REP,在接收到消息后,回复消息,但是在回复消息之前不能再接收消息。用伪代码表示就是

while(true)
{
    byte[] receiveData = new byte[1024];
    receive(receiveData);

    //do some work
    byte[] responseData = new byte[1024];
    send(reponseData);
}

既然ZeroMQ的名称里含有MQ(Message Queue),就应该有队列的功能啊?在ZeroMQ的官方手册中介绍了router-dealer模式:

router可以作为路由器,起到缓存消息的作用,如果服务端空闲,会把消息通过dealer发送给服务端。

这篇文章使用C++实现了ZeroMQ消息队列。

幸运的是,clrzmq对router-dealer模式进行了封装,可以使用QueueDevice类实现相同的效果。

在我的例子中,我将router-dealer放在了服务端进程中,dealer和服务端的通信是县城通信,交互图如下:

tcp                     inproc                 inproc

connect     ________________      connect

客户端i  -------------|router -------- dealer| -----------服务端

——————---------

queueDevice

客户端代码如下:

static void Main(string[] args)
{
            string serverAddress = "tcp://localhost:5555";
            // ZMQ Context and client socket
            using (ZmqContext context = ZmqContext.Create())
            using (ZmqSocket client = context.CreateSocket(SocketType.REQ))
            {
                client.Connect(serverAddress);

                string request = "Hello";
                while(true)//for (int requestNum = 0; requestNum < 10; requestNum++)
                {
                    string again = Console.ReadLine();

                    Console.WriteLine("Sending request...");
                    client.Send(again + request, Encoding.Unicode);

                    string reply = client.Receive(Encoding.Unicode);
                    Console.WriteLine("Received reply {0}: ", reply);
                }

            }
 }

服务端代码:

   class Program
    {
        static ZmqContext context = ZmqContext.Create();
        static ManualResetEvent _deviceReady = new ManualResetEvent(false);
        //static ManualResetEvent _receiverReady = new ManualResetEvent(false);

        static void Main(string[] args)
        {
            startRouterDealer();
            // ZMQ Context, server socket
            _deviceReady.WaitOne();

            using (ZmqSocket server = context.CreateSocket(SocketType.REP))
            {
                //server.Bind("inproc://backend");
                server.Connect("inproc://backend");

                while (true)
                {

                    // Wait for next request from client
                    string message = server.Receive(Encoding.Unicode);
                    Console.WriteLine("Received request: {0}", message);

                    //ThreadPool.QueueUserWorkItem(new WaitCallback(procedeRequest), server);
                    // Do Some ‘work‘
                    Thread.Sleep(5000);

                    // Send reply back to client
                    server.Send(message, Encoding.Unicode);
                }
            }
        }

        private static void startRouterDealer()
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(startQueueDeviceThread), null);
            //ThreadPool.QueueUserWorkItem(new WaitCallback(startRouterDealerThread), null);
        }
        private static void startQueueDeviceThread(object state)
        {
            //Thread.Sleep(2000);
            using (QueueDevice queue = new QueueDevice(context,
                "tcp://*:5555",
                "inproc://backend",
                DeviceMode.Threaded))
            {
                queue.Initialize();
                _deviceReady.Set();
                queue.Start();
                while(true)
                {
                    Thread.Sleep(1000);
                }
            }
        }}

ZeroMQ的手册中介绍说,router-dealer必须先启动,服务端再启动,因此ManualResetEvent 的作用是协调QueueDevice和服务端的启动顺序。

时间: 2024-12-08 17:15:16

使用ZeroMQ(clrzmq)实现异步通信的相关文章

PHP Log时时查看小工具

以前Log都是打印在一个文档中,然后打开文件夹,最后打开文档查看里面的内容,每次打开文件夹感觉很烦. 前些日子看到同事开发.NET的时候,用他自己的一个小工具能够时时查看到Log的内容,非常方便,所以就想移植到PHP开发中. 一.查看效果 1.打开客户端小工具mylog.exe,在地址中输入localhost,端口输入5555,点击开始链接,旁边屏幕会显示“开始监听”的字样. 2.打开log.php页面,页面很朴素,就打印了一串字符. 3.查看mylog.exe,里面已接收到hello字符串 二

rabbitMQ、activeMQ、zeroMQ、Kafka、Redis 比较

Kafka作为时下最流行的开源消息系统,被广泛地应用在数据缓冲.异步通信.汇集日志.系统解耦等方面.相比较于RocketMQ等其他常见消息系统,Kafka在保障了大部分功能特性的同时,还提供了超一流的读写性能. 针对Kafka性能方面进行简单分析,相关数据请参考:https://segmentfault.com/a/1190000003985468,下面介绍一下Kafka的架构和涉及到的名词: Topic:用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Parti

WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(三)实现ReplyChannel(2016-03-15 12:35)

这是这个系列的第三篇,其他的文章请点击下列目录 WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(一)概要设计 WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(二)实现IRequestChannel WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(三)实现ReplyChannel 相对于RequestChannel,ReplyChannel比较复杂一些. 1 启动zmq的rep结点 首先需要重载OnOpen方法,启动zmq的r

ZeroMQ的学习和研究

ZeroMQ?(也拼写作 ?MQ. 0MQ 或 ZMQ) 是个非常轻量级的开源消息队列软件.它没有独立的服务器,消息直接从一个应用程序被发送到另一个应用程序.ZeroMQ?的学习和应用也非常简单,它只有一个 C++ 编写成的单个库文件?libzmq.dll, 可以链接到任何应用程序中.如果要在?.NET?环境中使用,我们需要用到一个?C#?编写的名为 clrzmq.dll 包装库. ZeroMQ?可以在 Windows. OS X 和 Linux 等多种操作系统上运行, C. C++.?C#.

消息通信库ZeroMQ 4.0.4安装指南

消息通信库ZeroMQ 4.0.4安装指南 一.ZeroMQ介绍 ZeroMQ是一个开源的消息队列系统,按照官方的定义,它是一个消息通信库,帮助开发者设计分布式和并行的应用程序. 首先,我们需要明白,ZeroMQ不是传统的消息队列系统(比如ActiveMQ.WebSphereMQ.RabbitMQ等).ZeroMQ可以帮助我们建立自己的消息队列系统,它只是一个库.ZeroMQ可以运行于带x86处理器或ARM处理器的机器上,支持40多种编程语言. 消息队列,从技术的角度来讲,是以先进先出FIFO算

Java消息队列总结只需一篇解决ActiveMQ、RabbitMQ、ZeroMQ、Kafka

一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式:2.并行方式 a.串行

异步通信----WebSocket

什么是WebSocket? WebSocket API是下一代客户端-服务器的异步通信方法.该通信取代了单个的TCP套接字,使用ws或wss协议,可用于任意的客户端和服务器程序.WebSocket目前由W3C进行标准化.WebSocket已经受到Firefox 4.Chrome 4.Opera 10.70以及Safari 5等浏览器的支持. WebSocket API最伟大之处在于服务器和客户端可以在给定的时间范围内的任意时刻,相互推送信息.WebSocket并不限于以Ajax(或XHR)方式通

ZeroMQ接口函数之 :zmq_msg_set - 设置消息的性质

ZeroMQ 官方地址 :http://api.zeromq.org/4-1:zmq_msg_set zmq_msg_set(3)  ØMQ Manual - ØMQ/3.2.5 Name zmq_msg_set - 设置消息的性质 Synopsis int zmq_msg_set (zmq_msg_t *message, int property, int value); Description zmq_msg_set()函数会设置message参数指定的消息的属性,属性值由value参数指定

ZeroMQ接口函数之 :zmq_curve_keypair - 生成一个新的CURVE 密钥对

ZeroMQ 官方地址 :http://api.zeromq.org/4-0:zmq_curve_keypair zmq_curve_keypair(3) ØMQ Manual - ØMQ/4.1.0 Name zmq_curve_keypair - 生成一个新的CURVE 密钥对 Synopsis int zmq_curve_keypair (char *z85_public_key, char *z85_secret_key); Description 函数zmq_curve_keypair