WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(三)实现ReplyChannel(2016-03-15 12:35)

这是这个系列的第三篇,其他的文章请点击下列目录

WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(一)概要设计

WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(二)实现IRequestChannel

WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(三)实现ReplyChannel

相对于RequestChannel,ReplyChannel比较复杂一些。

1 启动zmq的rep结点

首先需要重载OnOpen方法,启动zmq的rep结点,主要是调用createSocket方法和绑定地址。

protected override void OnOpen(TimeSpan timeout)
        {
            if (this.socket == null)
            {
                this.socket = this.zmqContext.CreateSocket(SocketType.REP);
                int trimIPEnd = this.localAddress.Uri.AbsoluteUri.LastIndexOf(‘:‘);
                string trimIP = this.localAddress.Uri.AbsoluteUri.Substring(trimIPEnd,this.localAddress.Uri.AbsoluteUri.Length - trimIPEnd);
                string zmqServerAddress = "tcp://*" + trimIP;
                socket.Bind(zmqServerAddress);
            }
        }

2 实现ReceiveReqeust,返回context

和RequestChannel一样,实现同步版本的ReceiveRequest。此方法是IReplyChannel接口的方法。为什么接口的方法不返回message,而是返回requestContext呢?原因是WCF在收到requestContext后,可以根据RequestContext发送回复消息。

        public RequestContext ReceiveRequest(TimeSpan timeout)
        {
            ThrowIfDisposedOrNotOpen();

            Message request = this.ReceiveMessage(timeout);
            return new ZMQRequestContext(this, request, timeout);
        }
ReceiveMessage的实现为了复用,放在了基类ZMQChannelBase中
        public Message ReceiveMessage(TimeSpan timeout)
        {
            base.ThrowIfDisposedOrNotOpen();
            byte[] replyData = bufferManager.TakeBuffer(1000);

            int replaySize = socket.Receive(replyData);

            Message response = encoder.ReadMessage(
                new ArraySegment<byte>(replyData, 0, replaySize), bufferManager);
            bufferManager.ReturnBuffer(replyData);
            return response;
        }

3 同步版本的ReceiveRequest实现后,再实现异步版本的

WCF使用ReplyChannel接收消息,默认调用的是BeginReceiveRequest。

异步版本的实现也是使用.Net的AMP异步模式。调用的步骤如下图所示:由上向下执行。远程请求首先进入ReplyChannel的BeginTryReceiveRequest方法,此方法返回TryReceiveRequestAsyncResult实例。然后依次向下执行,直到在BaseChannel.SocketReceiveAsyncResult 中执行ZMQ的socket.Receive()方法。

                                                              远程请求
                                                                 |
ReplyChannel                                            BeginTryReceiveRequest
                                                                 |
TryReceiveRequestAsyncResult                                  new
                                                                 |
BaseChannel                                                BeginReceiveRequest
                                                                 |
ReceiveRequestAsyncResult                                      new
                                                                 |
BaseChannel                                                BeginReceiveMessage
                                         |
BaseChannel                                                BeginReadData
                                                                 |
BaseChannel.SocketReceiveAsyncResult                        new
                                                                 |
                                                        异步代理执行socket.Receive
                                                                 |
BaseChannel                                                    EndReadData
                                                                 |
BaseChannel                                                EndReceiveMessage            此处将消息反序列化成数据

步骤很多,每一步都有意义,BeginTryReceiveRequest首先接到请求消息,同时处理超时的情况,使其不会抛出异常。转给BeginReceiveRequest,创建ReceiveRequestAsyncResult对象,在其构造中调用基类的BeginReceiveMessage。基类的BeginReceiveMessage纯粹是为了代码的复用性。转给BeginReadData,创建SocketReceiveAsyncResult对象,是真正启动socket.Receive(),其中使用了异步委托的方式实现了异步。

4 解决zmq的同步限制

必要的接口都实现后,可以启动wcf服务来接收zmq客户端的请求了。收到消息后,又一次执行到socket.Receive(),尝试再次接收消息时,出现了异常。异常显示“结点在目前的状态下无法执行此操作”。由于是第一次使用Zmq,不太了解ZMQ的机制。第一次执行socket.Receive()能正常接收消息,第二次执行socket.Receive()就会出错。我又查看了zmq的demo,也执行到第二个socket.Receive(),没有这样的问题。通过比较相同时间下的socket状态发现:

       我的程序
socket在接收消息后,
    ReceiveStatus: Received
还没有回复返回值,所以:
    SendStatus: None

       zmq的demo
socket在接收消息后,
    ReceiveStatus: Received
回复返回值
    SendStatus: Sent

zmq的REP socket必须接收请求,发送返回后,才能再次接收请求。我的程序中由于调用了wcf的服务,一直是在调试状态,因此没有及时返回,造成了SendStatus是none,所以不能再次发送。

解决这个问题也很简单,使用了ManualResetEvent。在接收消息后,将ManualResetEvent置成reset状态,在receive()之前调用ManualResetEvent的WaitOne(),等待发送返回。一旦replyChannel发送返回后,立刻将ManualResetEvent置成set状态,就执行到了receive()。

接收时

                    serviceHanledDone.WaitOne();
                    int receiveLength = socket.Receive(data1);
                    serviceHanledDone.Reset();
                    return receiveLength;

发送后,立刻将ManualResetEvent置成set,使得waitone放行。

                    socket.Send(data);
                    serviceHanledDone.Set();

5 添加zmq队列支持

至此,zmqBinding可以接收到zmq客户端的请求,并能正确的返回。但是似乎一次只能接收一个请求,等待回复后,才能接收下一个请求。虽然wcf的处理都是异步的,但是zmq的rep结点限制了服务端的处理能力,那么怎么能接收多个请求呢?zmq既然叫做“mq”,是有队列的功能的。通过zmq的手册知道,router-dealer是可以实现队列的功能的。我使用的zmq版本是clrzmq,但是网上没有clrzmq实现router-dealder的例子代码。在clrzmq的源码中的测试代码中,我发现了clrzmq的QueueDevice类实现了zmq的router-dealer模式。而且使用起来很简单。完整的zmq例子请参见我的其他文章。

这里注意一点,就是QueueDevice应该首先启动,然后再启动REP 结点。因此还使用ManualResetEvent对象。QueueDecvice在一个新建的线程中启动,启动后,通知REP节点启动。代码是这样的:

protected override void OnOpen(TimeSpan timeout)
        {
            if (this.socket == null)
            {
                startRouterDealer(this.zmqContext);

                _deviceReady.WaitOne();

                this.socket = this.zmqContext.CreateSocket(SocketType.REP);
                int trimIPEnd = this.localAddress.Uri.AbsoluteUri.LastIndexOf(‘:‘);
                string trimIP = this.localAddress.Uri.AbsoluteUri.Substring(trimIPEnd,this.localAddress.Uri.AbsoluteUri.Length - trimIPEnd);
                string zmqServerAddress = "tcp://*" + trimIP;
                //socket.Bind(zmqServerAddress);
                socket.Connect("inproc://backend");
            }
        }
        protected override void OnClosing()
        {
            base.OnClosing();
        }
        private static void startRouterDealer(ZmqContext context)
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(startQueueDeviceThread), context);
        }
        private static void startQueueDeviceThread(object state)
        {
            ZmqContext context = state as ZmqContext;
            //Thread.Sleep(2000);
            using (QueueDevice queue = new QueueDevice(context,
                "tcp://*:5555",
                "inproc://backend",
                DeviceMode.Threaded))
            {
                queue.Initialize();
                _deviceReady.Set();
                queue.Start();
                while (true)
                {
                    Thread.Sleep(1000);
                }
            }

        }

至此,ZMQBinding的Transport部分就完成了。下一篇开始介绍protocolBuffer消息编码,以及在wcf中如何编码和解码。

时间: 2024-10-25 19:32:53

WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(三)实现ReplyChannel(2016-03-15 12:35)的相关文章

WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(二)实现IRequestChannel(2016-03-15 12:35)

从今天开始,一步步介绍我是如何实现自定义的ZeroMQ绑定和protocolBuffer消息编码的. 本系列的想法主要来源于蒋金楠Artech的系列博客-WCF后续之旅,和WCF开发团队成员carlosfigueira的WCF Extension系列博客. 首先要明确的是,通信和编码的实现是分离的.WCF也是这样设计的.自定义Binding包含BindingElementCollection集合,这个集合可以添加多个ElementBinding.但是有两个ElementBinding必不可少,T

WCF扩展

WCF 可扩展性 WCF 提供了许多扩展点供开发人员自定义运行时行为. WCF 在 Channel Layer 之上还提供了一个高级运行时,主要是针对应用程序开发人员.在 WCF 文档中,它常被称为服务模型层(Service Model Layer).该高级运行时主要由一个称作 Dispatcher(在 ServiceHost 的 Context 中)的组件和一个称作 Proxy(在客户端的 Context 中)的组件组成.       (       图片引自 MSDN Magazine : 

iis 添加wcf扩展

iis 添加wcf扩展,码迷,mamicode.com

解决WCF调用时出现错误:“创建MTOM消息读取器时出错”

如题,查询一个数据集, 存储过程返回如:select * from B 中间层定义  public DataSet GetTable(string 查询条件); 客户端定义  DataSet ds = wcfClient.GetTable("") 以前一直正常着,查询也很快速,这两天不知修改到哪了,所有的查询如果返回记录较大时(100条左右),客户端就会出现服务端返回的异常错误"创建MTOM消息读取器时出错" 客户端 app.config 配置如下 <syst

ZeroMQ——一个轻量级的消息通信组件 C#

ZeroMQ——一个轻量级的消息通信组件 ZeroMQ是一个轻量级的消息通信组件,尽管名字中包含了"MQ",严格上来讲ZeroMQ并不是"消息队列/消息中间件".ZeroMQ是一个传输层API库, 更关注消息的传输.与消息队列相比,ZeroMQ有以下一些特点: 点对点无中间节点 传统的消息队列都需要一个消息服务器来存储转发消息.而ZeroMQ则放弃了这个模式,把侧重点放在了点对点的消息传输上,并且(试图)做到极致.以为消息服务器最终还是转化为服务器对其他节点的点对点

日积(Running)月累(ZSSURE):WCF学习之“通过事件绑定控制WinForm宿主程序主界面控件”

背景: WCF服务需要寄宿到相应的可运行进程中执行,常见的有四种寄宿,分别是控制台程序.WinForm程序.IIS和Windows服务.之前学习老A博客和<WCF全面解析>时最常用到的是控制台寄宿,近期由于项目需求,需要在WinForm程序中调用WCF服务,本博文通过一个简单的实例来演示WCF在WinForm中的寄宿.并着重介绍如何利用事件绑定控制宿主主UI界面控件. 题记: 之前一直坚守在C++阵地,对于新语言.新技术(诸如Python.J2EE.Bigdata.AI)不甚感冒.自以为&qu

WindowsPhone8中LongListSelector的扩展解决其不能绑定SelectdeItem的问题

微软在Wp8中集成了LongListSelector, 但是该控件在ViewModel中不能实现的SelectdeItem双向绑定,因为其不是DependencyProperty没办法只能实现扩展! 1.实现LongListSelector的扩展ExtendedSelector public enum PositionOnAdd { Top, Default, NewItem } public class ExtendedSelector : LongListSelector { public

WCF自寄宿实现Https绑定

一.WCF配置 1 Address 将服务端发布地址和客户端访问地址都配置为https开始的安全地址.参考如下. <add key="SrvUrl" value="https://127.0.0.1:8001/Service"/> 2 Bingding 为适应WCF自寄宿的模式,应采用WSHttpBinding作为绑定模式,并选择Transport安全模式,此模式下支持由服务器SSL证书保证的信息完整性.保密性.服务端身份验证(不支持客户端身份验证,如甲

WCF关于svcutil生成关于绑定出现 元数据包含无法解析的引用的解决方案

元数据包含无法解析的引用. 没有终结点在侦听可以接受消息的 net.tcp://localhost:8000/service.这通常是由于不正确的地址或者 SOAP 操作导致的.如果存在此情况,请参阅 InnerException 以了解详细信息. 如果希望获取更多帮助,请键入"svcutil /?" 一查原来是没配置元数据端点,这是我重新更改后正确的服务端配置文件,可以比对一下: <?xml version="1.0" encoding="utf-