【MINA】用mina做业务服之间的通信,实现业务负载均衡思路

学习mina目的还是搭建通信架构,学完mina我们了解了如何实现客户端和服务端,也就是一个正常channel我们是知道怎么建立的

但是问题是,我们应用环境通信分为两种

1.前后端通信



其实这个比较好实现,提供一个mina server端,供前端语言通过socket建连接就行,这个通信就算是ok了,编解码等通信解析的细节这里不讲了

以前的游戏服务端架构业务多用短连接,聊天用长连接,聊天的部分其实就是上面表述的情况

现在是长连接的天下,聊天依旧是长连接,业务也做成长连接,实现了真正意义上的长连接游戏架构,这其实就表述了一种当下典型架构,

就是后端提供两个开放的通信端口【即两个mina server】,供前端的socket连接,一个负责聊天,登录,注册,另一个负责其他业务,这样就实现了协议通信的负载均衡

2.后端的业务服通信【这是本文的重点】



那么后端的业务就不需要负载均衡吗?比如job,异步更新db,活动副本等

当然也是需要的,怎么做那,先拿1中的做个解释

mainserevr[聊天,登录,注册]---nodeserver[其他业务]

这两个mina sever端已经建立起来了,但是两个server之间还不能通信,我们有两个选择,要么在mainserevr上起个mina client去连nodeserver,要么在nodeserver

上起个mina client去连mainserevr,思路肯定是这样的,一旦这个通道建立了,其实互为server和client的,会有一个iosession被通道持有,只要有这个iosession,

就可以主动write,当然对于通道的另一端可以response,也可以通过取得iosession来主动写

实现方式,我们在nodeserevr上提供一个mainserverClient这样一个spring的bean去连接mainserver,这样在nodeserver上就可以向mainserevr发消息了

3.带着这个思路设计一下



我把游戏中的业务分为

     public static final String SERVER_TYPE_NODE_STR = "nodeserver";// game node
	public static final String SERVER_TYPE_MAIN_STR = "mainserver";// 主server
	public static final String SERVER_TYPE_JOB_STR = "jobserver";// job server
	public static final String SERVER_TYPE_ASYNCDB_STR = "asyncdbserver";// 异步DB
	public static final String SERVER_TYPE_ACTIVE_STR = "activityserver";// 活动
	public static final String SERVER_TYPE_OTHER_STR = "other";// 其他
	public static final String SERVER_TYPE_GM_STR = "GM";//管理端

每次启动一种server时,首先启动一次mina serevr,然后启动多个mina client去连接其他的mina server,

比如启动nodeserevr 服务端,然后启动多个client分别连接mainserevr,jobserevr等的服务端,这样我就可以

在nodeserver上给其他业务serevr发请求了,具体启动哪些client看需要

搞一个启动server类型的方法

public static ClassPathXmlApplicationContext start(String serverTypeStr) {
		try {                        //关闭连接池的钩子线程
			ProxoolFacade.disableShutdownHook();                        //spring 的核心配置文件
			String xmlFile = "applicationContext.xml";

			....
			log.info("启动 {} server................", serverTypeName);

			// 设置到系统环境变量
			System.setProperty(NodeSessionMgr.SERVER_TYPE_KEY, serverType + "");
			System.setProperty(NodeSessionMgr.SERVER_TYPE_NAME_KEY,
					serverTypeName);

			// final ClassPathXmlApplicationContext parent = new
			// ClassPathXmlApplicationContext(
			// xmlFile);
			String fileName = null;
              //这是把spring的住配置文件拆分了一部分内容出来,目前是只加载本server需要的bean
			if (serverType == NodeSessionMgr.SERVER_TYPE_NODE) {
				fileName = "wolf/app_nodeserver.xml";
			} else {
				fileName = "wolf/app_server.xml";
			}

			//手动启动spring
			final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
					new String[] { xmlFile, fileName });

			if (context != null) {
				ServiceLocator.getInstance().setApplicationContext(context);
			}

			// 启动socket server
			final WolfServer server = (WolfServer) ServiceLocator
					.getSpringBean("wolf_server");
			server.setServerType(serverType);                        //这个调用就是我们熟悉的启动mina server端
			server.start();

			//这个动用做两件事,选区需要的serevr类型建立mina client连接
			startClient(server);
                        //钩子线程用来监听应用停止,为了做停止时的后续处理
			Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
				public void run() {
					_shutdown();
				}
			}, "shutdownHookThread"));
              //为了支持web,springMVC,内置一个web server
			if (NodeSessionMgr.SERVER_TYPE_MAIN_STR
					.equalsIgnoreCase(serverTypeStr)) {
				JettyServer jettyServer = (JettyServer) ServiceLocator
						.getSpringBean("jettyServer");
				jettyServer.start();
			}

			log.info("start {} end................", serverTypeName);
			return context;

		} catch (Exception e) {
			e.printStackTrace();
			shutdown();
		} finally {

		}
		return null;
	}

在看下startClient(server);

private static void startClient(WolfServer server) {
		// asyncdbServer只会被连接,不会主动连接其他server                // 这部分目的是过滤那些不需要主动连比人的serevr,比武我这里的异步db,和活动服
		if (server.getServerType() == NodeSessionMgr.SERVER_TYPE_ASYNCDB
				|| server.getServerType() == NodeSessionMgr.SERVER_TYPE_ACTIVE) {
			return;
		}

		// 发送game Server ip port到mainserver
		Map<String, Object> params = new HashMap<String, Object>();
		params.put("nodeServerIp", server.getIp());
		params.put("nodeServerPort", server.getPort());
		params.put("serverType", server.getServerType());

		//我需要mainserevr的client,就弄个bean在本服
		final IWolfClientService mainServerClient = (IWolfClientService) ServiceLocator
				.getSpringBean("mainServerClient");

		//这个位置其实就是mina的client连server端
		mainServerClient.init();
		Object localAddress = mainServerClient.registerNode(params);

         //同上,需要jobserevr的client
		final IWolfClientService jobServerClient = (IWolfClientService) ServiceLocator
				.getSpringBean("jobServerClient");
		if (jobServerClient != null) {
			jobServerClient.init();
			Map<String, Object> params1 = new HashMap<String, Object>();
			params1.putAll(params);
			jobServerClient.registerNode(params1);
		}
		// }

		.....

	}

 

再看下WolfClientService.init()

public void init() {
		if (start)
			return;
		if (wolfClient == null) {
			log.error("wolf client is null");
			return;
		}
         //mina 的client 连接 mina server
		wolfClient.start();
		if (wolfClient.isConnected())
			start = true;
	}

再看下wolfclient.start()

/**
	 * 连接一个服务器,并指定处理接收到的消息的处理方法
	 *
	 */
	public void start() {
		// this.context.put("resultMgr", this.resultMgr);

		logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
				.getString("WolfClient_9"), processorNum);
		logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
				.getString("WolfClient_0"), corePoolSize);
		logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
				.getString("WolfClient_4"), maxPoolSize);

		if (this.serverIp == null || this.serverIp.equals("")) {
			logger.error(clientName + "没有配置serverIp,不启动.........");
			return;
		}
		String threadPrefix = clientName + "[" + this.serverIp + ":"
				+ this.serverPort + "]";
		// exector = Executors.newCachedThreadPool(new
		// NamingThreadFactory(threadPrefix));
		processor = new SimpleIoProcessorPool<NioSession>(NioProcessor.class,
				processorNum);

		// connector = new NioSocketConnector((Executor) exector, processor);
		connector = new NioSocketConnector(processor);

		// connector.getSessionConfig().setReuseAddress(true);
		DefaultIoFilterChainBuilder chain = connector.getFilterChain();

		if (useLogFilter == 2) {
			chain.addLast("logging", new LoggingFilter());
		}
		// codec filter要放在ExecutorFilter前,因为读写同一个socket connection的socket
		// buf不能并发(事实上主要是读,写操作mina已经封装成一个write Queue)
		chain.addLast("codec", new ProtocolCodecFilter(codecFactory)); // 设置编码过滤器

		// 添加心跳过滤器,客户端只接受服务端的心跳请求,不发送心跳请求
		// connector.getSessionConfig().setReaderIdleTime(readIdleTimeOut);
		// 这里的KeepAliveFilter必须在codec之后,因为KeepAliveMessageFactoryImpl返回的是Object,如果KeepAliveMessageFactoryImpl返回的是IOBuffer,则可以在codec之前
		// KeepAliveFilter到底在ExecutorFilter之前好还是之后好,我也不确定
		KeepAliveFilter filter = new KeepAliveFilter(
				new KeepAliveMessageFactoryImpl(keepAliveRequestInterval <= 0),
				IdleStatus.READER_IDLE, new RequestTimeoutCloseHandler(),
				keepAliveRequestInterval <= 0 ? 600 : keepAliveRequestInterval,
				30);
		chain.addLast("ping", filter);

		// 添加执行线程池
		executor = new UnorderedThreadPoolExecutor(corePoolSize, maxPoolSize,
				keepAliveTime, TimeUnit.SECONDS, new NamingThreadFactory(
						threadPrefix));

		// 这里是预先启动corePoolSize个处理线程
		executor.prestartAllCoreThreads();

		chain.addLast("exec", new ExecutorFilter(executor,
				IoEventType.EXCEPTION_CAUGHT, IoEventType.MESSAGE_RECEIVED,
				IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE,
				IoEventType.SESSION_OPENED));

		if (useWriteThreadPool) {
			executorWrite = new UnorderedThreadPoolExecutor(corePoolSize,
					maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
					new NamingThreadFactory(threadPrefix + "write"));
			executorWrite.prestartAllCoreThreads();
			chain.addLast("execWrite", new ExecutorFilter(executorWrite,
					IoEventType.WRITE, IoEventType.MESSAGE_SENT));

		}
		// ,logger.isDebugEnabled() ? new
		// LoggingIoEventQueueHandler("execWrite") : nulls

		// 配置handler的 logger,在codec之后,打印的是decode前或者encode后的消息的log
		// 可以配置在ExecutorFilter之后:是为了在工作线程中打印log,不是在NioProcessor中打印
		if (useLogFilter == 1) {
			chain.addLast("logging", new LoggingFilter());
		}

		connector.setHandler(handler);

		connector.getSessionConfig().setReuseAddress(true);
		connector.getSessionConfig().setTcpNoDelay(tcpNoDelay);
		logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
				.getString("WolfClient_1")
				+ serverIp + ":" + serverPort);
		ConnectFuture cf = null;

		long start = System.currentTimeMillis();
		while (true) {
                        //这地很关键,是个无线循环,每10秒连接一次,直到可以和服务端建立连接,否则一支循环下去
			cf = connector.connect(serverAddress);// 建立连接
			cf.awaitUninterruptibly(10000L);
			if (!cf.isConnected()) {
				if ((System.currentTimeMillis() - start) > timeout) {
					throw new RuntimeException(
							com.youxigu.dynasty2.i18n.MarkupMessages
									.getString("WolfClient_5")
									+ serverIp + ":" + serverPort);
				}
				if (cf.getException() != null) {
					logger.error(com.youxigu.dynasty2.i18n.MarkupMessages
							.getString("WolfClient_6"), serverIp + ":"
							+ serverPort, cf.getException().getMessage());
				}
				try {
					Thread.sleep(10000);
				} catch (Exception e) {
				}

				continue;
			}
                        //这就是终极目标了,我们的目的就是在serevr的客户端的bean里,可以拿到这个iosession
			this.setSession(cf.getSession());

			logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
					.getString("WolfClient_10")
					+ serverIp + ":" + serverPort);
			shutDown = false;
			if (handler instanceof WolfMessageChain) {
				WolfMessageChain wmc = WolfMessageChain.class.cast(handler);
				wmc.init(context);
			}

			break;
		}

	}

这样后端的业务通信网就可以轻松的建立起来,之后想怎么通信就看你的了  

时间: 2024-10-22 19:21:39

【MINA】用mina做业务服之间的通信,实现业务负载均衡思路的相关文章

nginx利用geo模块做限速白名单以及geo实现全局负载均衡的操作记录

geo指令使用ngx_http_geo_module模块提供的.默认情况下,nginx有加载这个模块,除非人为的 --without-http_geo_module.ngx_http_geo_module模块可以用来创建变量,其值依赖于客户端IP地址.geo指令语法: geo [$address] $variable { ... }默认值: -配置段: http定义从指定的变量获取客户端的IP地址.默认情况下,nginx从$remote_addr变量取得客户端IP地址,但也可以从其他变量获得.例

拼多多怎么开店,拼多多店群何如做好客服

店群管家大讲堂-拼多多店群何如做好客服,上节课我们介绍了拼多多店群是什么,一个人开多个拼多多店铺,高效操作上货和一键派单.那么我们来聊一聊拼多多店群如何做好客服.拼多多店群采集上家宝贝,让上家发货.客户问题汇总:1.从哪发的货回答:我们仓库全国有参加,我们是就近发货.2.你们快递多久能到回答:两三天(不要给他一个准话)3.用什么快递,能不能用韵达.EMS回答:四通一达,EMS我们都是可以发,但是我们不能自选,我们会根据亲的地区来发货4.退货沟通是退货,还是说给他补差价,我们要及时跟商家沟通查看售

Android - Fragment (三)不同Fragment之间的通信

在Fragment的java文件中,可以使用getActivity()来获得调用它的activity, 然后再找到另一个Fragment,进行通信 getActivity().getFragmentManager().findFragmentById(R.id.fragment_list); 但这样做耦合度太高,不方便后续的修改操作 Fragment与其附着的Activity之间的通信,都应该由Activity来完成 不能是多个Fragment之间直接通信 Fragment与其附着的Activi

跨园区容灾,升级不停服——高可用负载均衡集群实践

对于云计算行业来说,云服务的可用性和可扩展性是的检测其服务质量的重要标准,也是最受用户关注的两大难题.各云计算厂商针对容灾.升级等需求的解决方案,最能够体现其底层架构的实力.腾讯云基于基础架构的优势,为分期乐.微信红包等平台提供技术支持,可以完美满足如下三点需求: 1. 高可用能力,容灾能力强,升级不停服 2. 可扩展性强,功能丰富,性能超高 3. 避免重复造轮子,性价比之王 近期,针对一些客户对腾讯云产品可用性的问询,腾讯云基础产品团队对负载均衡产品的原理做出详细阐述,并希望通过对腾讯负载均衡

怎么用pfSense为你的web服务做负载均衡

注意pfSense的负载均衡有两种:一是设置多个WAN做双线负载均衡,二是本文的为LAN内的web服务器做inbound-loadbalancer. 这篇文章中展示了怎么使用pfSense 2.0 为你的多个web服务器配置负载均衡(load balancer).这里假定在你的网络环境中已经拥有了一个pfSense服务器和2个以上的apache服务器,并且具有一定的pfSense知识. 1.前提 一个安装好的pfSense 2.0 机器(如果它是你的外围防火墙,建议安装在物理机上) 至少2个ap

网络编程之Socket的TCP协议实现客户端与客户端之间的通信

我认为当你学完某个知识点后,最好是做一个实实在在的小案例.这样才能更好对知识的运用与掌握 如果你看了我前两篇关于socket通信原理的入门文章.我相信对于做出我这个小案列是完全没有问题的!! 既然是小案列.就难免会有很多不足,所以还请大家见谅.先说下用此案例实现的功能 利用Socke的TCP协议实现了 1:端与端之间的通信(客户端和客户端)包括发送文字,小文件,抖动效果 2:同步实现好友下线与上线的效果,即有其他客户端上线会通知其他已经在线的客户端 3:实现自定义通信协议 服务器窗体布局 布局很

跨园区容灾,升级不停服:高可用负载均衡集群实践

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文由腾讯云中间件团队发表于云+技术周刊特别版 作者:方坤丁 对于云计算行业来说,云服务的可用性和可扩展性是的检测其服务质量的重要标准,也是最受用户关注的两大难题.各云计算厂商针对容灾.升级等需求的解决方案,最能够体现其底层架构的实力.腾讯云基于基础架构的优势,为分期乐.微信红包等平台提供技术支持,可以完美满足如下三点需求: 1. 高可用能力,容灾能力强,升级不停服 2. 可扩展性强,功能丰富,性能超高 3. 避免重复造轮子,性价比之王

做了「负载均衡」就可以随便加机器了吗?这三招来帮你!

本文长度为3056字,预计读完需1.1MB流量,建议阅读8分钟. 阅读目录 什么是Session丢失? nginx是如何来解决这个问题的? Session保持的其它方案 结语 这篇是<分布式关注点系列>中「负载均衡」相关的内容最后一发了,后续也会继续讲「高可用」相关的其它主题,主要是限流.降级.熔断之类的吧,具体还没定.文末先附上之前发过的高可用相关文章,供你再温故一下. 下面这个场景不知是否在你面前出现过. 开发Z哥对运维Y弟喊:“Y弟,现在系统好卡,刚上了一波活动,赶紧帮我加几台机器上去顶

Windows平台下利用APM来做负载均衡方案 - 负载均衡(下)

概述 我们在上一篇Windows平台分布式架构实践 - 负载均衡中讨论了Windows平台下通过NLB(Network Load Balancer) 来实现网站的负载均衡,并且通过压力测试演示了它的效果,可以说还是非常的理想的.同时我们也收集到了不少的问题,比如说如何在这种分布式的架构下使用Session,NLB中有一台服务器挂掉了会导致对外暴露的地址无法访问,如果实现服务器之间的同步,如果更好的进行热修复等等,还有我们在上一篇中也提到了NLB所提供的功能是非常简单的,为了回答我们前面提到的问题