SignalR与ActiveMQ结合构建实时通信

一、概述

本教程主要阐释了如何利用SignalR与消息队列的结合,实现不同客户端的交互

  • SignalR如何和消息队列交互(暂使用ActiveMQ消息队列)
  • SignalR寄宿在web中和其他SignalR、控制台客户端交互。
  • SignalR单独寄宿在控制台中和其他SignalR、控制台客户端交互。

下面屏幕截图展示了各个客户端通过ActiveMQ相互通信

  1、SignalR寄宿在web:

  2、SignalR寄宿在控制台中,web客户端调用SignalR,读者自行测试。

工程目录:

一、创建项目

  1、创建生产者项目,该项目要是通过控制台输入消息,发送到消息队列

    创建控制台应用程序命名为ActiveMQNetProcucer,然后用包管理器安装ActiveMQ的.Net客户端

    Install-Package Apache.NMS.ActiveMQ

    主要代码如下:

 1 using Apache.NMS;
 2 using Apache.NMS.ActiveMQ;
 3 using System;
 4 using System.Collections.Generic;
 5 using System.Linq;
 6 using System.Text;
 7 using System.Threading.Tasks;
 8 namespace ActiveMQNet
 9 {
10     class Program
11     {
12         static IConnectionFactory _factory = null;
13         static IConnection _connection = null;
14         static ITextMessage _message = null;
15
16         static void Main(string[] args)
17         {
18             //创建工厂
19             _factory = new ConnectionFactory("tcp://127.0.0.1:61616/");
20
21             try
22             {
23                 //创建连接
24                 using (_connection = _factory.CreateConnection())
25                 {
26                     //创建会话
27                     using (ISession session = _connection.CreateSession())
28                     {
29                         //创建一个主题
30                         IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
31
32                         //创建生产者
33                         IMessageProducer producer = session.CreateProducer(destination);
34
35                         Console.WriteLine("Please enter any key to continue! ");
36                         Console.ReadKey();
37                         Console.WriteLine("Sending: ");
38
39                         //创建一个文本消息
40                         _message = producer.CreateTextMessage("Hello AcitveMQ....");
41
42                         //发送消息
43                         producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
44                         while (true)
45                         {
46                             var msg = Console.ReadLine();
47                             _message = producer.CreateTextMessage(msg);
48                             producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
49                         }
50
51                     }
52                 }
53
54             }
55             catch (Exception ex)
56             {
57                 Console.WriteLine(ex.ToString());
58             }
59
60             Console.ReadLine();
61
62         }
63     }
64 }

  2、创建消费者项目,该项目主要是订阅消息队列中的消息  

    创建控制台应用程序命名为ActiveMQNetCustomer,然后用包管理器安装ActiveMQ的.Net客户端

    Install-Package Apache.NMS.ActiveMQ

    主要代码:

 1 using Apache.NMS;
 2 using Apache.NMS.ActiveMQ;
 3 using System;
 4 using System.Collections.Generic;
 5 using System.Linq;
 6 using System.Text;
 7 using System.Threading.Tasks;
 8
 9 namespace ActiveMQNetCustomer
10 {
11     class Program
12     {
13         static IConnectionFactory _factory = null;
14
15         static void Main(string[] args)
16         {
17             try
18             {
19                 //创建连接工厂
20                 _factory = new ConnectionFactory("tcp://127.0.0.1:61616/");
21                 //创建连接
22                 using (IConnection conn = _factory.CreateConnection())
23                 {
24                     //设置客户端ID
25                    // conn.ClientId = "Customer";
26                     conn.Start();
27                     //创建会话
28                     using (ISession session = conn.CreateSession())
29                     {
30                         //创建主题
31                         var topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
32
33                         //创建消费者
34                         IMessageConsumer consumer = session.CreateDurableConsumer(topic, "Customer", null, false);
35
36                         //注册监听事件
37                         consumer.Listener += new MessageListener(consumer_Listener);
38
39                         //这句代码非常重要,
40                         //这里没有read方法,Session会话会被关闭,那么消费者将监听不到生产者的消息
41                         Console.Read();
42                     }
43
44                     //关闭连接
45                     conn.Stop();
46                     conn.Close();
47                 }
48
49             }
50             catch (Exception ex)
51             {
52                 Console.Write(ex.ToString());
53             }
54
55         }
56
57         /// <summary>
58         /// 消费监听事件
59         /// </summary>
60         /// <param name="message"></param>
61         static void consumer_Listener(IMessage message)
62         {
63             ITextMessage msg = (ITextMessage)message;
64             Console.WriteLine("Receive: " + msg.Text);
65         }
66     }
67 }

  3、创建包装ActiveMQ生产者和消费者项目,供SignalR.ActiveMQ.WebHost项目使用,来发布消息和订阅消息

    创建类库项目Signalr.ActiveMQ,然后用包管理器安装ActiveMQ的.Net客户端

    Install-Package Apache.NMS.ActiveMQ

    主要代码;

    生产者类:创建单实例生产者对象调用Send发放,发送消息到ActiveMQ消息队列    

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Signalr.ActiveMQ
{
  public  class Procucer
    {
        private IMessageProducer producer;
        private static Procucer instance=null;
        private Procucer(string customerId,string address)
        {
            instance = this;
            //创建工厂
            IConnectionFactory _factory = new ConnectionFactory("tcp://127.0.0.1:61616/");

            try
            {
                //创建连接
                IConnection _connection = _factory.CreateConnection();
                {
                    //创建会话
                    ISession session = _connection.CreateSession();
                    {
                        //创建一个主题
                        IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");

                        //创建生产者
                        producer = session.CreateProducer(destination);

                        Console.WriteLine("Please enter any key to continue! ");
                      //  Console.ReadKey();
                        Console.WriteLine("Sending: ");                      

                    }
                }

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }

            //Console.ReadLine();
        }

        public static Procucer GetInstance(string customerId="",string address= "tcp://127.0.0.1:61616/")
        {
            if (instance == null)
                instance = new Procucer(customerId, address);
            return instance;
        }

        public void Send(string msg)
        {
            //创建一个文本消息
            ITextMessage _message = producer.CreateTextMessage(msg);
            //发送消息
            producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
        }
    }
}

    消费者类:启用单独的线程监听消息队列中的消息,当监听到消息后 广播给所有的 SinglaR客户端,其中静态属性Clients保存了所有的SinglaR客户端,当SinglaR客户端连接或者断开的时候会更新Clients属性详细代码在SignalR.ActiveMQ.WebHost中的 MyHub文件中。为了阻止当前线程退出调用了 System.Threading.Thread.CurrentThread.Join();阻塞当前线程,避免当web中方法执行完毕后对象被回收,起不到监听消息队列的作用。

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Microsoft.AspNet.SignalR.Hubs;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Web;

namespace SignalR.ActiveMQ
{
    public class Customer
    {
        private static object lockObj = new object();
        private static IHubCallerConnectionContext<dynamic> _clients;
        public static IHubCallerConnectionContext<dynamic> Clients
        {
            get { return _clients; }
            set
            {
                lock (lockObj)
                {
                    _clients = value;
                }
            }
        }
        public static void Run(string cutomerId="",string address= "tcp://127.0.0.1:61616/")
        {
            System.Threading.Thread t = new System.Threading.Thread(() =>
            {
                try
                {
                    //创建连接工厂
                    IConnectionFactory _factory = new ConnectionFactory(address);
                    //创建连接
                    using (IConnection conn = _factory.CreateConnection())
                    {
                        //设置客户端ID
                        conn.ClientId = cutomerId;
                        conn.Start();
                        //创建会话
                        using (ISession session = conn.CreateSession())
                        {
                            //创建主题
                            var topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");

                            //创建消费者
                            IMessageConsumer consumer = session.CreateDurableConsumer(topic, "Customer", null, false);

                            //注册监听事件
                            consumer.Listener += new MessageListener(consumer_Listener);

                            //阻塞当前线程,监听消息
                            System.Threading.Thread.CurrentThread.Join();
                        }
                        //关闭连接
                        conn.Stop();
                        conn.Close();
                    }

                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex.ToString());
                    Console.WriteLine(ex.ToString());
                }

            });

            t.Start();
        }
        static void consumer_Listener(IMessage message)
        {
            ITextMessage msg = (ITextMessage)message;
            if (Clients != null)
            {
                Clients.All.broadcastMessage(msg.Text);
            }
            Debug.WriteLine("Receive: " + msg.Text);
            Console.WriteLine("Receive: " + msg.Text);
        }
    }
}

  4、创建web自宿主的SignalR项目,该项目既发布消息,也订阅消息

    创建MVC项目SignalR.ActiveMQ.WebHost,然后用包管理器安装ActiveMQ的.Net客户端

    Install-Package Apache.NMS.ActiveMQ

    创建SignalR的hub:当有客户端连接或者断开的时候更新Customer.Clients 静态属性,保存所有的SignalR客户端。

    web端通过调用代理的Send方法发送消息到消息队列。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using Microsoft.AspNet.SignalR;
using Signalr.ActiveMQ;
using System.Threading.Tasks;

namespace SignalR.ActiveMQ.Sample.Signal.Class
{
    public class chatHub : Hub
    {
        public void Send(string clientName, string message)
        {
            Procucer.GetInstance().Send(message);
        }
        public override Task OnConnected()
        {
            Customer.Clients = this.Clients;
            return base.OnConnected();
        }

        public override Task OnDisconnected(bool stopCalled)
        {
            Customer.Clients = this.Clients;
            return base.OnDisconnected(stopCalled);
        }
    }
}

    Startup类中启动消费者监听线程,调用的项目Signalr.ActiveMQ中的Customer.Run()方法:

using Microsoft.AspNet.SignalR;
using Microsoft.Owin;
using Owin;
using SignalR.ActiveMQ;

[assembly: OwinStartupAttribute(typeof(SignalR.ActiveMQ.Sample.Startup))]
namespace SignalR.ActiveMQ.Sample
{
    public partial class Startup
    {
        public void Configuration(IAppBuilder app)
        {
            app.MapSignalR();

            Customer.Run();//启动消费者监听线程
        }
    }
}

二、启动顺序:

1、启动ActiveMQ程序 可参考  http://www.cnblogs.com/xwdreamer/archive/2012/02/21/2360818.html

2、启动ActiveMQNetProcucer项目

3、ActiveMQNetCustomer项目

4、启动SignalR.ActiveMQ.WebHost,开多个浏览器窗口,模拟多个SignalR客户端

三、SignalR宿主和web客户端分离两个项目 

Signalr.ActiveMQ.SelfHost 用控制台寄宿SignalR提供的服务供Signalr.ActiveMQ.Web使用

Signalr.ActiveMQ.Web 通过chart.html调用Signalr.ActiveMQ.SelfHost的服务

Signalr.ActiveMQ.SelfHost 和SignalR.ActiveMQ.WebHost不能同时启动,现在两个项目绑定到了同一个端口。

四、测试

  在生产者窗口中输入消息回车,观察其他客户端的变化

在Singlar的web客户端发送消息,观察其他客户端的变化

源代码:https://github.com/zhaoyingju/SignalrActiveMQ.git

时间: 2024-12-09 10:48:24

SignalR与ActiveMQ结合构建实时通信的相关文章

activeMq构建应用 - 5

Broker:相当于一个ActiveMQ服务器实例 命令行启动参数示例如下: 1:activemq start :使用默认的activemq.xml来启动 2:activemq start xbean:file:../conf/activemq-2.xml :使用指定的配置文件来启动 3:如果不指定file,也就是xbean:activemq-2.xml,那么xml必须在classpath下面 用ActiveMQ来构建Java应用: 将用ActiveMQ Broker作为独立的消息服务器来构建J

ActiveMQ(四)——四、用ActiveMQ构建应用

一.多种启动Broker的方法 broker:相当于一个ActiveMQ服务器实例 命令行启动参数示例如下:1:activemq start:使用默认的activemq.xml来启动2:activemq start xbean:file:../conf/activemq-2.xml:使用指定的配置文件来启动3:如果不指定file,也就是xbean:activemq-2.xml,那么必须在classpath下面 如果需要启动多个broker,需要为broker设置一个名字broker.setNam

ASP.NET SignalR入门

前言 之前在培训ASP.NET WebAPI的时候有提过SignalR这个技术,但当时只是讲了是用来做什么的,并没有多说.因为自己也是画图找资料的时候见到的.后来当一直关注的前端大神贤心发布LayIM2.0之后,于是对Web聊天产生了兴趣.那么在.NET平台下的Web聊天有哪些呢?查找资料发现了ASP.NET SignalR.于是乎...So...Just do it! 简介 按照惯例,先介绍一下什么是SignalR.简单的说,ASP .NET SignalR 是一个ASP .NET 下的类库,

SignalR来做实时Web聊天

本章和大家分享的内容是使用Signal R框架创建个简易的群聊功能,主要讲解如何在.Net的MVC中使用这个框架,由于这个项目有官方文档(当然全英文),后面也不打算写分享篇了,主要目的是让朋友们在需要使用Web实时通信的时候有更多一种解决方案,毕竟这是微软主推的一种解决方案之一. SignalR网上简介 ASP.NET SignalR 是为 ASP.NET 开发人员提供的一个库,可以简化开发人员将实时 Web 功能添加到应用程序的过程.实时 Web 功能是指这样一种功能:当所连接的客户端变得可用

SignalR入门

一:什么是signalR Asp.net SignalR是微软为实现实时通信的一个类库.一般情况下,signalR会使用JavaScript的长轮询(long polling)的方式来实现客户端和服务器通信,随着Html5中WebSockets出现,SignalR也支持WebSockets通信.另外SignalR开发的程序不仅仅限制于宿主在IIS中,也可以宿主在任何应用程序,包括控制台,客户端程序和Windows服务等,另外还支持Mono,这意味着它可以实现跨平台部署在Linux环境下. Sig

使用SignalR实现消息提醒

Asp.net SignalR是微软为实现实时通信的一个类库.一般情况下,SignalR会使用JavaScript的长轮询(long polling)的方式来实现客户端和服务器通信,随着Html5中WebSockets出现,SignalR也支持WebSockets通信.另外SignalR开发的程序不仅仅限制于宿主在IIS中,也可以宿主在任何应用程序,包括控制台,客户端程序和Windows服务等,另外还支持Mono,这意味着它可以实现跨平台部署在Linux环境下. SignalR内部有两类对象:

Signalr实现消息推送

一.前言 大多数系统里面好像都有获取消息的功能,但这些消息来源都不是实时的,比如你开两个浏览器,用两个不同的账号登录,用一个账号给另外一个账号发送消息,然而并不会实时收到消息,必须要自己手动F5刷新一下页面才会显示自己的消息,这样感觉用户体验不太好.之前看了Learning hard关于Signalr的文章,刚好自己项目中有用到获取实时消息的功能,然而我们项目中就是用js代码setinterval方法进行1秒刷新读取数据的,这样严重给服务器端添加负担,影响系统性能!所以自己稍微研究了一下,下面是

第三章SignalR在线聊天例子

第三章SignalR在线聊天例子 本教程展示了如何使用SignalR2.0构建一个基于浏览器的聊天室程序.你将把SignalR库添加到一个空的Asp.Net Web应用程序中,创建用于发送消息到客户端的集线器(Hubs)类,创建一个Html页面让用户在该页面上发送和接收聊天信息.对于如何在MVC5环境中创建这个聊天室程序,请参阅Getting Started with SignalR 2.0 and MVC 5. SignalR是一个开源的.Net库,用于构建需要实时用户交互或实时数据更新的We

学习SignalR (1):认识SignalR

什么是SignalR? ASP .NET SignalR是一个ASP .NET 下的类库,可以在ASP .NET 的Web项目中实现实时通信.什么是实时通信的Web呢?就是让客户端(Web页面)和服务器端可以互相通知消息及调用方法,当然这是实时操作的. WebSockets是HTML5提供的新的API,可以在Web网页与服务器端间建立Socket连接,当WebSockets可用时(即浏览器支持Html5)SignalR使用WebSockets,当不支持时SignalR将使用其它技术来保证达到相同