Publisher/Subscriber 订阅-发布模式

Publisher/Subscriber 订阅-发布模式

本博后续将陆续整理这些年做的一些预研demo,及一些前沿技术的研究,与大家共研技术,共同进步。

关于发布订阅有很多种实现方式,下面主要介绍WCF中的发布订阅,主要参考书籍《Programming WCF Services》,闲话不多说进入正题。使用传统的双工回调(例子 http://www.cnblogs.com/artech/archive/2007/03/02/661969.html)实现发布订阅模式存在许多缺陷,主要问题是,它会引入发布者和订阅者之间的高度耦合。订阅者必须先知道发布者在哪里,然后才能订阅它们,任何订阅者不知道的服务都无法通知事件的订阅者,部署好的应用程序中添加新的订阅者(或者移除已经存在的订阅者)是十分困难的事情。大致相同的是发布者也只能给它知道的订阅者发送通知消息,同时发布者还需要管理订阅者列表,这些与业务服务无关,这些逻辑增加了发布者的复杂度,另外在安全方面也存在订阅者与发布者也存在耦合,而且在发布者进程宕机时,所有订阅都会丢失。

要解决上面提及的问题最常见的解决方案就是发布-订阅模式(Publish-Subscribe 【OBSERVER】),如图D-1所示。

这里将订阅者区分为临时订阅者与持久订阅者,持久订阅者可以保存到磁盘上,当事件触发时可以通知订阅者,也可以很方便的通过传递回调使用回调机制,对于持久订阅者,需要记录订阅者地址,当触发事件时,发布服务将会调用持久订阅者地址 ,然后传递事件,因持久订阅者保存了订阅者地址至数据库或磁盘,因此当发布服务宕机时提高了管理性。

以上主要介绍理论,下面进入实践阶段,首先下载ServiceModelEx(Programming WCF Services 里面书籍作者提供的简化WCF编程的动态库),  https://github.com/CaseyBurns/ServiceModelEx,我们暂时不需要服务总线所以我们引入ServiceModelEx (.NET 4.0 no service bus) ,建好测试服务端(这里为了方便测试使用GUI 应用程序作为宿主),客户端。

管理临时订阅

例子D-1使用ServiceModelEx 提供的ISubscriptionService接口管理临时订阅者

   [ServiceContract]
   public interface ISubscriptionService
   {
      [OperationContract]
      void Subscribe(string eventOperation);

      [OperationContract]
      void Unsubscribe(string eventOperation);
   }

作为通用接口它不关心回调契约,然后添加临时订阅者契约继承通用接口,并设置回调契约

    [ServiceContract(CallbackContract = typeof(IMyEvents))]
    public interface IMySubscriptionService : ISubscriptionService
    {
    }

回调契约

    [ServiceContract]
    public interface IMyEvents
    {
        [OperationContract(IsOneWay = true)]
        void OnEvent1();
        [OperationContract(IsOneWay = true)]
        void OnEvent2(int number);
        [OperationContract(IsOneWay = true)]
        void OnEvent3(int number, string text);
    }

实现临时订阅服务.

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
    public class MySubscriptionService : SubscriptionManager<IMyEvents>, IMySubscriptionService,IPersistentSubscriptionService
    {

    }

这里有几点需要注意:服务类型必须是会话服务(InstanceContextMode = InstanceContextMode.PerCall),会话服务才能够使用回调,另外ServiceModelEx 中的类 SubscriptionManager<T> 已经实现了通用接口所定义的添加订阅者与取消订阅接口,所以这里不需要我们再写任何代码。IPersistentSubscriptionService 作为持久订阅者接口,SubscriptionManager<T> 也实现了该接口,接下来会讲到。

配置文件配置发布订阅者服务

  <system.serviceModel>
    <serviceHostingEnvironment multipleSiteBindingsEnabled="true" />
    <bindings>
      <netTcpBinding>
        <binding name="NetTcpBinding_IService1" receiveTimeout="00:25:00"
          maxBufferSize="2147483647" maxReceivedMessageSize="2147483647" transactionFlow="true">
          <reliableSession inactivityTimeout="00:25:00" enabled="true" />
          <security mode="None" />
        </binding>
      </netTcpBinding>
    </bindings>
    <services>
      <service behaviorConfiguration="MyBehavior" name="Service.sub.MySubscriptionService">
        <host>
          <baseAddresses>
            <add baseAddress="net.tcp://localhost:8022/"/>
            <add baseAddress="http://localhost:8023/"/>
          </baseAddresses>
        </host>
        <endpoint name="Sub" address="Sub" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1"
          contract="Service.sub.IMySubscriptionService" />
        <endpoint name="PersistentSub" address="PersistentSub" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1"
            contract="ServiceModelEx.IPersistentSubscriptionService" />
      </service>
      <service behaviorConfiguration="MyBehavior" name="Service.pub.MyPublishService">
        <host>
          <baseAddresses>
            <add baseAddress="net.tcp://localhost:8022/MyPub/"/>
            <add baseAddress="http://localhost:8023/MyPub/"/>
          </baseAddresses>
        </host>
        <endpoint name="PubMyEvents" address="PubMyEvents" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1"
          contract="Service.sub.IMyEvents" />
      </service>
    </services>
    <behaviors>
      <serviceBehaviors>
        <behavior name="MyBehavior">
          <serviceMetadata httpGetEnabled="true"/>
          <serviceThrottling maxConcurrentCalls="1000" maxConcurrentSessions="10000" />
          <serviceDebug includeExceptionDetailInFaults="true" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
  </system.serviceModel>

其中Service.pub.MyPublishService 服务为发布者服务配置 接下来会讲到。

这样临时订阅者就实现了,接下来看持久订阅者.持久订阅者的通用接口使用ServiceModelEx中定义的IPersistentSubscriptionService

   [ServiceContract]
   public interface IPersistentSubscriptionService
   {
      [OperationContract(Name = "SubscribePersistent")]
      [TransactionFlow(TransactionFlowOption.Allowed)]
      void Subscribe(string address,string eventsContract,string eventOperation);

      [OperationContract(Name = "UnSubscribePersistent")]
      [TransactionFlow(TransactionFlowOption.Allowed)]
      void Unsubscribe(string address,string eventsContract,string eventOperation);

      [OperationContract]
      [TransactionFlow(TransactionFlowOption.Allowed)]
      PersistentSubscription[] GetAllSubscribers();

      [OperationContract]
      [TransactionFlow(TransactionFlowOption.Allowed)]
      PersistentSubscription[] GetSubscribersToContract(string eventsContract);

      [OperationContract]
      [TransactionFlow(TransactionFlowOption.Allowed)]
      string[] GetSubscribersToContractEventType(string eventsContract,string eventOperation);

      [OperationContract]
      [TransactionFlow(TransactionFlowOption.Allowed)]
      PersistentSubscription[] GetAllSubscribersFromAddress(string address);
   }

这里我添加了对[OperationContract(Name = "SubscribePersistent")] 将添加订阅方法进行重命名,以区别临时订阅接口的Subscribe方法.持久订阅不需要回调函数,接下来实现持久订阅同样简单,上面已经贴过代码,ServiceModelEx中的SubscriptionManager<T>同样已经实现了IPersistentSubscriptionService接口,这样临时订阅与持久订阅完成,接下来看发布服务。

发布服务应该支持与订阅服务一样的事件契约,这是订阅服务与发布服务唯一的连接点,使用IMyEvents 作为例子,另外ServiceModelEx提供了用于简化发布服务的帮助类PublishService<T>

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
    public class MyPublishService : PublishService<IMyEvents>, IMyEvents
    {
        public void OnEvent1()
        {
            FireEvent();
        }

        public void OnEvent2(int number)
        {
            FireEvent(number);
        }

        public void OnEvent3(int number, string text)
        {
            FireEvent(number, text);
        }
    }

其中FireEvent()被用作激发所有订阅者的事件,无论是临时还是持久订阅者,帮助类PublishService<T>已经做了实现,接下来配置发布服务

      <service behaviorConfiguration="MyBehavior" name="Service.pub.MyPublishService">
        <host>
          <baseAddresses>
            <add baseAddress="net.tcp://localhost:8022/MyPub/"/>
            <add baseAddress="http://localhost:8023/MyPub/"/>
          </baseAddresses>
        </host>
        <endpoint name="PubMyEvents" address="PubMyEvents" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1"
          contract="Service.sub.IMyEvents" />
      </service>

这样发布服务完成,使用Gui应用程序作为宿主,可以使用ServiceModelEx 中ServiceHost<T> 作为发布的帮助类 。

    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }
        ServiceHost<MyPublishService> hostPub = new ServiceHost<MyPublishService>();
        ServiceHost<MySubscriptionService> host = new ServiceHost<MySubscriptionService>();
        private void Form1_Load(object sender, EventArgs e)
        {
            try
            {
                host.EnableMetadataExchange();
                host.Open();

                hostPub.EnableMetadataExchange();
                hostPub.Open();
            }
            catch (Exception ex)
            {
                throw;
            }

        }

        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            try
            {
                host.Close();
            }
            catch (Exception)
            {
                try
                {
                    host.Abort();
                }
                catch (Exception)
                {

                }
            }
            try
            {
                hostPub.Close();
            }
            catch (Exception)
            {
                try
                {
                    hostPub.Abort();
                }
                catch (Exception)
                {

                }
            }
        }
    }

其中 host.EnableMetadataExchange(); 能够帮助发布元数据,不需要再到配置中进行配置,服务配置好后接下来看客户端使用,

客户端可以直接添加服务引用生成服务代理,但是一般本人喜欢使用SvcUtil工具生成代理,或者干脆直接使用通道进行服务调用,后者更为我所喜爱,因为这样代码阅读行更强,更简练。例子中偷了下懒,直接添加服务引用,然后用通道调用服务,这样剩了点复制配置或者接口的功夫,所以看到例子不要感到奇怪,全因懒造成的,废话不多说,接下来看临时订阅客户端调用

        DuplexChannelFactory<IMySubscriptionService, IMySubscriptionServiceCallback> channelFactory = null;
        IMySubscriptionService proxy = null;
        private void btnSub_Click(object sender, EventArgs e)
        {
            MyEventsCallback callBack = new MyEventsCallback();
            callBack.OnResultEvent += CallBack_OnResultEvent;
            InstanceContext<IMySubscriptionServiceCallback> instanceContext = new InstanceContext<IMySubscriptionServiceCallback>(callBack);
            channelFactory = new DuplexChannelFactory<IMySubscriptionService, IMySubscriptionServiceCallback>(instanceContext, "Sub");
            proxy = channelFactory.CreateChannel();
            proxy.Subscribe(null);
        }

这里使用ServiceModelEx 中提供的DuplexChannelFactory<T,C>  类型安全的双向通道类创建代理,MyEventsCallback 实现回调接口,具体实现如下:

    internal class MyEventsCallback : IMySubscriptionServiceCallback
    {
        SynchronizationContext sc = SynchronizationContext.Current;
        public event EventHandler<EventsCallbackArgs> OnResultEvent;
        public void OnEvent1()
        {
            sc.Post(result =>
            {
                EventsCallbackArgs e = new EventsCallbackArgs() { Msg = string.Concat("OnEvent1", System.Environment.NewLine) };
                e.Raise(this, ref OnResultEvent);
            }, null);
        }

        public void OnEvent2(int number)
        {
            sc.Post(result =>
            {
                EventsCallbackArgs e = new EventsCallbackArgs() { Msg = string.Concat("OnEvent2:", number, System.Environment.NewLine) };
                e.Raise(this, ref OnResultEvent);
            }, null);
        }

        public void OnEvent3(int number, string text)
        {
            sc.Post(result =>
            {
                EventsCallbackArgs e = new EventsCallbackArgs() { Msg = string.Concat("OnEvent3:", number, "text:", text + System.Environment.NewLine) };
                e.Raise(this, ref OnResultEvent);
            }, null);
        }
    }

    public static class EventArgExtensions
    {
        public static void Raise<TEventArgs>(this TEventArgs e, Object sender, ref EventHandler<TEventArgs> eventDelegate) where TEventArgs : EventArgs
        {
            EventHandler<TEventArgs> temp = Interlocked.CompareExchange(ref eventDelegate, null, null);
            if (temp != null) temp(sender, e);
        }
    }

SynchronizationContext 上下文提供post方法调用gui线程更新ui,e.Raise 使用扩展以线程安全方式调用事件,客户端调用订阅者就完成了,别忘了关闭代理,接下来看客户端调用发布者

客户端调用发布服务:

  public partial class PubMessageForm : Form
    {
        public PubMessageForm()
        {
            InitializeComponent();
        }
        ChannelFactory<IMyEvents> channelFactory = null;
        IMyEvents proxy = null;
        private void btnStartPub_Click(object sender, EventArgs e)
        {
            channelFactory = new ChannelFactory<IMyEvents>("PubMyEvents");
            proxy = channelFactory.CreateChannel();
        }

        private void PubMessageForm_FormClosed(object sender, FormClosedEventArgs e)
        {
            try
            {
                using (proxy as IDisposable)
                {

                }
                channelFactory.Close();
            }
            catch
            {
                channelFactory.Abort();
            }
        }

        private void btnPub_Click(object sender, EventArgs e)
        {
            proxy.OnEvent1();
        }

        private void btnPub2_Click(object sender, EventArgs e)
        {
            proxy.OnEvent2(2);
        }

        private void btnPub3_Click(object sender, EventArgs e)
        {
            proxy.OnEvent3(3, txtPubMessage.Text);
        }

        private void PubMessageForm_Load(object sender, EventArgs e)
        {

        }
    }

使用ChannelFactory<T> 通道调用发布服务

这样WCF发布订阅服务就完成了,另外如果发布服务或订阅服务不需要同步绑定,可以考虑使用msmq ,这样发布-订阅模式兼具松耦合和无连接系统的优势。

需要注意的是队列化发布-订阅服务不支持临时订阅,需要使用持久订阅,具体实现在此不多讲,另外还可以结合服务发现实现另外一种模式的发布订阅模式,具体可以参考书籍《Programming WCF Services》。

Demo下载 http://files.cnblogs.com/files/skystar/Demo.7z

书中自有黄金屋,书中自有颜如玉

时间: 2024-08-05 13:02:44

Publisher/Subscriber 订阅-发布模式的相关文章

JS实现观察者模式(订阅/发布模式)

实现 /*  * js 观察者模式 又称 订阅/发布模式  * 通过创建"可观察"对象,当发生一个感兴趣的事件时可将该事件通告给  * 所有观察者,从而形成松耦合 */ // 通用的发布者 EventPublisher = Base.extend({ publish: function(data, type) { EventPublisher.publish(data, type); } }, { subscribers : {         any : []    // 事件类型:

Spring基于事件驱动模型的订阅发布模式代码实例详解

代码下载地址:http://www.zuidaima.com/share/1791499571923968.htm 原文:Spring基于事件驱动模型的订阅发布模式代码实例详解 事件驱动模型简介 事件驱动模型也就是我们常说的观察者,或者发布-订阅模型:理解它的几个关键点: 首先是一种对象间的一对多的关系:最简单的如交通信号灯,信号灯是目标(一方),行人注视着信号灯(多方): 当目标发送改变(发布),观察者(订阅者)就可以接收到改变: 观察者如何处理(如行人如何走,是快走/慢走/不走,目标不会管的

AngularJS的简单订阅发布模式例子

控制器之间的交互方式广播 broadcast, 发射 emit 事件 类似于 js中的事件 , 可以自己定义事件 向上传递直到 document 在AngularJs中 向上传递直到 rootScope 观察者模式, 订阅发布模式 类似于js中的事件机制 订阅者.on('xx发布博客', function([内容]){ 通知我, 接收到博客的[内容] }) 发布者.emit('xxx发布博客', {内容}) 优点: 业务和实际触发者分离, 代码维护性相对好 缺点: 代码复杂性更高 Angular

订阅发布模式

场景概述: 有时需要将多个应用程序集成到一个框架中,这些应用程序常见的基础通信方式包含总线模式.代理模式. 或者点对点模式.一些应用程序发送多种类型的消息,其他应用程序可能更关注这些消息类型的组合. 例如,在一个金融系统存在多个应用程序管理同一客户信息的情况,存在一个客户关系管理程序(CRM)掌握客户信息. 一种典型的情况:客户信息存在于其他系统中,且这些系统执行各自客户信息管理函数来处理客户信息. 当某个面向客户的应用程序生成更新客户信息的消息,例如客户地址的修改时,CRM和其他管理客户信息的

RabbitMQ下的生产消费者模式与订阅发布模式

??所谓模式,就是在某种场景下,一类问题及其解决方案的总结归纳.生产消费者模式与订阅发布模式是使用消息中间件时常用的两种模式,用于功能解耦和分布式系统间的消息通信,以下面两种场景为例: 数据接入 ??假设有一个用户行为采集系统,负责从App端采集用户点击行为数据.通常会将数据上报和数据处理分离开,即App端通过REST API上报数据,后端拿到数据后放入队列中就立刻返回,而数据处理则另外使用Worker从队列中取出数据来做,如下图所示. ??这样做的好处有:第一,功能分离,上报的API接口不关心

Node中EventEmitter以及如何实现JavaScript中的订阅/发布模式

1.EventEmitter Node中很多模块都能够使用EventEmitter,有了EventEmitter才能方便的进行事件的监听.下面看一下Node.js中的EventEmitter如何使用. (1)基本使用 EventEmitter是对事件触发和事件监听功能的封装,在node.js中的event模块中,event模块只有一个对象就是EventEmitter,下面是一个最基本的使用方法: var EventEmitter = require('events').EventEmitter;

4 交换机-fanout(订阅发布模式)

目录 订阅发布模式 1.交换器(Exchange) 1.1.创建交换器 1.2 .推送消息到交换器 2.临时队列 3.绑定(bingdings) 5.代码例子 5.1.生产者代码示例 5.2.消费者代码示例 订阅发布模式 1.交换器(Exchange) 在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者.本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅) RabbitMQ的消息发送模型核心思想是生产者不直

Java里观察者模式(订阅发布模式)

创建主题(Subject)接口 创建订阅者(Observer)接口 实现主题 实现观察者 测试 总结 在公司开发项目,如果碰到一些在特定条件下触发某些逻辑操作的功能的实现基本上都是用的定时器 比如用户注册完后,发送邮件,为了防止邮件发送失败或者发送邮件比较耗时,一般也都是通过定时器去扫库里注册没有发邮件的用户数据 再比如一个订单,在改变状态后,要归档,这也是通过定时器来实现的,扫描订单的数据,通过判断状态来做相对应的处理 但这样处理的话,定时器就会越来越多,总觉得不太好 然后,从一些资讯网站上的

【并发】9、借助redis 实现生产消费,消息订阅发布模式队列

这个就是一个消息可以被多次消费的范例了 其实这个实现的方式可以参考我之前的设计模式,观察者模式 https://www.cnblogs.com/cutter-point/p/5249780.html 不过有一点需要注意一下啊,这个消息发布的时候,好像是不支持字节数据的,里面好像会对字节进行转换,这样的结果就是导致我最后无法吧相应的字节转换成我之前序列化的对象 不知道是不是ObjectInputStream和ObjectOutputStream实现不是很好的原因,还是什么,反正反序列化的时候,有些