基于zbus网络通讯模块实现的MySQL透明代理(<100行代码)

项目地址 https://git.oschina.net/rushmore/zbus

我们上次讲到zbus网络通讯的核心API:

Dispatcher -- 负责-NIO网络事件Selector引擎的管理,对Selector引擎负载均衡

IoAdaptor -- 网络事件的处理,服务器与客户端共用,负责读写,消息分包组包等

Session -- 代表网络链接,可以读写消息

实际的应用,我们几乎只需要做IoAdaptor的个性化实现就能完成高效的网络通讯服务,今天我们将举例说明如何个性化这个IoAdaptor。

我们今天要完成的目标是:实现MySQL服务器的透明代理。效果是,你访问代理服务器跟访问目标MySQL无差异。

我们在测试环境10.17.2.30:3306 这台机器上提供了MySql,在我们本地机器上跑起来我们今天基于zbus.NET实现的一个代理程序,就能达到下面的效果。

完成大概不到100 行的代码, Cool?Let’s roll!

首先,我们思考透明TCP代理到底在干啥,透明的TCP代理的业务逻辑其实非常简单,可以描述为,将来自代理上游(发起请求到代理)的数据转发到目标TCP服务器,把目标服务器回来的数据原路返回代理上游客户端。 注意这个原路,如何做到原路返回成为关键点。这个示例其实跟MySQL没有任何关系,原则上任何TCP层面的服务都应该适配。

基于zbus.NET怎么来将上面的逻辑在体现出来,也就是如何个性化IoAdaptor?直观的讲,我们要处理的几个事件应该包括:1)从上游客户端发起的链接请求--代理服务器的Accept事件,2)代理服务器连接目标服务器的Connect事件,3)上下游的数据事件onMessage。

zbus.NET的IoAdaptor提供的个性化事件如下

基本包括一个链接(客户端或者服务端)的生命周期,与消息的编解码。

我们的代理IoAdaptor就是逐一个性化处理。

第一步,编解码: 透明代理对消息内容不做理解,所以不需要编解码。

// 透传不需要编解码,简单返回ByteBuffer数据
	public IoBuffer encode(Object msg) {
		if (msg instanceof IoBuffer) {
			IoBuffer buff = (IoBuffer) msg;
			return buff;
		} else {
			throw new RuntimeException("Message Not Support");
		}
	}

	// 透传不需要编解码,简单返回ByteBuffer数据
	public Object decode(IoBuffer buff) {
		if (buff.remaining() > 0) {
			byte[] data = new byte[buff.remaining()];
			buff.readBytes(data);
			return IoBuffer.wrap(data);
		} else {
			return null;
		}
	}

第二步,代理服务接入:

@Override
	protected void onSessionAccepted(Session sess) throws IOException {
		Session target = null;
		Dispatcher dispatcher = sess.getDispatcher();
		try {
			target = dispatcher.createClientSession(targetAddress, this);
		} catch (Exception e) {
			sess.asyncClose();
			return;
		}
		sess.chain = target;
		target.chain = sess;
		dispatcher.registerSession(SelectionKey.OP_CONNECT, target);
	}

这里的逻辑思路是,代理服务器每接受到一个请求--通过onSessionAccepted表达,我们将同时创建一个到目标服务器的链接,今天的例子是目标MySQL服务器,注意上面的处理中把创建目标服务器Session过程与真正链接到目标服务分开(Dispatcher也提供合并二者的工具方法),是为了能在没有发生链接之前绑定上好上下游关系,通过Session的chain变量来表达,也就是当前Session的关联Session,关联好之后启动感兴趣Connect事件,逻辑处理完毕。

第三步,链接成功事件(第二步中需要链接到目标服务器)

@Override
	public void onSessionConnected(Session sess) throws IOException {  
		Session chain = sess.chain;
		if(chain == null){ 
			sess.asyncClose();
			return; 
		}   
		if(sess.isActive() && chain.isActive()){ 
			sess.register(SelectionKey.OP_READ);
			chain.register(SelectionKey.OP_READ);
		}
	}

这里的一个核心是当上下游都处于链接正常态,上下游Session都启动感兴趣消息读事件(写事件是在读取处理中自动触发),为什么在这里做的原因是一定要等上下游都正常态后才启动双方消息处理,不然会出现字节丢失。

第四步,处理上下游数据事件

@Override
	protected void onMessage(Object msg, Session sess) throws IOException {  
		Session chain = sess.chain;
		if(chain == null){
			sess.asyncClose(); 
			return;
		} 
		chain.write(msg); 
	}

是不是非常简单,类似pipeline,从一端的数据写到另外一端。

原则上面4步结束,整个透明代理就完成了,但是为了处理链接异常清理,我们增加了Session清理处理,如下

@Override
	public void onSessionToDestroy(Session sess) throws IOException {   
		try {
			sess.close();
		} catch (IOException e) { //ignore
		} 
		if (sess.chain == null) return; 
		try {
			sess.chain.close();
			sess.chain.chain = null;
			sess.chain = null;
		} catch (IOException e) { 
		}
	}

工作就是解决上下游链接清理链接。

至此为止我们的IoAdaptor个性化就完成了,是不是非常简单,现在我们要跑起来测试了,下面的代码就是上一次讲到重复的设置,没有新意。

	public static void main(String[] args) throws Exception {   
		Dispatcher dispatcher = new Dispatcher(); 
		IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); 
		final Server server = new Server(dispatcher, ioAdaptor, 3306); 
		server.start();
	}

骚年,包括渣渣import和少许注释加起来折腾了不到100行,该跑一跑了,还是那句话,不是HelloWorld,你可以规模压力测。看看你是否在本地代理出来了你的目标服务MySQL,gl,hf, gogogo.

完整代码可运行代码如下,也可直接到zbus示例代码库中找到

https://git.oschina.net/rushmore/zbus/blob/master/src/test/java/org/zbus/net/TcpProxyAdaptor.java?dir=0&filepath=src%2Ftest%2Fjava%2Forg%2Fzbus%2Fnet%2FTcpProxyAdaptor.java&oid=08abff381d93519485e1c0ee2c35f1d4f8d1814c&sha=a29272ed99a8f21ec19a14b403ebee53a385e9a4

package org.zbus.net;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import org.zbus.net.core.Dispatcher;
import org.zbus.net.core.IoAdaptor;
import org.zbus.net.core.IoBuffer;
import org.zbus.net.core.Session;
import org.zbus.proxy.TcpProxyServer;

public class TcpProxyAdaptor extends IoAdaptor {
	private String targetAddress;

	public TcpProxyAdaptor(String targetAddress) {
		this.targetAddress = targetAddress;
	}

	// 透传不需要编解码,简单返回ByteBuffer数据
	public IoBuffer encode(Object msg) {
		if (msg instanceof IoBuffer) {
			IoBuffer buff = (IoBuffer) msg;
			return buff;
		} else {
			throw new RuntimeException("Message Not Support");
		}
	}

	// 透传不需要编解码,简单返回ByteBuffer数据
	public Object decode(IoBuffer buff) {
		if (buff.remaining() > 0) {
			byte[] data = new byte[buff.remaining()];
			buff.readBytes(data);
			return IoBuffer.wrap(data);
		} else {
			return null;
		}
	}

	@Override
	protected void onSessionAccepted(Session sess) throws IOException {
		Session target = null;
		Dispatcher dispatcher = sess.getDispatcher();
		try {
			target = dispatcher.createClientSession(targetAddress, this);
		} catch (Exception e) {
			sess.asyncClose();
			return;
		}
		sess.chain = target;
		target.chain = sess;
		dispatcher.registerSession(SelectionKey.OP_CONNECT, target);
	}

	@Override
	public void onSessionConnected(Session sess) throws IOException {  
		Session chain = sess.chain;
		if(chain == null){ 
			sess.asyncClose();
			return; 
		}   
		if(sess.isActive() && chain.isActive()){ 
			sess.register(SelectionKey.OP_READ);
			chain.register(SelectionKey.OP_READ);
		}
	}

	@Override
	protected void onMessage(Object msg, Session sess) throws IOException {  
		Session chain = sess.chain;
		if(chain == null){
			sess.asyncClose(); 
			return;
		} 
		chain.write(msg); 
	}

	@Override
	public void onSessionToDestroy(Session sess) throws IOException {   
		try {
			sess.close();
		} catch (IOException e) { //ignore
		} 
		if (sess.chain == null) return; 
		try {
			sess.chain.close();
			sess.chain.chain = null;
			sess.chain = null;
		} catch (IOException e) { 
		}
	}

	@SuppressWarnings("resource")
	public static void main(String[] args) throws Exception {   
		Dispatcher dispatcher = new Dispatcher(); 
		IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); 
		final Server server = new Server(dispatcher, ioAdaptor, 3306);
		server.setServerName("TcpProxyServer");
		server.start();
	}
}
时间: 2024-10-27 13:23:19

基于zbus网络通讯模块实现的MySQL透明代理(<100行代码)的相关文章

100行代码实现最简单的基于FFMPEG+SDL的视频播放器(SDL1.x)【转】

转自:http://blog.csdn.net/leixiaohua1020/article/details/8652605 版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[-] 简介 流程图 simplest_ffmpeg_player标准版代码 simplest_ffmpeg_player_suSU版代码 结果 FFMPEG相关学习资料 补充问题 ===================================================== 最简单的基于FFmp

用JavaCV改写“100行代码实现最简单的基于FFMPEG+SDL的视频播放器 ”

FFMPEG的文档少,JavaCV的文档就更少了.从网上找到这篇100行代码实现最简单的基于FFMPEG+SDL的视频播放器.地址是http://blog.csdn.net/leixiaohua1020/article/details/8652605. 用JavaCV重新实现并使用opencv_highgui进行显示. 1 import com.googlecode.javacpp.IntPointer; 2 import com.googlecode.javacpp.Pointer; 3 im

基于ffmpeg网络播放器的教程与总结

基于ffmpeg网络播放器的教程与总结 一.         概述 为了解决在线无广告播放youku网上的视频.(youku把每个视频切换成若干个小视频). 视频资源解析可以从www.flvcd.com获取,此网站根据你输入的优酷的播放网页地址解析成若干个真实的视频地址. 二.         实现 首先搜索关闭网络播放器(流媒体播放器的实现方法) 得出的结论,目前主流的播放器分三大阵营微软,苹果,基于FFmpeg内核的.所以我决定从ffmpeg开源的播放器入手. 最出名的ffmpeg播放器vc

postrouting和prerouting的区别以及透明代理

一.源 在学习鸟哥的linux私房菜NAT部分的时候为了加深对postrouting和prerouting的理解,上网查了相关的一些资料.有一篇博文写的很好,给了我很多的启示:/jis1237/article/details/17676659(csdn),下面谈谈自己的所得. 二.关键词链接 访问外网---发一个数据报---路由规则审核一哈---snat( 源地址转换)---postrouting(路由规则之后的动作)---从内网出 外网访问内网---发送一个数据报---dnat (目的地址转换

基于opencv网络摄像头在ubuntu下的视频获取

 基于opencv网络摄像头在ubuntu下的视频获取 1  工具 原料 平台 :UBUNTU12.04 安装库  Opencv-2.3 2  安装编译运行步骤 安装编译opencv-2.3  参考http://blog.csdn.net/xiabodan/article/details/23547847 3  测试代码 编译 g++ cameraCaptrue.cpp -o test `pkg-config --libs --cflags opencv` cameraCaptrue.cpp

基于corosync和pacemaker实现HA mysql service

corosync基础介绍 corosync AIS: Application Interface Standard,应用程序接口规范 SA Forum:服务可用性论坛,OpenAIS即是此论坛开发 OpenAIS 提供了一种集群模式,包含集群框架.集群成员管理.通信方式.集群监测等,但没有提供集群资源管理功能: 常用组件包括:AMF,CLM,CPKT,EVT等接口,分支不同,包含的组件略有区别: 目前的三大主要分支包括:picacho,whitetank,wilson:OpenAIS在由whit

PHP基于HTTPD模块的方式跟MYSQL连接

实验环境: 1.VMware 2.两台linux子机 3.桥接,本机当客户机 实验目的: 1.编译HTTPD和编译PHP.编译MYSQL,熟悉编译过程和原理 2.HTTPD和PHP在同一台服务器上,PHP作为HTTPD的模块来与MYSQL建立连接 实验拓扑: 二进制格式安装mysql 1.安装包组和依赖所用到的包 #yum install pcre-devel #yum groupinstall 'development tools' #yum groupinstall 'desktop pla

sshuttle基于VPN的透明代理,安全连接

sshuttle基于VPN的透明代理, 通过 ssh 创建一条从你电脑连接到任何远程服务器的 VPN 连 sudo sshuttle -r [email protected] 0.0.0.0/0 -vv 开始后,sshuttle会创建一个ssh会话到由-r指定的服务器.如果-r被丢了,它会在本地运行客户端和服务端,这个有时会在测试时有用. 连接到远程服务器后,sshuttle会上传它的(python)源码到远程服务器并执行.所以,你就不需要在远程服务器上安装sshuttle,并且客户端和服务器端

Raknet是一个基于UDP网络传输协议的C++网络库(还有一些其它库,比如nanomsg,fastsocket等等)

Raknet是一个基于UDP网络传输协议的C++网络库,允许程序员在他们自己的程序中实现高效的网络传输服务.通常情况下用于游戏,但也可以用于其它项目. Raknet有以下好处: 高性能 在同一台计算机上,Radnet可以实现在两个程序之间每秒传输25,000条信息: 容易使用 Raknet有在线用户手册,视频教程.每一个函数和类都有详细的讲解,每一个功能都有自己的例程 跨平台,当前Raknet支持Windows, Linux, Macs,可以建立在Visual Studio, GCC, Code