WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(二)实现IRequestChannel(2016-03-15 12:35)

从今天开始,一步步介绍我是如何实现自定义的ZeroMQ绑定和protocolBuffer消息编码的。

本系列的想法主要来源于蒋金楠Artech的系列博客-WCF后续之旅,和WCF开发团队成员carlosfigueira的WCF Extension系列博客

首先要明确的是,通信和编码的实现是分离的。WCF也是这样设计的。自定义Binding包含BindingElementCollection集合,这个集合可以添加多个ElementBinding。但是有两个ElementBinding必不可少,TransportBindingElement、MessageEncodingBindingElement。而ZeroMQ只是提供了通信技术,不涉及具体的编码,它的传输数据是指定长度的字符串,至于字符串以用何种方式编码,ZeroMQ是不知道的。因此使得ZeroMQ作为WCF的自定义Binding中的TransportBindingElement的实现成为可能。我们先不管编码,先考虑传输。

WCF的通道栈在上述两个系列博客中已经有详细的介绍,我就不想重复了,直接说说我是如何将WCF和ZeroMQ衔接的。

实现IRequestChannel

新建类型ZMQRequestChannel,继承于ChannelBase,ChannelBase是WCF提供的通道基类。

首先要创建ZeroMQ的socket,类型是REQ。

 socket = this.zmqContext.CreateSocket(SocketType.REQ);
 socket.Connect(zmqAddress);

实现IRequestChannel的方法

public Message Request(Message message, TimeSpan timeout)

Request负责发送请求,阻塞当前线程,直到接收到服务器的回复。发送请求时,将Message转换为byte[],交给ZeroMQ发送

            ArraySegment<byte> requestData = encoder.WriteMessage(message, 1000, bufferManager);

            socket.Send(requestData.Array);

转换的方式是通过encoder对象获得的,encoder就是通过MessageEncodingBindingElement创建的。MessageEncodingBindingElement是WCF信道栈的一部分,但是Message并不是沿着每个信道穿过,MessageEncodingBindingElement就是这样的信道,它只是被TransportBindingElement使用,在TransportChannel需要编码和解码时,TransportBindingElement的encoder会站出来执行自己的职责。

接收服务端的回复时,是这样做的:

            byte[] replyData = bufferManager.TakeBuffer(1000);

            int replaySize = socket.Receive(replyData);

            Message response = encoder.ReadMessage(
                new ArraySegment<byte>(replyData, 0, replaySize), bufferManager);
            bufferManager.ReturnBuffer(replyData);

这里使用了BufferManager提供一个接收缓存,缓存的大小暂时设为1000。使用缓存的目的在于分配时的效率更高。当接收完成时,记得回收。

IRequestChannel还需要实现异步的发送方法

        IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state);
        IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state);
        Message EndRequest(IAsyncResult result);

参考刚才的同步方法,发送分为编码、发送。其中编码在本地完成,不会超时,发送由于依赖网络环境和服务器状态,需要将这部分移到异步过程中。

由于发送异步方法在ReplyChannel中也会使用,因此我创建了ZMQChannelBase基类,派生于ChannelBase,ZMQRequestChannel继承于新的ZMQChannelBase基类。把异步发送方法放在ZMQChannelBase中。

        public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
        {
            return base.BeginSendMessage(message, this.DefaultSendTimeout, callback, state);
        }
        public IAsyncResult BeginSendMessage(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            base.ThrowIfDisposedOrNotOpen();
            ArraySegment<byte> requestData = encoder.WriteMessage(message, 1000, bufferManager);

            return this.BeginWriteData(requestData, timeout, callback, state);
        }
BeginWriteData方法返回一个AsyncResult对象,实现异步接口IAsyncResult
        class SocketSendAsyncResult : AsyncResult
        {
            ZMQChannelBase channel;
            ArraySegment<byte> buffer;

            public SocketSendAsyncResult(ArraySegment<byte> buffer, ZMQChannelBase channel, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.channel = channel;
                this.buffer = buffer;

                this.StartSending();
            }

            void StartSending()
            {
                SocketSendHandle handler = new SocketSendHandle(delegate(ZmqSocket socket, byte[] data)
                {
                    socket.Send(data);
                    serviceHanledDone.Set();
                });

                IAsyncResult sendResult = handler.BeginInvoke(channel.socket, buffer.Array, OnSend, null);

            }

            static void OnSend(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }

                SocketSendAsyncResult thisPtr = (SocketSendAsyncResult)result.AsyncState;
                Exception completionException = null;
                bool shouldComplete = false;

                if (shouldComplete)
                {
                    thisPtr.Complete(false, completionException);
                }
            }

            public static void End(IAsyncResult result)
            {
                AsyncResult.End<SocketSendAsyncResult>(result);
            }
        }

至此,RequestChannel就完成了,相比ReplyChannel,RequestChannel比较简单。下一篇介绍ReplyChannel。

时间: 2024-10-22 09:14:08

WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(二)实现IRequestChannel(2016-03-15 12:35)的相关文章

WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(三)实现ReplyChannel(2016-03-15 12:35)

这是这个系列的第三篇,其他的文章请点击下列目录 WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(一)概要设计 WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(二)实现IRequestChannel WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(三)实现ReplyChannel 相对于RequestChannel,ReplyChannel比较复杂一些. 1 启动zmq的rep结点 首先需要重载OnOpen方法,启动zmq的r

WCF扩展

WCF 可扩展性 WCF 提供了许多扩展点供开发人员自定义运行时行为. WCF 在 Channel Layer 之上还提供了一个高级运行时,主要是针对应用程序开发人员.在 WCF 文档中,它常被称为服务模型层(Service Model Layer).该高级运行时主要由一个称作 Dispatcher(在 ServiceHost 的 Context 中)的组件和一个称作 Proxy(在客户端的 Context 中)的组件组成.       (       图片引自 MSDN Magazine : 

iis 添加wcf扩展

iis 添加wcf扩展,码迷,mamicode.com

解决WCF调用时出现错误:“创建MTOM消息读取器时出错”

如题,查询一个数据集, 存储过程返回如:select * from B 中间层定义  public DataSet GetTable(string 查询条件); 客户端定义  DataSet ds = wcfClient.GetTable("") 以前一直正常着,查询也很快速,这两天不知修改到哪了,所有的查询如果返回记录较大时(100条左右),客户端就会出现服务端返回的异常错误"创建MTOM消息读取器时出错" 客户端 app.config 配置如下 <syst

ZeroMQ——一个轻量级的消息通信组件 C#

ZeroMQ——一个轻量级的消息通信组件 ZeroMQ是一个轻量级的消息通信组件,尽管名字中包含了"MQ",严格上来讲ZeroMQ并不是"消息队列/消息中间件".ZeroMQ是一个传输层API库, 更关注消息的传输.与消息队列相比,ZeroMQ有以下一些特点: 点对点无中间节点 传统的消息队列都需要一个消息服务器来存储转发消息.而ZeroMQ则放弃了这个模式,把侧重点放在了点对点的消息传输上,并且(试图)做到极致.以为消息服务器最终还是转化为服务器对其他节点的点对点

日积(Running)月累(ZSSURE):WCF学习之“通过事件绑定控制WinForm宿主程序主界面控件”

背景: WCF服务需要寄宿到相应的可运行进程中执行,常见的有四种寄宿,分别是控制台程序.WinForm程序.IIS和Windows服务.之前学习老A博客和<WCF全面解析>时最常用到的是控制台寄宿,近期由于项目需求,需要在WinForm程序中调用WCF服务,本博文通过一个简单的实例来演示WCF在WinForm中的寄宿.并着重介绍如何利用事件绑定控制宿主主UI界面控件. 题记: 之前一直坚守在C++阵地,对于新语言.新技术(诸如Python.J2EE.Bigdata.AI)不甚感冒.自以为&qu

WindowsPhone8中LongListSelector的扩展解决其不能绑定SelectdeItem的问题

微软在Wp8中集成了LongListSelector, 但是该控件在ViewModel中不能实现的SelectdeItem双向绑定,因为其不是DependencyProperty没办法只能实现扩展! 1.实现LongListSelector的扩展ExtendedSelector public enum PositionOnAdd { Top, Default, NewItem } public class ExtendedSelector : LongListSelector { public

WCF自寄宿实现Https绑定

一.WCF配置 1 Address 将服务端发布地址和客户端访问地址都配置为https开始的安全地址.参考如下. <add key="SrvUrl" value="https://127.0.0.1:8001/Service"/> 2 Bingding 为适应WCF自寄宿的模式,应采用WSHttpBinding作为绑定模式,并选择Transport安全模式,此模式下支持由服务器SSL证书保证的信息完整性.保密性.服务端身份验证(不支持客户端身份验证,如甲

WCF关于svcutil生成关于绑定出现 元数据包含无法解析的引用的解决方案

元数据包含无法解析的引用. 没有终结点在侦听可以接受消息的 net.tcp://localhost:8000/service.这通常是由于不正确的地址或者 SOAP 操作导致的.如果存在此情况,请参阅 InnerException 以了解详细信息. 如果希望获取更多帮助,请键入"svcutil /?" 一查原来是没配置元数据端点,这是我重新更改后正确的服务端配置文件,可以比对一下: <?xml version="1.0" encoding="utf-