ActiveMQ 复杂类型的发布与订阅

很久没po文章了,但是看到.Net里关于ActiveMQ发送复杂类型的文章确实太少了,所以贴出来和大家分享

发布:

    //消息发布
    public class Publisher
    {
        private IConnection _connection;
        private ISession _session;
        private IMessageProducer _producer;

        /// <summary>
        /// 初始化
        /// </summary>
        /// <param name="brokerUrl">广播地址</param>
        /// <param name="queueDestination">队列目标</param>
        public void Init(string brokerUrl = "tcp://localhost:61616", string queueDestination = "nms.msg.topic")
        {
            try
            {
                IConnectionFactory connectionFactory = new ConnectionFactory(brokerUrl);
                _connection = connectionFactory.CreateConnection();
                _connection.Start();
                _session = _connection.CreateSession();
                IDestination destination = _session.GetTopic(queueDestination);
                _producer = _session.CreateProducer(destination);
            }
            catch (Exception e)
            {
                Log.Error($"activemq初始化异常:{e.InnerException.ToString()}");
            }
        }

        public void Close()
        {
            _session.Close();
            _connection.Close();
        }

        /// <summary>
        /// 发送普通字符串消息
        /// </summary>
        /// <param name="text">字符串</param>
        public void SendText(string text)
        {
            ITextMessage objecto = _producer.CreateTextMessage(text);
            _producer.Send(objecto);
        }

        /// <summary>
        /// 发送对象消息
        /// </summary>
        /// <param name="mapMessages">MapMessage对象</param>
        /// <returns></returns>
        public bool SendObject(List<MapMessage> mapMessages)
        {
            bool result = true;
            if (mapMessages == null || mapMessages.Count < 0) return false;
            foreach (var mapMessage in mapMessages)
            {
                var message = _producer.CreateMapMessage();
                ActiveCommon.SetMapMessage<MapMessage>(message, mapMessage);
                try
                {
                    _producer.Send(message);
                    result = true;
                }
                catch (Exception e)
                {
                    Log.Error($"activemq发送美好异常:{e.InnerException.ToString()}");
                    result = false;
                }
            }
            return result;
        }

        /// <summary>
        /// 获取对象XML结果
        /// </summary>
        /// <param name="m">对象</param>
        /// <returns></returns>
        public string GetXmlStr(object m)
        {
            return _producer.CreateXmlMessage(m).Text;
        }
    }

  

订阅:

    //消息订阅
    class Subsriber
    {
        private IConnection _connection;
        private ISession _session;
        private IMessageConsumer _consumer;

        /// <summary>
        /// 初始化
        /// </summary>
        /// <param name="brokerUrl">广播地址</param>
        /// <param name="queueDestination">队列目标</param>
        public void Init(string brokerUrl = "tcp://localhost:61616", string queueDestination = "nms.msg.topic")
        {
            try
            {
                IConnectionFactory connectionFactory = new ConnectionFactory(brokerUrl);
                _connection = connectionFactory.CreateConnection();
                _connection.Start();
                _session = _connection.CreateSession();
                IDestination destination = _session.GetTopic(queueDestination);
                _consumer = _session.CreateConsumer(destination);
                _consumer.Listener += _consumer_Listener;

            }
            catch (Exception e)
            {
                Log.Error($"activemq初始化异常:{e.InnerException.ToString()}");
            }

        }

        private void _consumer_Listener(IMessage message)
        {
            var model = ActiveCommon.GetMapMessageByIMapMessage((IMapMessage)message);
            Log.Infor($"订阅接收:{_session.CreateXmlMessage(model).Text}");
        }
    }

复杂类型处理:

    public class ActiveCommon
    {
        //设置Message的Body信息
        public static void SetMapMessage<T>(IMapMessage mapMessage, T messages)
        {
            if (mapMessage == null || object.Equals(messages, null))
            {
                return;
            }

            foreach (var propertyInfo in messages.GetType().GetProperties())
            {
                if (propertyInfo.PropertyType.Name == "String")
                    mapMessage.Body.SetString(propertyInfo.Name, Convert.ToString(propertyInfo.GetValue(messages, null)));
                else
                    mapMessage.Body.SetInt(propertyInfo.Name, Convert.ToInt16(propertyInfo.GetValue(messages, null)));
            }
        }

        public static MapMessage GetMapMessageByIMapMessage(IMapMessage mapMessage)
        {
            if (mapMessage == null)
            {
                return null;
            }

            var MapMessage = new MapMessage();
            foreach (var propertyInfo in MapMessage.GetType().GetProperties())
            {
                propertyInfo.SetValue(MapMessage, mapMessage.Body[propertyInfo.Name], null);
            }

            return MapMessage;
        }

        public static T GetMapMessageByIMapMessage<T>(IMapMessage mapMessage, T MapMessage)
        {
            if (mapMessage == null || object.Equals(MapMessage, null))
            {
                return default(T);
            }

            foreach (var propertyInfo in MapMessage.GetType().GetProperties())
            {
                propertyInfo.SetValue(MapMessage, mapMessage.Body[propertyInfo.Name.ToUpper()], null);
            }

            return MapMessage;
        }
    }

重点是跨站点和跨服务器传输的时候,需要通过Message的Body去设置传输参数

时间: 2024-11-05 12:28:18

ActiveMQ 复杂类型的发布与订阅的相关文章

发布与订阅SQLServer

SQLServer 中发布与订阅 在对数据库做迁移的时候,会有很多方法,用存储过程,job,也可以用开源工具lettle,那么今天这些天变接触到了一种新的方法,就是SqlServer中自带的发布与订阅. 首先说明一下数据复制的流程.如下图A是(192.168.210.170)上的数据库,B是(172.23.100.109)上的数据库.把B当作数据源,然后A从B上获取数据. 发布前准备:首先两个服务器之间要能相互通讯,也就是能ping命令能通.   平时我们链接数据库的时候,经常都是用的ip登陆,

文成小盆友python-num12 Redis发布与订阅补充,python操作rabbitMQ

本篇主要内容: redis发布与订阅补充 python操作rabbitMQ 一,redis 发布与订阅补充 如下一个简单的监控模型,通过这个模式所有的收听者都能收听到一份数据. 用代码来实现一个redis的订阅者何消费者. 定义一个类: import redis class Redis_helper(): def __init__(self): self.__conn = redis.Redis(host='192.168.11.87') #创建一个连接 def pub(self, mes, c

SqlServer2005 数据库发布、订阅配置图文详解

一:准备条件 <1>软件准备条件 机器A端:SqlServer2005 Management Studio + WinServer 2003 Enterprise (作为发布服务器) 机器B端:Sqlserver2005 Management Studio Express + WinXP(作为订阅服务器) (可以用别的,不过订阅服务器版本不得高于发布服务器版本) <2>数据库复制准备条件 1. 所有被同步的数据表尽量要用主键,如果没有主键也没有关系,SqlServer会提示为表自动

【转】SQL Server 2008 数据库同步的两种方式 (发布、订阅)

上篇中说了通过SQL JOB的方式对数据库的同步,这一节作为上一节的延续介绍通过发布订阅的方式实现数据库之间的同步操作.发布订阅份为两个步骤:1.发布.2.订阅.首先在数据源数据库服务器上对需要同步的数据进行发布,然后在目标数据库服务器上对上述发布进行订阅.发布可以发布一张表的部分数据,也可以对整张表进行发布.下面分别介绍发布.订阅的过程. 一.发布.发布需要用实际的服务器名称,不能使用服务器的IP地址进行.能发布的信息包括[表].[存储过程].[用户函数]如果使用IP会有错误,如下图: 具体发

Python redis 发布和订阅

发布和订阅 类似于RSS发布者:服务器订阅者:Dashboad和数据处理看下面代码:类文件名:monitor.py: #!/usr/bin/python # -*- coding: utf-8 -*- __author__ = 'gaogd' import redis class RedisHelper:     def __init__(self):         self.__conn = redis.Redis(host='192.168.10.12', port=6379, passw

SQL SERVER发布与订阅

一.配置分发 1.配置分发服务器,注:配置发布与订阅,连接SQLSERVER必须用服务器名登录 2.配置分发 3.选择分发服务器 4.选择快照文件夹 5.设置此文件夹的读写权限为everyone 6.选择分发数据库路径 7.配置分发 8.配置分发 9.配置分发完成 二.新建发布 1.新建发布 2.选择发布数据库 3.选择发布类型 4.选择发布对象 5.指定何时运行快照代理 6.代理安全性 7.创建发布 8.填写发布名称 9.新建发布完成 三.新建订阅 1. 新建订阅 2. 查找sqlserver

Part1.2 、RabbitMQ -- Publish/Subscribe 【发布和订阅】

python 目录 (一).交换 (Exchanges) -- 1.1 武sir 经典 Exchanges 案例展示. (二).临时队列( Temporary queues ) (三).绑定(Bindings) (四).汇总(Putting it all together) python系列之 RabbitMQ -- Publish/Subscribe [发布和订阅] >>前面的部分我们创建了一个工作队列(work queue). 设想是每个任务都能分发到一个 worker[queue],这一

Sql2008 r2 使用ftp 发布和订阅方式同步数据

Sql2008 r2使用发布和订阅方式同步数据 由于很多图片 本篇没有图片 详情可以进入下载页  http://download.csdn.net/download/yefighter/7603741 1:发布服务器:发布方 sql2008 r2 iis7.5 windows server 2008 请登入服务器进行操作 不要用sqlserver远程连接 必须开启sqlserver agent服务以及开机自动启动 右键属性 打开sqlserver 点击新建本地发布 第一次发布的时候 会提示创建发

小贝_redis高级应用-发布与订阅

redis高级应用-发布与订阅 一.发布与订阅(pub/sub)功能 二.发布与订阅(pub/sub)机制 三.redis发布与订阅(pub/sub)的实现 一.发布与订阅(pub/sub)功能 Pub/Sub功能(meansPublish, Subscribe)即发布及订阅功能.基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件:发布者(如服务