Active MQ C#实现

原文链接:

Active MQ C#实现

内容概要

主要以源码的形式介绍如何用C#实现同Active MQ 的通讯。本文假设你已经正确安装JDK1.6.x,了解Active MQ并有一定的编程基础。

正文

JMS 程序的最终目的是生产和消费的消息能被其他程序使用,JMS 的 Message 是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非JMS 程序格式的消息。

Message 由消息头,属性和消息体三部份组成。

Active MQ支持过滤机制,即生产者可以设置消息的属性(Properties),该属性与消费者端的Selector对应,只有消费者设置的selector与消息的Properties匹配,消息才会发给该消费者。Topic和Queue都支持Selector。

示例代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;

using Apache.NMS;
using System.Diagnostics;
using Apache.NMS.Util;
using System.Windows.Threading;

/*
 * 功能描述:C#使用ActiveMQ示例
 * 修改次数:2
 * 最后更新: by Kagula,2012-07-31
 *
 * 前提条件:
 * [1]apache-activemq-5.4.2
 * [2]Apache.NMS.ActiveMQ-1.5.6-bin
 * [3]WinXP SP3
 * [4]VS2008 SP1
 * [5]WPF工程 With .NET Framework 3.5
 *
 * 启动
 *
 * 不带安全控制方式启动
 * [你的解压路径]\apache-activemq-5.4.2\bin\activemq.bat
 *
 * 安全方式启动
 * 添加环境变量:            ACTIVEMQ_ENCRYPTION_PASSWORD=activemq
 * [你的解压路径]\apache-activemq-5.4.2\bin>activemq xbean:file:../conf/activemq-security.xml
 *
 * Active MQ 管理地址
 * http://127.0.0.1:8161/admin/
 * 添加访问"http://127.0.0.1:8161/admin/"的限制
 *
 * 第一步:添加访问限制
 * 修改D:\apache\apache-activemq-5.4.2\conf\jetty.xml文件
 * 下面这行编码,原
 * <property name="authenticate" value="true" />
 * 修改为
 * <property name="authenticate" value="false" />
 *
 * 第二步:修改登录用户名密码,缺省分别为admin,admin
 * D:\apache\apache-activemq-5.4.2\conf\jetty-realm.properties
 *
 * 用户管理(前提:以安全方式启动ActiveMQ)
 *
 * 在[你的解压路径]\apache-activemq-5.4.2\conf\credentials.properties文件中修改默认的用户名密码
 * 在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中可以添加新的用户名
 * e.g.  添加oa用户,密码同用户名。
 * <authenticationUser username="oa" password="oa" groups="users,admins"/>
 *
 * 在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中你还可以设置指定的Topic或Queue
 * 只能被哪些用户组read 或 write。
 *
 *
 * 配置C# with WPF项目
 * 项目的[Application]->[TargetFramework]属性设置为[.NETFramework 3.5](这是VS2008WPF工程的默认设置)
 * 添加[你的解压路径]\Apache.NMS.ActiveMQ-1.5.6-bin\lib\Apache.NMS\net-3.5\Apache.NMS.dll的引用
 * Apache.NMS.dll相当于接口
 *
 * 如果是以Debug方式调试
 * 把[你的解压路径]\Apache.NMS.ActiveMQ-1.5.6-bin\build\net-3.5\debug\目录下的
 * Apache.NMS.ActiveMQ.dll文件复制到你项目的Debug目录下
 * Apache.NMS.ActiveMQ.dll相当于实现
 *
 * 如果是以Release方式调试
 * 参考上文,去取Apache.NMS,Release目录下相应的DLL文件,并复制到你项目的Release目录下。
 *
 *
 * 参考资料
 * [1]《C#调用ActiveMQ官方示例》 http://activemq.apache.org/nms/examples.html
 * [2]《ActiveMQ NMS下载地址》http://activemq.apache.org/nms/activemq-downloads.html
 * [3]《Active MQ在C#中的应用》http://www.cnblogs.com/guthing/archive/2010/06/17/1759333.html
 * [4]《NMS API Reference》http://activemq.apache.org/nms/nms-api.html
 */

namespace testActiveMQSubscriber
{
    /// <summary>
    /// Interaction logic for Window1.xaml
    /// </summary>
    public partial class Window1 : Window
    {
        private static IConnectionFactory connFac;

        private static IConnection connection;
        private static ISession session;
        private static IDestination destination;
        private static IMessageProducer producer;
        private static IMessageConsumer consumer;

        protected static ITextMessage message = null;

        public Window1()
        {
            InitializeComponent();

            initAMQ("MyFirstTopic");
        }

        private void initAMQ(String strTopicName)
        {
            try
            {
                connFac = new NMSConnectionFactory(new Uri("activemq:failover:(tcp://localhost:61616)"));

                //新建连接
                //connection = connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码

                //如果你要持久“订阅”,则需要设置ClientId,这样程序运行当中被停止,恢复运行时,能拿到没接收到的消息!
                connection.ClientId = "testing listener";
                connection = connFac.CreateConnection();//如果你是缺省方式启动Active MQ服务,则不需填用户名、密码

                //创建Session
                session = connection.CreateSession();

                //发布/订阅模式,适合一对多的情况
                destination = SessionUtil.GetDestination(session, "topic://" + strTopicName);

                //新建生产者对象
                producer = session.CreateProducer(destination);
                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留

                //新建消费者对象:普通“订阅”模式
                //consumer = session.CreateConsumer(destination);//不需要持久“订阅”       

                //新建消费者对象:持久"订阅"模式:
                //    持久“订阅”后,如果你的程序被停止工作后,恢复运行,
                //从第一次持久订阅开始,没收到的消息还可以继续收
                consumer = session.CreateDurableConsumer(
                    session.GetTopic(strTopicName)
                    , connection.ClientId, null, false);

                //设置消息接收事件
                consumer.Listener += new MessageListener(OnMessage);

                //启动来自Active MQ的消息侦听
                connection.Start();
            }
            catch (Exception e)
            {
                //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!
                Debug.WriteLine(e.Message);
            }
        }

        private void SendMsg2Topic_Click(object sender, RoutedEventArgs e)
        {
            //发送消息
            ITextMessage request = session.CreateTextMessage(DateTime.Now.ToLocalTime()+" "+tbMsg.Text);
            producer.Send(request);
        }

        protected void OnMessage(IMessage receivedMsg)
        {
            //接收消息
            message = receivedMsg as ITextMessage;

            //UI线程,显示收到的消息
            Dispatcher.Invoke(DispatcherPriority.Normal, new Action(() =>
            {
                DateTime dt = new DateTime();
                ListBoxItem lbi = new ListBoxItem();
                lbi.Content = DateTime.Now.ToLocalTime() + " " + message.Text;

                lbR.Items.Add(lbi);
            }));
        }
    }
}

队列通讯方式,消费者例子

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Apache.NMS;
using System.Diagnostics;
using log4net;
using Apache.NMS.Util;
using System.Collections;

namespace Cat8637AutoCallServer
{
    public class SMTask
    {
        public String Callee { get; set; }
        public String CheckNumber { get; set; }
        public int Deadline { get; set; }

        public override String ToString()
        {
            return String.Format("Callee={0},CheckNumber={1},Deadline={2}",
                Callee,CheckNumber,Deadline);
        }
    }

    /*
     * 负责接收任务,并把任务放在任务等待队列中。
     */
    public class MQClient
    {
        private static readonly ILog logger = LogManager.GetLogger(typeof(MQClient));

        private static IConnection connection = null;
        private static ISession session = null;

        Queue _voiceSMTasks = new Queue(); 

        public MQClient()
        {
            try
            {
                IConnectionFactory factory = new NMSConnectionFactory(new Uri("activemq:failover:(tcp://localhost:61616)"));

                //新建连接
                //connection = connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码
                connection = factory.CreateConnection();

                session = connection.CreateSession();
                IMessageConsumer consumer = session.CreateConsumer(session.GetQueue("TaskIssue_VoiceSM"));
                consumer.Listener += new MessageListener(OnMessage);

                connection.Start();
            }
            catch (Exception ex)
            {
                Debug.WriteLine(ex.Message);
            }
        }

        protected void OnMessage(IMessage receivedMsg)
        {
            IMessage message = receivedMsg as ITextMessage;

            SMTask smTask = new SMTask();
            smTask.Callee = message.Properties["Callee"] as String;
            smTask.CheckNumber = message.Properties["Message"] as String;
            smTask.Deadline = Convert.ToInt32(message.Properties["deadline"] as String);

            logger.Info("Received: "+smTask.ToString());

            lock (_voiceSMTasks)
            {
                _voiceSMTasks.Enqueue(smTask);
            }
        }

        public SMTask GetVoiceSMTask()
        {
            SMTask result = null;
            lock (_voiceSMTasks)
            {
                if (_voiceSMTasks.Count > 0)
                {
                    result = _voiceSMTasks.Dequeue() as SMTask;
                }
            }
            return result;
        }
    }
}

队列通讯方式,生产者例子

        private void Send_Click(object sender, RoutedEventArgs e)
        {
            try
            {
                IDestination destination = SessionUtil.GetDestination(session, "queue://TaskIssue_VoiceSM");

                //新建生产者对象
                IMessageProducer producer = session.CreateProducer(destination);
                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留  

                ITextMessage request = session.CreateTextMessage();
                request.NMSCorrelationID = "TestVoiceSM";//这里我填了应用程序的名称。

                request.Properties["Callee"] = tbCallee.Text;
                request.Properties["Message"] = tbCheckNumber.Text;
                request.Properties["deadline"] = tbValidDuration.Text;

                producer.Send(request);

            }
            catch (Exception ex)
            {
                //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!
                Debug.WriteLine(ex.Message);
            }
        }

        private void Window_Closed(object sender, EventArgs e)
        {
            try
            {
                if (session == null)
                    return;
                //if (connection == null)
                //    return;

                session.Close();
                //connection.Close();
            }
            catch (Exception ex)
            {
                Debug.WriteLine(ex.Message);
            }
        }
时间: 2024-12-15 17:59:30

Active MQ C#实现的相关文章

JMS 之 Active MQ 的消息传输

本文使用Active MQ5.6 一.消息协商器(Message Broker) broke:消息的交换器,就是对消息进行管理的容器.ActiveMQ 可以创建多个 Broker,客户端与ActiveMQ交互,实际上都是与ActiveMQ中的Broker交互,Broker配置在${MQ_HOME}\conf\activemq.xml. 二.连接器(Connectors) (一).传输连接器 (transportConnectors) transportConnectors 连接器:就是建立bro

Active MQ 启动报错

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 特性 ⒈ 多种语言和协议编写客户端.语言: Java,C,C++,C#,Ruby,Perl,Python,PHP.应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP ⒉ 完全支持J

JMS 之 Active MQ 启动嵌入式Broke

一.如何启动active MQ 服务 (一).使用命令启动 a./usr/local/activemq-5.9.0/bin 目录下 ./activemq start 默认使用conf/activemq.xml 配置文件 b.[[email protected] bin]# ./activemq start xbean:file:../conf/activemq-slave1.xml 使用指定的配置文件启动 (二).代码启动broker 在程序中可以通过编码的方式启动broker,如果要启动多个b

Active MQ学习笔记

一.Active MQ介绍 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 主要特点: 1. 多种语言和协议编写客户端.语言: Java, C, C++, C#, Ruby, Perl, Python, PHP.应用协议: OpenWire,Stomp REST,WS Notif

active MQ搭建

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 1安装 定义activemq安装目录为/usr/local/activemq 定义activemq数据存放目录为 /data/postmall/activemq/ cd /tmp wget http://archive.apache

JMS进阶-Spring整合Active MQ

只能说,Spring太流弊了,啥都能整合~~~~ First of all, start the service of Active MQ 项目目录结构如下 用到的jar包如下 activemq-client-5.13.1.jar commons-logging-1.1.3.jar geronimo-j2ee-management_1.1_spec-1.0.1.jar geronimo-jms_1.1_spec-1.1.1.jar hamcrest-core-1.3.jar junit-4.12

WebSphere MQ&&Active MQ

WebSphere MQ&&ActiveMQ WebSphere MQ 1.  中间件处于应用软件与系统软件之间,是一种以自己的复杂换取企业应用简单化的可复用的基础软件,它使用系统软件所提供的基础服务(功能),衔接网络上应用系统的各个部分或不同的应用,能够达到资源共享.功能共享的目的. 2.  三种通信技术: RPC(remote process call):同步: CPI-C:同步: MQI(message queue interface):异步通信方式,通信的方式与传送协议无关. 3.

初探active mq

mq(message queue),即消息队列,目前比较流行消息队列是active mq 和kafka.本文介绍如何简单的使用active mq. ActiveMQ官网下载地址:http://activemq.apache.org/download.html ActiveMQ 提供了Windows 和Linux.Unix 等几个版本,可简单使用windows版本.下载解压启动即可,进入apache-activemq-5.15.8\bin\win64目录,双击activemq.bat即可启动. 从

JMS 之 Active MQ 消息存储

一.消息的存储方式 ActiveMQ支持JMS规范中的持久化消息与非持久化消息 持久化消息通常用于不管是否消费者在线,它们都会保证消息会被消费者消费.当消息被确认消费后,会从存储中删除 非持久化消息通常用于发送通知以及实时数据,通常要求性能优先,消息可靠性并不是必须的情况 MQ支持可插拔式的消息存储,如:内存.文件和关系数据库等方式 Queue消息模型在ActiveMQ的存储 采用存储采用先进先出(FIFO),一个消息只能被一个消费者消费,当消息被确认消费之后才会被删除. Topic消息模型(针