Java IO:SocketChannel和Selector在ZooKeeper中应用

转载请注明出处:jiq?钦‘s technical Blog

假设不了解SocketChannel和Selector。请先阅读我的还有一篇博文:点击打开链接

ZooKeeper的启动从QuorumPeerMain类的main函数開始:

调用顺序是: Main -> initializeAndRun-> runFromConfig

一、默认的NIOServerCnxnFactory通信方式

当中runFromConfig主要做了两件事情:

(1)初始化client与服务端的网络通信处理类ServerCnxnFactory:

ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();

而createFactory函数的内部实现是:

if (serverCnxnFactoryName == null) {
 serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}

try {
 return(ServerCnxnFactory) Class.forName(serverCnxnFactoryName).newInstance();
} catch (Exception e) {
            IOException ioe = new IOException("Couldn‘t instantiate "
                    + serverCnxnFactoryName);
            ioe.initCause(e);
            throw ioe;
}

能够看到默认初始化的ServerCnxnFactory是NIOServerCnxnFactory,这是Java NIO方式的网络通信,另外还有NettyServerCnxnFactory类提供Netty通信方式。

(2)启动QuorumPeer:

quorumPeer.start();

内部会调用初始化好的ServerCnxnFactory类的start函数:

cnxnFactory.start();

二、NIOServerCnxnFactory.start()方法

@Override
   publicvoidstart() {
        stopped = false;
        if (workerPool == null){
            workerPool = new WorkerService(
                "NIOWorker", numWorkerThreads, false);
        }

        //先启动一堆selector线程
        for(SelectorThread thread : selectorThreads) {
            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }
        }

        //再启动accept线程
        if (acceptThread.getState() == Thread.State.NEW) {
            acceptThread.start();
        }

        //最后启动expire线程
        if (expirerThread.getState() ==Thread.State.NEW){
            expirerThread.start();
        }
    }

三、NIOServerCnxnFactory中几个线程类的关系

NIOServerCnxnFactory中包括了三个类:

(1)AbstractSelectThread:SelectorThread类和AcceptThread类的共同父类,维护了一个selector选择器对象

(2)AcceptThread:管理新的ZooKeeper client连接请求,实际上就是利用父类的选择器selector监听ServerSocketChannel的“SelectionKey.OP_ACCEPT”事件,一旦来新的请求,负责建立好和client的连接SocketChannel,并从SelectorThread线程池分配一个线程。将该SocketChannel连接放入SelectorThread线程维护的acceptedQueue队列中。

(3)SelectorThread:监听AcceptThread分配的已经建立的SocketChannel连接上发生的数据读写事件,并运行实际数据读写。

实际上就是利用父类的选择器selector监听在acceptedQueue队列中建立好的连接的数据读写事件。一旦读写事件发生,调用handleIO函数处理读写请求。

以下是读写事件监听线程的线程池selectorThreads的初始化:

for(inti=0; i<numSelectorThreads;++i) {
            selectorThreads.add(newSelectorThread(i));
       }

以下是新连接管理线程acceptThread的初始化:

this.ss =ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
acceptThread= newAcceptThread(ss, addr, selectorThreads);

然后NIOServerCnxnFactory.start()将会启动这些线程。

AcceptThread的run函数非常easy。就是监听连接SelectionKey.op_accept事件,然后建立SocketChannel连接。并分配一个SelectorThread线程来处理该连接,详细就不详述了。

SelectorThread的run函数核心就是监听到SelectionKey.OP_READ事件后运行handleIO函数,详细怎样和client进行数据交互计划放在其它文章中介绍。

时间: 2024-11-02 11:38:26

Java IO:SocketChannel和Selector在ZooKeeper中应用的相关文章

hbase异常:java.io.IOException: Unable to determine ZooKeeper ensemble

项目中用到hbase,有时候可能会报一些异常,比如java.io.IOException: Unable to determine ZooKeeper ensemble 等等,当出现这个问题时,某某说是项目中用到线程池的问题导致的,但查看异常之后,并非跟啥线程池有关系,异常信息如下: java.io.IOException: Unable to determine ZooKeeper ensemble at org.apache.hadoop.hbase.zookeeper.ZKUtil.con

Java IO流(第三讲):字节流中的FileInputStream与FileoutputStream

一.概念 FileInputStream和FileOutputStream 是一对继承与InputStream和OutputStream的类,用于本地文件读写,按二进制格式读写并且顺序读写,读和写的文件流要区分开,即分别创建不同文件流对象. 二.记住in和out 死记硬背型: 不管你从磁盘读,从网络读,或者从键盘读,读到内存,就是InputStream. 不管你写入磁盘,写入网络,或者写到屏幕,都是OuputStream. 理解型: 有些人经常遇到InputStream.OuputStream,

Java IO 理论笔记

1.Java IO 流 io是java中实现输入输出的基础,它可以很方便的完成数据的输入输出操作,Java把不同的输入输出抽象为流,通过流的方式允许Java程序使用相同的方式来访问不同的输入.输出. 2.流的分类 输入流.输出流  A.输入流:只能从中读取数据,而不能向里面写数据 B. 输出流:只能向里面写数据,而不能读数据 可以这样理解,数据从内存到硬盘,通常认为是输出流,即写操作:相反,从硬盘到内存,通常认为是输入流,即读操作:这里的输入.输出是从内存的角度划分的. Java的输入流主要有I

JDK1.8 java.io.Serializable接口详解

java.io.Serializable接口是一个标志性接口,在接口内部没有定义任何属性与方法.只是用于标识此接口的实现类可以被序列化与反序列化.但是它的奥秘并非像它表现的这样简单.现在从以下几个问题入手来考虑. 希望对象的某些属性不参与序列化应该怎么处理? 对象序列化之后,如果类的属性发生了增减那么反序列化时会有什么影响呢? 如果父类没有实现java.io.Serializable接口,子类实现了此接口,那么父类中的属性能被序列化吗? serialVersionUID属性是做什么用的?必须申明

hive启动时报错${system:java.io.tmpdir

Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: ${system:java.io.tmpdir%7D/$%7Bsystem:user.name%7D这是因为在hive-site.xml配置文件中需要配置system:java.io.tmpdir属性. 在配置文件中加入: <proper

hive启动时报错: Relative path in absolute URI: ${system:java.io.t

Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: ${system:java.io.tmpdir%7D/$%7Bsystem:user.name%7D这是因为在hive-site.xml配置文件中需要配置system:java.io.tmpdir属性. 在配置文件中加入: <proper

File java IO

1.File简介 /** * java.io.File * File用于表示文件系统中的一个文件或目录的 * * 使用File我们可以: * 1:访问其表示的文件或目录的属性(名字,大小等) * 2:创建,删除文件或目录 * 3:访问一个目录中的子项 * * 但是不能访问文件数据. */ public class FileDemo { public static void main(String[] args) { /* * 创建File是要指定路径 * 路径有两种:绝对路径和相对路径 * 绝对

HBase中此类异常解决记录org.apache.hadoop.ipc.RemoteException(java.io.IOException):

ERROR: Can't get master address from ZooKeeper; znode data == null   一定注意这只是问题的第一层表象,真的问题是: File /hbase/.tmp/hbase.version could only be replicated to 0 nodes instead of minReplica 网上很多都是叫用两种方式解决 stop/start  重启hbase 格式化 hdfs namenode -format,不能随随便便就格

1.java.io包中定义了多个流类型来实现输入和输出功能,

1.java.io包中定义了多个流类型来实现输入和输出功能,可以从不同的角度对其进行分 类,按功能分为:(C),如果为读取的内容进行处理后再输出,需要使用下列哪种流?(G)   A.输入流和输出流 B.字节流和字符流 C.节点流和处理流   D.File stream E.Pipe stream F.Random stream G.Filter stream