接触nio也有好几年了,最开始摸不着头脑,资料很少,网上的资料都是翻来覆去的抄袭,基本上对于自己的学习没有大帮助。
后来遇到了xsocket,觉得用起来还可以,挺方便,api也很简洁。不过这个库的用户不多,后来作者也停止了开发。
然后就是mina netty,这些不用说,了解nio的人都知道它们。不过想用好它们也不是那么容易。里面有各种回调,各种future,当你在处理网络事件的时候,如果想在发起一个连接,那回调代码就已经成翔。并非批评这种方式不好,只不过它不符合我这样的懒人、笨人使用。因为,就是做个接个连接、发起个连接而已,越直观越好。
于是就想找一种解决办法,改变当前的不爽。最初的努力方向,是通过锁、等待通知事件的机制进行封装,这样在一定程度上可以解决一部分问题,不过我设定的环境是处理网络事件比较频繁的环境,所以,锁的开销会潜在的比较高,也会显著的降低吞吐量。最重要的因素是:复杂度,应用层的复杂度得到一定程度的环境,不过底层代码的编写复制度还是很高,回调很多,很难直观的看到代码的全貌、流程,所以经过了很长一段的时间的尝试之后,决定换个角度看看。
于是重新审视我在处理的事情,其实就是事件的处理,事件的处理常用方法就是消息机制,发送消息,比如qt mfc等等,都是用消息机制。觉得这样的方式不是我想要的。我想要一种类似线程的东西,但是挂起和恢复可以由自己来控制,而且开销要足够低。我知识面不算很宽,所以,当时还不知道这东西应该是什么,于是就google一把,这时协程(coroutine,fiber)进入了我的视线,协程的定义、以及用户态调度的方式,都符合要求,潜意思中感到:找到合适办法。
于是就看了很多协程相关的资料、文章。并了解了协程的现有实现:go有,c#有,lua有,boost有,dlang有,java有kilim,linux有ucontext。反正有很多。最先选择了看kilim,看了一段时间觉得很晕乎,于是换,看boost,还是是很晕乎,没看懂它是怎么实现的。这时,正在学dlang,dlang里有个vibe.d的库,这是一个庞大的库,http tcp udp的封装都有,而且它也是开源的,于是,就先用它写了一段时间的程序,不知道我姿势不对,还是它的库不稳定,反正是出各种bug,我也就不停的调试。用了几个月,里面的大致概念基本懂了,但是它还是很容易出错,程序很容易崩溃。于是,就用epoll和dlang本身的fiber封装了一个新的库,这里重要的改进是加入了空指针的检测,让程序不再轻易崩溃。在此过程中,学会了基本的协程调度以及和epoll网络事件的处理,以及两者的结合方式。
而实际工作中,使用的还是java,用了kilim,也用了netty,觉得这种结合方式很不方便,事件需要在两个线程间不停传递。于是又翻看了kilim的源码,除了注入部分,其余的大致能看懂,而我最关心的其实是调度部分,kilim的调度使用的是线程间的调度方式,使用了很多锁,隐隐觉得不妥,我需要的是在单线程情况下能高效运行的方法。而且传统的epoll程序正是这样,所以,只需要把我在dlang里做的工作搬到这里即可。
说干就干,平时早上6点起床,写到8点,周末连续写两天。经过两个周末,一周多的时间,我终于把kilim和nio结合到一起了,并写了一些常用的工具,比如协程间的协调(等待、通知),等等。我给它取名:dawn。
现在使用nio的方式就很符合我的预期,写起来很直观,代码是顺序写的,而不是回调:
看个例子:
Echo Server,这种server的功能很简单,就是把服务收到的所有数据发回客户端。那看在dawn里怎么写:
首先,所有协程都是在调度器中运行的,要创建一个调度器
Scheduler sch = new Scheduler();
其次,要创建一个协程,来加载我们的程序:
new Task() {
@Override
public void execute() throws Pausable, Exception {
System.out.println("boostrap..");
TcpServer server = new TcpServer("0.0.0.0", 10000) {
@Override
protected void onAccepted(TcpChannel ch) {
processConnection(ch);
}
};
server.start();
}
}.startOn(sch); //因为执行这行代码的时候 ,还处于调度器线程之外,所以,必须指定在哪个调度器上执行。
这里创建了一个Task即协程,并让他在sch上运行。
execute方法体里,就是初始化代码,在所有地址、端口10000上创建了一个tcp服务器,并在接收到连接的时候调用processConnection方法。
接下来我们来看这个方法
static void processConnection(final TcpChannel ch) {
new Task() {
@Override
public void execute() throws Pausable, Exception {
System.out.println("reading.");
ScalableDirectBuf buf = ScalableDirectBuf.allocateFromTlsCache(); //分配一个buffer
int n = 0;
while (true) {
n = ch.readSome(buf); //读数据,这里会阻塞,直到读到数据,或者抛出网络异常
if (n > 0) {
ch.writeAll(buf); //写数据,这里也会阻塞,直到写完所有数据。
}
count++;
buf.compact(); //压缩buffer,抛弃已经消耗的数据,并释放出空间,未消耗掉的数据,还会继续留在buffer中。
}
}
}.start();//此时,代码已经在调度器线程执行,所以不需要指定调度器,默认使用父协程所在的调度器。
}
最后,启动调度器,在启动调度器之前,所有协程的execute方法都不会执行。
sch.start();
最后,我们可以用telnet 127.0.0.1 10000连上这个服务器,并发送数据,会把你发送的数据回显在telnet客户端上。
好吧,先写到这把,歇会。更多信息可以到github上去看:
https://github.com/zhmt/dawn