ZeroMQ是对Socket的封装,通过组合多种类型的结点可以实现复杂的网络通信模式。而且ZeroMQ设计简单,可以有多种平台实现,对于跨平台项目是一个福音。
clrzmq是ZeroMQ的C#语言的实现。当我在使用clrzmq时,发现ZeroMQ的server端,即REP,在接收到消息后,回复消息,但是在回复消息之前不能再接收消息。用伪代码表示就是
while(true) { byte[] receiveData = new byte[1024]; receive(receiveData); //do some work byte[] responseData = new byte[1024]; send(reponseData); }
既然ZeroMQ的名称里含有MQ(Message Queue),就应该有队列的功能啊?在ZeroMQ的官方手册中介绍了router-dealer模式:
router可以作为路由器,起到缓存消息的作用,如果服务端空闲,会把消息通过dealer发送给服务端。
这篇文章使用C++实现了ZeroMQ消息队列。
幸运的是,clrzmq对router-dealer模式进行了封装,可以使用QueueDevice类实现相同的效果。
在我的例子中,我将router-dealer放在了服务端进程中,dealer和服务端的通信是县城通信,交互图如下:
tcp inproc inproc
connect ________________ connect
客户端i -------------|router -------- dealer| -----------服务端
——————---------
queueDevice
客户端代码如下:
static void Main(string[] args) { string serverAddress = "tcp://localhost:5555"; // ZMQ Context and client socket using (ZmqContext context = ZmqContext.Create()) using (ZmqSocket client = context.CreateSocket(SocketType.REQ)) { client.Connect(serverAddress); string request = "Hello"; while(true)//for (int requestNum = 0; requestNum < 10; requestNum++) { string again = Console.ReadLine(); Console.WriteLine("Sending request..."); client.Send(again + request, Encoding.Unicode); string reply = client.Receive(Encoding.Unicode); Console.WriteLine("Received reply {0}: ", reply); } } }
服务端代码:
class Program { static ZmqContext context = ZmqContext.Create(); static ManualResetEvent _deviceReady = new ManualResetEvent(false); //static ManualResetEvent _receiverReady = new ManualResetEvent(false); static void Main(string[] args) { startRouterDealer(); // ZMQ Context, server socket _deviceReady.WaitOne(); using (ZmqSocket server = context.CreateSocket(SocketType.REP)) { //server.Bind("inproc://backend"); server.Connect("inproc://backend"); while (true) { // Wait for next request from client string message = server.Receive(Encoding.Unicode); Console.WriteLine("Received request: {0}", message); //ThreadPool.QueueUserWorkItem(new WaitCallback(procedeRequest), server); // Do Some ‘work‘ Thread.Sleep(5000); // Send reply back to client server.Send(message, Encoding.Unicode); } } } private static void startRouterDealer() { ThreadPool.QueueUserWorkItem(new WaitCallback(startQueueDeviceThread), null); //ThreadPool.QueueUserWorkItem(new WaitCallback(startRouterDealerThread), null); } private static void startQueueDeviceThread(object state) { //Thread.Sleep(2000); using (QueueDevice queue = new QueueDevice(context, "tcp://*:5555", "inproc://backend", DeviceMode.Threaded)) { queue.Initialize(); _deviceReady.Set(); queue.Start(); while(true) { Thread.Sleep(1000); } } }}
ZeroMQ的手册中介绍说,router-dealer必须先启动,服务端再启动,因此ManualResetEvent 的作用是协调QueueDevice和服务端的启动顺序。
时间: 2024-12-08 17:15:16