消息中间件NetMQ结合Protobuf简介

概述

  对于稍微熟悉这两个优秀的项目来说,每个内容单独介绍都不为过,本文只是简介并探讨如何将两部分内容合并起来,使其在某些场景下更适合、更高效。

  NetMQ:ZeroMQ的.Net版本,ZeroMQ简单来说就是局域网内的消息中间件(与MSMQ类似),包括了进程间通讯、点对点通讯、订阅模式通讯等等,底层用更“完美”的Socket实现,ZeroMQ实现了多语言、跨平台、高效率等诸多优势。详细介绍请参考ZeroMQ和NetMQ官方文档:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns,http://netmq.readthedocs.org/en/latest/introduction/

  Protocol Buffer:源自与Google内部的开源项目,作为高效的RPC消息协议,相比较Json、XML协议的消息格式,Protobuf在序列化以及数据大小上都具有十分明显的优势,跨平台,协议可读性也接近于Json等等。这里也推荐一篇文章:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

定义Protobuf协议

  Protocol Buffer(简称Protobuf)是以.proto的脚本形式实现的通用语义形式,类似于Json格式:

message  WeatherMessage
{
    enum CommandType
    {
        Debug=0;
        Weather=1;
        Other=2;
    }

    required CommandType Command=1 [default=Weather];
    optional string Content=2;

    message Loaction
    {
        required int32 East=1;
        required int32 North=2;
    }

    repeated Loaction UserLocation=3;
}

  这里的Message、required(必选属性)、optional(可有可无属性)、repeated(内部嵌套的类型属性)等都是proto的关键字,具体意义以及为关键字的功能大家可以查看官方文档,这里只介绍如何应用,或者Stephen Liu的文章也不错。

  当然,光定义脚本是不能实现应用的,还需要根据特定的编码语言进行描述,这里利用Protobuf-Net来实现.Net平台的协议实现。

  首先,下载软件包:https://code.google.com/p/protobuf-net/(肯能需要FQ)

  然后,解压并将刚才的.proto文件复制到文件夹ProtoGen下。

  最后,启动CMD并cd到ProtoGen文件夹目录下,运行命令:

  protogen -i: PBWeatherMessage.proto -0: PBWeatherMessage.cs -ns:ProtobufNameSpace

(-i指定了输入,-o指定了输出,-ns指定了生成代码的namespace)

  如果,正确的话(当然了,我给出的脚本是不会错的),就会生成一个PBWeatherMessage.cs文件,这样的话就可以将.cs文件加入到项目中当做一个纯粹的类来使用了。

代码中使用,就是类似于二进制序列化一样,只是这回序列化的是Protobuf专用的序列化方式而已。

  序列化:

                        #region Protobuf
                        var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage()
                        {
                            Command = PBProtocol.WeatherMessage.CommandType.Weather,
                            Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity),
                        };

                        using (var sm = new MemoryStream())
                        {
                            ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);
                            publisher.Send(sm.ToArray());
                        }
                        #endregion

  反序列化:

                      var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();
                      var receivedBytes = subscriber.Receive();
                      using (var sm = new MemoryStream(receivedBytes))
                      {
                          weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);
                      }

  这里就简单介绍完了protobuf协议的使用,下面介绍一下NetMQ+Protobuf的使用。

NetMQ+Protobuf

  接下来我们来改造下NetMQ Sample中的Publisher-Subscriber模式:

  首先下载从GitHub上下载NetMQ Sample: https://github.com/zeromq/netmq

或者下载我的示例代码,其中包含了一个No Protobuf的工程,这个是直接摘自原作者的示例代码。

  服务端Publisher:

            using (var context = NetMQContext.Create())// NetMQ全局维护的Content上下文,建议只有一个并且使用完毕后及时回收。
            using (var publisher = context.CreatePublisherSocket())// 从Content上下文中创建CreatePublisherSocket,这里如果用其他四种模式之一需要Create其他类型。
            {
                publisher.Bind("tcp://127.0.0.1:5556");// Bind到指定的IP及端口。

                var rng = new Random();

                while (!stopRequested)
                {
                    int zipcode =  rng.Next(0, 99999);// 这里模拟一个随机命令编号(如果非10001,客户端直接丢弃此Publisher发布的消息,实现消息过滤)
                    int temperature = rng.Next(-80, 135);
                    int relhumidity = rng.Next(0, 90);

                    publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));// 直接Send,干净整洁。
                }
            }

  客户端Subscriber:

 using (var context = NetMQContext.Create())// 创建全局NetMQ句柄,建议唯一,使用完毕及时回收。
            using (var subscriber = context.CreateSubscriberSocket())// 创建Publisher-Subscriber模式的客户端监听。
            {
                subscriber.Connect("tcp://127.0.0.1:5556");// 连接到指定Socket
                subscriber.Subscribe(zipToSubscribeTo.ToString(CultureInfo.InvariantCulture));// 这里创建消息内容的过滤,如果不包含“zipToSubscribeTo”值则不接收消息。

                for (int i = 0; i < iterations; i++)
                {
                    string results = subscriber.ReceiveString(); // 如果消息以“zipToSubscribeTo”开头,则会返回整条信息。
                    Console.Write(".");

                    // "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]
                    string[] split = results.Split(new[] { ‘ ‘ }, StringSplitOptions.RemoveEmptyEntries);// 按照固定模式解码。

                    int zip = int.Parse(split[0]);
                    if (zip != zipToSubscribeTo)
                    {
                        throw new Exception(string.Format("Received message for unexpected zipcode: {0} (expected {1})", zip, zipToSubscribeTo));
                    }

                    totalTemp += int.Parse(split[1]);
                    totalHumidity += int.Parse(split[2]);
                }
            }

  这就是四种模式之一的发布者模式,使用起来很方便,但是这仅仅传递的是基于String的字符串,还不是一个可以序列化的对象,下一步我们将把消息字符串用Protobuf进行序列化与反序列化,来优化我们的消息格式。

请参考,我的示例代码中的Publisher Pattern工程:

  服务端Publisher:

                using (var context = NetMQContext.Create())
                using (var publisher = context.CreatePublisherSocket())
                {
                    publisher.Bind("tcp://127.0.0.1:5556");
                    var rng = new Random();
                    while (!stopRequested)
                    {
                        int zipcode = rng.Next(10000,10010); //Relpace: rng.Next(0, 99999);
                        int temperature = rng.Next(-80, 135);
                        int relhumidity = rng.Next(0, 90);

                        #region Protobuf
                        var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage()
                        {
                            Command = PBProtocol.WeatherMessage.CommandType.Weather,
                            Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity),
                        };

                        using (var sm = new MemoryStream())
                        {
                            ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);
                            publisher.Send(sm.ToArray());
                        }
                        #endregion

                        // publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));

                        WriteLine(string.Format("Publisher send message: {0} {1} {2}", zipcode, temperature, relhumidity));
                        System.Threading.Thread.Sleep(100);
                    }
                }

  客户端Subscriber:

 using (var context = NetMQContext.Create())
              using (var subscriber = context.CreateSubscriberSocket())
              {
                  subscriber.Connect("tcp://127.0.0.1:5556");
                  subscriber.SubscribeToAnyTopic(); // No Command Filter, warn if not set thie method SubscribeToAnyTopic, it will receive nothing.

                  while (true)
                  {
                      if (curIndex > iterations) break;

                      var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();
                      var receivedBytes = subscriber.Receive();
                      using (var sm = new MemoryStream(receivedBytes))
                      {
                          weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);
                      }

                      // "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]
                      string[] split = weatherMsg.Content.Split(new[] { ‘ ‘ }, StringSplitOptions.RemoveEmptyEntries);
                      int cmdId = int.Parse(split[0]);
                      if (weatherMsg.Command == PBProtocol.WeatherMessage.CommandType.Weather)
                      {
                          if (cmdId == zipToSubscribeTo)
                          {
                              curIndex++;
                              WriteLine(string.Format("Subscriber receive message: {0}", weatherMsg.Content));
                              totalTemp += int.Parse(split[1]);
                              totalHumidity += int.Parse(split[2]);
                          }
                      }
                  }

  好了,其实单独来看,这两部分内容并为涉及的很深入,只是作为一个技术实践、技术储备,希望其中有问题或者有更好的应用场景,还请各位留言,不胜感谢!

  我的示例代码下载

冷静下来

这里补充一些不足:

  1. NetMQ中的过滤:默认NetMQ支持过滤,可是当我们摒弃String类型传递而转向Protobuf格式的时候NetMQ通道是无法解析其内容的,所以我们需要先解析内容,然后手写一些过滤代码,放弃了原生的支持。subscriber.SubscribeToAnyTopic()监听所有非过滤模式。
  2. NetMQ消息持久化:基于ZMQ的NetMQ设计理念中均不支持数据持久化(相比MSMQ而言,NetMQ不能接收当客户端不在线情况下的消息),所以如果需要持久化还需要做其他工作或者转战其他MQ家族。

引用

ZMQ:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns

NetMQ:http://netmq.readthedocs.org/en/latest/introduction/

Protocol Buffer:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

Stephen Liu:http://www.cnblogs.com/stephen-liu74/archive/2013/01/02/2841485.html

Protobuf-Net:https://code.google.com/p/protobuf-net/

时间: 2024-08-05 05:59:35

消息中间件NetMQ结合Protobuf简介的相关文章

Protobuf(一)——Protobuf简介

Protobuf简介 ? 什么是 Google Protocol Buffer? 假如您在网上搜索,应该会得到类似这样的文字介绍: ? Google Protocol Buffer( 简称 Protobuf) 是 Google 公司内部的混合语言数据标准,目前已经正在使用的有超过 48,162 种报文格式定义和超过 12,183 个 .proto 文件.他们用于 RPC 系统和持续数据存储系统. ? Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,

Protobuf 简介及简单应用

Protobuf 是 protocol buffers 的缩写. 根据官网的说法, protocol buffers 与平台无关, 与语言无关, 实现数据序列化的一种手段. 正如名字一样, protobuf 可以将数据按照规定的协议(protocol)序列化为二进制的数据(buffers). 序列化的数据基本上可以保证类型安全, 并且可以压缩大小. 这篇文章将简单说说关于 protobuf 的优点和问题, 如果有使用的需要可以作为参考 安装和使用 Protobuf 是在 github 上开源的项

protobuf简介和使用

1.Protocol Buffers简介 Protocol Buffers (ProtocolBuffer/ protobuf )是Google公司开发的一种数据描述语言,类似于XML能够将结构化数据序列化,可用于数据存储.通信协议等方面.现阶段支持C++.JAVA.Python等三种编程语言. 2.protobuf相比Xml的优点 •更简单 •数据描述文件只需原来的1/10至1/3 •解析速度是原来的20倍至100倍 •减少了二义性 •生成了更容易在编程中使用的数据访问类 3.安装 yum -

protobuf简介

#1,简介 把某种数据结构的信息,以某种格式保存起来: 主要用于数据存储,传输协议格式. #2,优点 性能好 反观XML的缺点:解析的开销惊人,不适用于事件性能敏感的场合:为了有较好的可读性,引入一些冗余信息,空间开销也不太好. 代码生成机制 支持"向后兼容"和"向前兼容" 支持多种编程语言 #3,缺点 应用不够广 二进制格式导致可读性差 缺乏自描述

Google 的开源技术protobuf 简介与例子[转]

来源:http://blog.csdn.net/caisini_vc/article/details/5599468 今天来介绍一下"Protocol Buffers "(以下简称protobuf)这个玩意儿.本来俺在构思"生产者/消费者模式 "系列的下一个帖子:关于生产者和消费者之间的数据传输格式.由于里面扯到了protobuf,想想干脆单独开一个帖子算了. ★protobuf是啥玩意儿? 为了照顾从没听说过的同学,照例先来扫盲一把. 首先,protobuf是一个

Google 的开源技术protobuf 简介与例子

来介绍一下“Protocol Buffers ”(以下简称protobuf)这个玩意儿.本来俺在构思“生产者/消费者模式 ”系列的下一个帖子:关于生产者和消费者之间的数据传输格式.由于里面扯到了protobuf,想想干脆单独开一个帖子算了. ★protobuf是啥玩意儿? 为了照顾从没听说过的同学,照例先来扫盲一把. 首先,protobuf是一个开源 项 目(官方站点在“这里 ”),而且是后台很硬的开源项目.网上现有的大部分(至少80%)开源项目,要么是某人单干.要么是几个闲杂人等合伙搞.而pr

消息中间件(消息队列MQ)简介

一.为什么要使用MQ 1. 异步:快速返回 2. 解耦:解除依赖 3. 削峰填谷 二.MQ的缺点 1. 系统可用性降低,因为MQ可能会挂 2. 系统复杂性提高,要考虑消息重复.丢失.顺序等问题 3. 数据一致性问题,生产者并不知道消费者是否真正消费了 三.怎么保证MQ消息不丢失 1. 生产者丢失数据,confirm机制 2. MQ丢失数据,持久化到磁盘 3. 消费者丢失数据,确认机制 四.怎么保证MQ高可用性 1. 单机模式 2. 普通集群模式,无法做到真正的高可用 3. 镜像集群模式,高可用但

序列化之protobuf与avro对比(Java)

最近在做socket通信中用到了关于序列化工具选型的问题,在调研过程中开始趋向于用protobuf,可以省去了编解码的过程.能够实现快速开发,且只需要维护一份协议文件即可. 但是调研过程中发现了protobuf的一些弊端,比如需要生成相应的文件类,和业务绑定太紧密,所以在看了AVRO之后发现它完美解决了这个问题. 下面记录下对这两种序列化工具的入门与测评. 一.protobuf基本操作 protobuf简介: Protocol Buffers (a.k.a., protobuf) are Goo

Google 开源技术protobuf

http://blog.csdn.net/hguisu/article/details/20721109#0-tsina-1-1601-397232819ff9a47a7b7e80a40613cfe1 1.  Protobuf简介 protobuf是google提供的一个开源序列化框架,类似于XML,JSON这样的数据表示语言,其最大的特点是基于二进制,因此比传统的XML 表示高效短小得多.虽然是二进制数据格式,但并没有因此变得复杂,开发人员通过按照一定的语法定义结构化的消息格式,然后送给命令行