这是这个系列的第三篇,其他的文章请点击下列目录
WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(一)概要设计
WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(二)实现IRequestChannel
WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(三)实现ReplyChannel
相对于RequestChannel,ReplyChannel比较复杂一些。
1 启动zmq的rep结点
首先需要重载OnOpen方法,启动zmq的rep结点,主要是调用createSocket方法和绑定地址。
protected override void OnOpen(TimeSpan timeout) { if (this.socket == null) { this.socket = this.zmqContext.CreateSocket(SocketType.REP); int trimIPEnd = this.localAddress.Uri.AbsoluteUri.LastIndexOf(‘:‘); string trimIP = this.localAddress.Uri.AbsoluteUri.Substring(trimIPEnd,this.localAddress.Uri.AbsoluteUri.Length - trimIPEnd); string zmqServerAddress = "tcp://*" + trimIP; socket.Bind(zmqServerAddress); } }
2 实现ReceiveReqeust,返回context
和RequestChannel一样,实现同步版本的ReceiveRequest。此方法是IReplyChannel接口的方法。为什么接口的方法不返回message,而是返回requestContext呢?原因是WCF在收到requestContext后,可以根据RequestContext发送回复消息。
public RequestContext ReceiveRequest(TimeSpan timeout) { ThrowIfDisposedOrNotOpen(); Message request = this.ReceiveMessage(timeout); return new ZMQRequestContext(this, request, timeout); }
ReceiveMessage的实现为了复用,放在了基类ZMQChannelBase中
public Message ReceiveMessage(TimeSpan timeout) { base.ThrowIfDisposedOrNotOpen(); byte[] replyData = bufferManager.TakeBuffer(1000); int replaySize = socket.Receive(replyData); Message response = encoder.ReadMessage( new ArraySegment<byte>(replyData, 0, replaySize), bufferManager); bufferManager.ReturnBuffer(replyData); return response; }
3 同步版本的ReceiveRequest实现后,再实现异步版本的
WCF使用ReplyChannel接收消息,默认调用的是BeginReceiveRequest。
异步版本的实现也是使用.Net的AMP异步模式。调用的步骤如下图所示:由上向下执行。远程请求首先进入ReplyChannel的BeginTryReceiveRequest方法,此方法返回TryReceiveRequestAsyncResult实例。然后依次向下执行,直到在BaseChannel.SocketReceiveAsyncResult 中执行ZMQ的socket.Receive()方法。
远程请求 | ReplyChannel BeginTryReceiveRequest | TryReceiveRequestAsyncResult new | BaseChannel BeginReceiveRequest | ReceiveRequestAsyncResult new | BaseChannel BeginReceiveMessage | BaseChannel BeginReadData | BaseChannel.SocketReceiveAsyncResult new | 异步代理执行socket.Receive | BaseChannel EndReadData | BaseChannel EndReceiveMessage 此处将消息反序列化成数据
步骤很多,每一步都有意义,BeginTryReceiveRequest首先接到请求消息,同时处理超时的情况,使其不会抛出异常。转给BeginReceiveRequest,创建ReceiveRequestAsyncResult对象,在其构造中调用基类的BeginReceiveMessage。基类的BeginReceiveMessage纯粹是为了代码的复用性。转给BeginReadData,创建SocketReceiveAsyncResult对象,是真正启动socket.Receive(),其中使用了异步委托的方式实现了异步。
4 解决zmq的同步限制
必要的接口都实现后,可以启动wcf服务来接收zmq客户端的请求了。收到消息后,又一次执行到socket.Receive(),尝试再次接收消息时,出现了异常。异常显示“结点在目前的状态下无法执行此操作”。由于是第一次使用Zmq,不太了解ZMQ的机制。第一次执行socket.Receive()能正常接收消息,第二次执行socket.Receive()就会出错。我又查看了zmq的demo,也执行到第二个socket.Receive(),没有这样的问题。通过比较相同时间下的socket状态发现:
我的程序 socket在接收消息后, ReceiveStatus: Received 还没有回复返回值,所以: SendStatus: None zmq的demo socket在接收消息后, ReceiveStatus: Received 回复返回值 SendStatus: Sent
zmq的REP socket必须接收请求,发送返回后,才能再次接收请求。我的程序中由于调用了wcf的服务,一直是在调试状态,因此没有及时返回,造成了SendStatus是none,所以不能再次发送。
解决这个问题也很简单,使用了ManualResetEvent。在接收消息后,将ManualResetEvent置成reset状态,在receive()之前调用ManualResetEvent的WaitOne(),等待发送返回。一旦replyChannel发送返回后,立刻将ManualResetEvent置成set状态,就执行到了receive()。
接收时
serviceHanledDone.WaitOne(); int receiveLength = socket.Receive(data1); serviceHanledDone.Reset(); return receiveLength;
发送后,立刻将ManualResetEvent置成set,使得waitone放行。
socket.Send(data); serviceHanledDone.Set();
5 添加zmq队列支持
至此,zmqBinding可以接收到zmq客户端的请求,并能正确的返回。但是似乎一次只能接收一个请求,等待回复后,才能接收下一个请求。虽然wcf的处理都是异步的,但是zmq的rep结点限制了服务端的处理能力,那么怎么能接收多个请求呢?zmq既然叫做“mq”,是有队列的功能的。通过zmq的手册知道,router-dealer是可以实现队列的功能的。我使用的zmq版本是clrzmq,但是网上没有clrzmq实现router-dealder的例子代码。在clrzmq的源码中的测试代码中,我发现了clrzmq的QueueDevice类实现了zmq的router-dealer模式。而且使用起来很简单。完整的zmq例子请参见我的其他文章。
这里注意一点,就是QueueDevice应该首先启动,然后再启动REP 结点。因此还使用ManualResetEvent对象。QueueDecvice在一个新建的线程中启动,启动后,通知REP节点启动。代码是这样的:
protected override void OnOpen(TimeSpan timeout) { if (this.socket == null) { startRouterDealer(this.zmqContext); _deviceReady.WaitOne(); this.socket = this.zmqContext.CreateSocket(SocketType.REP); int trimIPEnd = this.localAddress.Uri.AbsoluteUri.LastIndexOf(‘:‘); string trimIP = this.localAddress.Uri.AbsoluteUri.Substring(trimIPEnd,this.localAddress.Uri.AbsoluteUri.Length - trimIPEnd); string zmqServerAddress = "tcp://*" + trimIP; //socket.Bind(zmqServerAddress); socket.Connect("inproc://backend"); } } protected override void OnClosing() { base.OnClosing(); } private static void startRouterDealer(ZmqContext context) { ThreadPool.QueueUserWorkItem(new WaitCallback(startQueueDeviceThread), context); } private static void startQueueDeviceThread(object state) { ZmqContext context = state as ZmqContext; //Thread.Sleep(2000); using (QueueDevice queue = new QueueDevice(context, "tcp://*:5555", "inproc://backend", DeviceMode.Threaded)) { queue.Initialize(); _deviceReady.Set(); queue.Start(); while (true) { Thread.Sleep(1000); } } }
至此,ZMQBinding的Transport部分就完成了。下一篇开始介绍protocolBuffer消息编码,以及在wcf中如何编码和解码。