SmartRoute之大规模消息转发集群实现

消息转发的应用场景在现实中的应用非常普遍,我们常用的IM工具也是其中之一;现有很多云平台也提供了这种基础服务,可以让APP更容易集成相关功能而不必投入相应的开发成本。对于实现这样一个简单功能并不复杂,对于现有的技术来说用.net提个通讯服务器支持几十W用户相信也不是件困难的事情;但如果考虑可用性和更大规模那就需要下点功夫,并且对相关技术有深入的了解才能实现了。而在这里主要讲解一下如何通过SmartRoute来实现一个大规模的消息转发集群的基础服务。

说到集群那肯定由N个服务组成的一组应,那做一个消息转发集群的基础服务需要那些服务节点呢?分析一下主要包括两大块:注册中心和消息网关;网关用于和应用对接,而注册中心则是明确应用所在位置。为了达到更好的可用性和更大规模支撑注册中心和网关都是N-N的关系。

?

看到这样一个图估计会把很不了解这方面的朋友会卡住,这样一个东西实现会很复杂吧!其实在SmartRoute基础之上实现这样这样一个集群服务并不困难,不过对于消息交互原理和设计还是需要了解一下。接下来讲解一下如何用SmartRoute实现相应注册中心和网关服务。

注册中心

注册中心的作用很简单就是保存应用标识所在位置,当网关需要转发消息的时候告诉网关这个应用标识在那个位置上。除了这一功能外当然还要考虑可用性,主要包括多中心发现和注册信息现步等;同样网关也具行指向多台中心的负载能力。

	public interface ICenter : IDisposable
	{

		String ID { get; }

		INode Node { get; set; }

		IUserService UserService { get; set; }

		void Open();

	}

中心的接口定义很简单,主要都是内部针对SmartRoute的INode进行相关消息操作。

		public void Open()
		{
			mCenterSubscriber = Node.Register<EventSubscriber>(ID);
			mCenterSubscriber.Register<Protocol.SyncUsers>(OnSyncUsers);
			mCenterSubscriber.Register<Protocol.CenterStarted>(OnOtherCenterStarted);
			mCenterSubscriber.Register<Protocol.Register>(OnSyncUser);
			mCenterSubscriber.Register<Protocol.UnRegister>(OnSyncUnRegister);
			Node.SubscriberRegisted += OnSubscriberRegisted;
			mStartServiceTimer = new System.Threading.Timer(OnOpen, null, 5000, 5000);
			Node.Loger.Process(LogType.INFO, "search other center...");
		}

		//处理用户上线所在网关信息
		private void OnReceiveUsersInfo(Message msg, Protocol.GetUsersInfo e)
		{
			string[] users = e.Receiver.Split(‘;‘);
			Protocol.GetUserInfoResponse response = new Protocol.GetUserInfoResponse();
			response.RequestID = e.RequestID;
			Protocol.OperationStatus status = new Protocol.OperationStatus();
			foreach (string user in users)
			{
				Protocol.UserInfo info = UserService.GetUserInfo(user, status);
				if (info != null)
					response.Items.Add(info);
			}
			msg.Reply(response);
		}
		 //网关用户下线
		private void OnUserUnregister(Message msg, Protocol.UnRegister e)
		{
			Protocol.OperationStatus status = new Protocol.OperationStatus();
			UserService.Remove(e.Name, status);
			msg.Reply(status);
			Node.Loger.Process(LogType.INFO, "{0} user unregister", e.Name);
			//同步到其他中心节点
			if (mHasOtherCenter)
				mCenterSubscriber.Publish(CENTER_OTHER_TAG, e, ReceiveMode.Regex);
		}
		 //网关用户上线
		private void OnUserRegister(Message msg, Protocol.Register e)
		{
			Protocol.OperationStatus status = new Protocol.OperationStatus();
			UserService.Register(new Protocol.UserInfo() { Name = e.Name, Gateway = e.GatewayID }, status);
			msg.Reply(status);
			Node.Loger.Process(LogType.INFO, "{0} user register from {1}", e.Name, e.GatewayID);
			//同步到其他中心节点
			if (mHasOtherCenter)
				mCenterSubscriber.Publish(CENTER_OTHER_TAG, e, ReceiveMode.Regex);
		}

		//同步下线
		private void OnSyncUnRegister(Message msg, Protocol.UnRegister e)
		{
			Protocol.OperationStatus status = new Protocol.OperationStatus();
			UserService.Remove(e.Name, status);
			Node.Loger.Process(LogType.INFO, "{0} user unregister", e.Name);
		}
		//同步上线
		private void OnSyncUser(Message msg, Protocol.Register e)
		{
			Protocol.OperationStatus status = new Protocol.OperationStatus();
			UserService.Register(new Protocol.UserInfo() { Name = e.Name, Gateway = e.GatewayID }, status);
			Node.Loger.Process(LogType.INFO, "{0} user register from {1}", e.Name, e.GatewayID);
		}

		//同步其他中心上线信息
		private void OnSyncUsers(Message msg, Protocol.SyncUsers e)
		{
			Node.Loger.Process(LogType.INFO, "sync user info to local!");
			Protocol.OperationStatus status = new Protocol.OperationStatus();
			foreach (Protocol.UserInfo item in e.Items)
			{
				UserService.Register(item, status);
			}
		}

		private void OnSubscriberRegisted(INode node, ISubscriber subscriber)
		{
			//发现其他中心服务,向服务发起同步用户请求
			if (subscriber.Name.IndexOf(CENTER_TAG) == 0 && subscriber.Name != ID)
			{
				mHasOtherCenter = true;
				mReadyToStart = false;
				Node.Loger.Process(LogType.INFO, "find {0} center", subscriber.Name);
				Protocol.CenterStarted started = new Protocol.CenterStarted();
				started.Name = ID;
				mCenterSubscriber.Publish(subscriber.Name, started);
				Node.Loger.Process(LogType.INFO, "request sync user info ....");
			}
		}

		public INode Node
		{
			get; set;
		}

实现并不复杂,主要是开启相关订阅并注册消息处理方法即可,主要针对注册,同步和获取用户所在网关信息。

网关

网关的作用主要是接收消息,从注册中心获取用户标识对应的网关并把消息推送过去;所以功能也并不复杂主要也是针对INode的操作。

	public interface IGateway : IDisposable
	{
		INode Node { get; set; }

		Protocol.OperationStatus Register(UserToken userToken);

		Protocol.OperationStatus UnRegister(string username);

		void SendMessage(string receivers, string sender, object message);

		void Open();
	}

功能比较简单用户标识注册和注销功能,还加上一个消息推送方法即可。

public OperationStatus Register(UserToken userToken)
		{
			OperationStatus result;
			Register register = new Register();
			register.Name = userToken.Name;
			register.GatewayID = Node.DefaultEventSubscriber.Name;
			result = Node.DefaultSwitchSubscriber.SyncToService<Protocol.OperationStatus>(Center.USER_SERVICE_TAG, register);
			mUserActions[userToken.Name] = userToken;
			return result;
		}

		public void SendMessage(string receivers, string sender, object message)
		{
			MessageQueue.MessageItem item = new MessageQueue.MessageItem();
			item.ID = GetRequestID();
			item.Receives = receivers;
			item.Sender = sender;
			item.Data = message;
			mMsgQueue.Push(item);
			GetUsersInfo getinfo = new GetUsersInfo();
			getinfo.RequestID = item.ID;
			getinfo.Receiver = receivers;
			Node.DefaultSwitchSubscriber.ToService(Center.USER_SERVICE_TAG, getinfo);
		}

		public void Dispose()
		{
			if (mMsgQueue != null)
				mMsgQueue.Dispose();
		}

		public void Open()
		{
			mMsgQueue = new MessageQueue(this, 2);
			mMsgQueue.Open();
			Node.DefaultSwitchSubscriber.DefaultEventSubscriber.Register<GetUserInfoResponse>(OnGetUserInfoRequest);
			Node.DefaultEventSubscriber.Register<UserMessage>(OnUserMessage);
		}

		public OperationStatus UnRegister(string username)
		{
			UnRegister unregister = new UnRegister();
			unregister.Name = username;
			UserToken token = null;
			mUserActions.TryRemove(username, out token);
			return Node.DefaultSwitchSubscriber.SyncToService<OperationStatus>(Center.USER_SERVICE_TAG, unregister);
		}

中心启动

由于基于SmartRoute的设计,所以中心的启动并不需要进行其他配置,直接开启动行即可;对于多节点的中心怎办?如果有需要多启一个实例即可达到多中心负载能力。

	public class Program
	{
		public static void Main(string[] args)
		{
			INode node = NodeFactory.Default;
			node.Loger.Type = LogType.ALL;
			node.AddLogHandler(new SmartRoute.ConsoleLogHandler(LogType.ALL));
			node.Open();
			MRC.MCRFactory.Center.Open();
			System.Threading.Thread.Sleep(-1);
		}
	}

网关应用

网关的启动和中心一样,不过需要根据实际需要发起用户标识注册,注册后就可以向集群中的任何标识发送消息。

    public class Program
    {
        public static void Main(string[] args)
        {
            INode node = NodeFactory.Default;
            node.Loger.Type = LogType.ALL;
            node.AddLogHandler(new SmartRoute.ConsoleLogHandler(LogType.ALL));
            node.Open();
            MRC.MCRFactory.Gateway.Open();
            System.Threading.ThreadPool.QueueUserWorkItem(OnTest);
            System.Threading.Thread.Sleep(-1);
        }

        private static void OnTest(object state)
        {
            System.Threading.Thread.Sleep(10000);
            UserToken token = new UserToken("ken");
            token.Register();
            token.Receive = OnUserReceive;
        }

        private static void OnUserReceive(UserToken token, Protocol.UserMessage e)
        {
            Console.WriteLine("receive message from {0} {1}", e.Sender, e.Data);
        }
    }

构建相应标识的UserToken注册到网关,网关会自动把标识同步到中心;然后定义UserToken相应的消息接收方法即可处理接收的消息。实际应用中可以继承UserToken并挂相应的客户端连接然后当接收消息做相应的网络转发就可以达到用户和用户间的通讯。

由一这样功能并不复杂所以已经封装起方便扩展应用,具体项目地址:https://github.com/IKende/SmartRoute.MRC

时间: 2024-10-19 17:25:27

SmartRoute之大规模消息转发集群实现的相关文章

使用ARM模板在Azure中国大规模部署DCOS集群

容器技术是目前非常流行的技术,尤其是在以Docker作为容器引擎的推动下,让容器的轻量级,可移植,自包含,隔离性等的上了一个新的台阶,目前谈及Dev/Ops,CI/CD很少能够绕过Docker的. Azure在去年就推出了容器服务ACS,以其对开源的全面兼容性,开放性,最全面的编排器(DC/OS, Kubernetes,Swarm)支持而广受好评,但在中国和很多地区,ACS并没有上线,如何在这些地区快速大规模部署容器服务一直是个问题. 而微软更进一步,在11月初,进一步开源了ACS的核心引擎ac

RabbitMQ消息队列集群

RabbitMQ MQ(Message Queue,消息队列)是一款消息中间件,一般以集群方式部署,主要提供消息的接受和发送,实现各微服务之间的消息异步. 集群原理 rabbitmq 是依据erlang的分布式特性(RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以

RabbitMQ消息队列集群配置

RabbitMQ是什么? MQ(Message Queue,消息队列)消息中间件,一般以集群方式部署,主要提供消息的接受和发送,实现各微服务之间的消息同步. 原理介绍 rabbitmq是依据erlang的分布式特性(RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以

消息队列集群配置

原理介绍 rabbitmq是依据erlang的分布式特性(RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证)来实现的,所以部署rabbitmq分布式集群时要先安装erlang,并把其中一个服务的cookie复制到另外的节点 rabbitmq集群中,各个ra

ActiveMQ消息队列集群的搭建

1.准备activemq apache-activemq-5.12.0-bin.tar 2.解压文件 3.并将文件cp一份命名为activemq1 进入conf文件进行修改 修改属性为brokerNamet他的名字可以随意修改,但是需要保证唯一 这里我们配置了动态 组播 将暂时不需 要的属性进行注释,要修改uri路径的端口号为61616,并在最后引用了组播 4.修改配置文件jetty.xml 将里面的port的端口号改为8161 5.接下来把activemq1复制两份,为activemq2,ac

高可用消息队列集群zookeeper+leveldb+activemq

正在上传中---- 预计8月底完成

大规模Elasticsearch集群管理心得

转载:http://elasticsearch.cn/article/110 ElasticSearch目前在互联网公司主要用于两种应用场景,其一是用于构建业务的搜索功能模块且多是垂直领域的搜索,数据量级一般在千万至数十亿这个级别:其二用于大规模数据的实时OLAP,经典的如ELKStack,数据规模可能达到千亿或更多. 这两种场景的数据索引和应用访问模式上差异较大,在硬件选型和集群优化方面侧重点也会有所不同.一般来说后一种场景属于大数据范畴,数据量级和集群规模更大,在管理方面也更有挑战. 应Me

redis集群实现(一)集群架构与初始化

redis是一个高可用.高性能.高可扩展性的基于内存也支持持久化存储的kv存储数据库,redis相比较于之前的kv存储memcached而言,不但支持的value类型大大增加,并且还支持数据的持久化,弥补了memcached的不能持久化的缺点,但是在3.0之前的redis并不支持集群功能,这也是redis在3.0之前不能被大量部署的一个原因,但是由于3.0以后的redis支持了集群功能,redis就开始大量的替代之前的memcached,今天我从源代码层次学习下redis是怎么实现集群功能的.

ActiveMQ笔记(4):搭建Broker集群(cluster)

上一篇介绍了基于Networks of Borkers的2节点HA方案,这一篇继续来折腾Networks of Brokers,当应用规模日渐增长时,2节点的broker可能仍然抗不住访问压力,这时候就需要多加一些broker,弄一个更大规模的Broker集群,但是怎么合理设置broker之间的网络桥接,却是有讲究的,先来看一种不太好的设计: 这个架构看上去没瑕疵,没毛病,3个broker之间两两互通,整体可用性极高,但是从消息的路由角度来看,却不是一个好的设计,当producer向broker