Vert.x 线程模型揭秘

来源:鸟窝,

colobu.com/2016/03/31/vertx-thread-model/

如有好文章投稿,请点击 → 这里了解详情

Vert.x是一个在JVM开发reactive应用的框架,可用于开发异步、可伸缩、高并发的Web应用(虽然不限于web应用)。其目的在于为JVM提供一个Node.js的替代方案。开发者可以通过它使用JavaScript、Ruby、Groovy、Java,甚至是混合语言来编写应用。

使用Vertx.x框架,可以用JavaScript、CoffeeScript、Ruby、Python、Groovy或Java开发应用程序的组件,最终应用程序可以是混合语言构建的。

本文试图揭示Vert.x的线程模型的应用,通过源代码分析Vert.x如何使用线程池处理请求的,以及比较Netty和Vert.x使用线程池的异同。

也许你觉得奇怪,默认启动一个Vert.x Verticle实例,它只用一个线程处理事件,在多核的情况下你需要创建多个Verticle实例以充分利用多个CPU Core的性能。

Vert.x 实例

首先先啰嗦地介绍一些Vert.x概念,熟悉Vert.x开发的朋友可以跳过这一节

在Vert.x里,如果你不使用Vertx对象,你几乎是寸步难行。

Vertx对象扮演着Vert.x控制中心的角色,同时它也提供了大量的功能,例如:

  • 编写TCP客户端和服务器
  • 编写HTTP客户端和服务器,包括websocket
  • Event bus
  • 共享数据
  • 定时器
  • 发布和卸载Verticle
  • UDP
  • DNS client
  • 文件系统访问
  • 高可用
  • 集群

如果你将Vert.x嵌入到你的应用程序中,你可以向下面这样获得一个Vertx对象的引用

Vertx vertx = Vertx.vertx();

当你实例化Vertx对象时,如果你感觉默认的参数不符合你的需求,你可以指定实例化时的参数:

Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));

VertxOptions对象拥有很多关于Vertx实例设置,例如配置集群,高可用设置,线程池大小以及等等其他参数。下面就介绍一下它的线程池。

1 线程池

1、eventLoopGroup

这个对象是NioEventLoopGroup的一个实例,它的线程池的大小由options.getEventLoopPoolSize()决定,如果没有设置,默认为CPU核数 * 2。

eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false);

eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);

eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

它的EventLoop和一个Context对应:

protected ContextImpl(……) {

……

EventLoopGroup group = vertx.getEventLoopGroup();

if (group != null) {

this.eventLoop = group.next();

} else {

this.eventLoop = null;

}

……

}

它用来执行标准的Verticle。

2、WorkerPool

用来执行worker Verticle。

workerPool = Executors.newFixedThreadPool(options.getWorkerPoolSize(),

new VertxThreadFactory("vert.x-worker-thread-", checker, true));

3、Internal Blocking Pool

内部使用的线程池,可以用来将阻塞代码异步化。

internalBlockingPool = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),

new VertxThreadFactory("vert.x-internal-blocking-", checker, true));

不要在event loop中执行阻塞操作, 比如访问数据库或者网络资源,这绝对会影响你的应用的性能。对于这些阻塞操作,你可以将它们异步化:

vertx.executeBlocking(future -> {

// 下面这行代码可能花费很长时间

String result = someAPI.blockingMethod("hello");

future.complete(result);

}, res -> {

System.out.println("The result is: " + res.result());

});

默认情况下executeBlocking会在同一个context中执行(同一个verticle实例),它们会串行化执行。如果不关心这个执行的顺序,可以将ordered参数设为false,它们会在worker pool线程池中并行的执行。

另外一种执行阻塞代码的方式就是使用worker verticle,worker verticle总是在worker pool线程池中执行。

2 Verticle

Verticle有点类似Actor模型,也可以实现并发的,可扩展的,易于发布的模型。

一个vert.x应用可以包含多个verticle实例,实例之间可以通过event bus通讯。

2.1 三种类型

http://vertx.io/docs/vertx-core/java/#_verticle_types

1、Standard Verticle: 最通用的类型,总是在event loop中执行。

2、Worker Verticle:它们使用worker pool线程池运行。一个verticle实例绝对不会在两个或者更多线程中并发执行。

3、Multi-threaded worker verticle:它们使用worker pool线程池运行。 一个verticle实例可以在多个线程中并发执行。

实现一个Verticle很简单:

public class MyVerticle extends AbstractVerticle {

// 当发布verticle时调用

public void start() {

}

// 可以不实现。当 verticle 卸载时调用

public void stop() {

}

}

2.2 发布方式

1、命令行方式

vertx run SomeJavaSourceFile.java

或者通过maven-shade-plugin打包成一个fat包:

<transformers>

<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">

<manifestEntries>

<Main-Class>io.vertx.core.Starter</Main-Class>

<Main-Verticle>com.colobu.vertx.Main</Main-Verticle>

</manifestEntries>

</transformer>

<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">

<resource>META-INF/services/io.vertx.core.spi.VerticleFactory</resource>

</transformer>

</transformers>

然后运行 java -jar xxx-fat.jar,你还可以传递一些参数。

2、编程方式

你也可以编程的方式,通过vertx.deployVerticle发布:

public class Main  extends AbstractVerticle {

public static void main(String[] args) {

VertxOptions vo = new VertxOptions();

vo.setEventLoopPoolSize(16);

Vertx vertx = Vertx.vertx(vo);

DeploymentOptions options = new DeploymentOptions();

options.setInstances(100);

vertx.deployVerticle(Main.class.getName(), options, e -> {

System.out.println(e.succeeded());

System.out.println(e.failed());

System.out.println(e.cause());

System.out.println(e.result());

});

}

@Override

public void start() {

Handler<HttpServerRequest> handler = e -> {

HttpServerResponse response = e.response();

response.putHeader("content-type", "application/json").end("Hello world");

};

vertx.createHttpServer().requestHandler(handler).listen(8080);

}

}

Verticle发布和Vert.x线程模型

以上比较啰嗦,主要介绍了一些Vert.x的一些概念。下面是我想重点介绍的内容。

本节以实现一个简单的http server为例(编程方式发布Verticle),分析 vert.x 的线程和Verticle的关系。只分析标准的Verticle。代码如上。

1 Verticle发布过程

首先先创建一个Vertx实例,可以你可以通过VertxOptions设置线程池的大小。上面的例子中设置Event Loop线程池的大小为16:

vo.setEventLoopPoolSize(16);

因此即使你创建几百个Verticle,也只会有16个Event Loop处理它们,你可以通过jstack查看这些线程。你会看到多个名为vert.x-eventloop-thread-<num>的线程,一个vertx-blocked-thread-checker线程,一个vert.x-acceptor-thread-0。

调用void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler)方法发布Verticle。

DeploymentOptions对象可以设置发布参数,比如是否是worker verticle,多线程worker verticle, ha, 隔离组等, 重要的是instances,它用来指定分布的Verticle实例的数量,默认是一个。

底层调用DeploymentManager的doDeployVerticle来实现,它会根据实例数创建相应多的Verticle,然后调用doDeploy发布这些Verticle:

Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);

我将doDeploy方法简化,让我们看一下关键代码:

private void doDeploy(String identifier, String deploymentID, DeploymentOptions options,

ContextImpl parentContext,

ContextImpl callingContext,

Handler<AsyncResult<String>> completionHandler,

ClassLoader tccl, Verticle... verticles) {

//准备工作

……

for (Verticle verticle: verticles) {

//创建上下文

ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, conf, tccl) :

vertx.createEventLoopContext(deploymentID, conf, tccl);

deployment.addVerticle(new VerticleHolder(verticle, context));

context.runOnContext(v -> {

try {

verticle.init(vertx, context);

Future<Void> startFuture = Future.future();

verticle.start(startFuture);

startFuture.setHandler(……);

} catch (Throwable t) {}

});

}

}

可以看到#11 行创建了一个上下文ContextImpl, 因为本例中我们不用worker模式,所以这个上下文是通过vertx.createEventLoopContext(deploymentID, conf, tccl)创建的。每个verticle都会创建一个新的上下文,因此verticle和上下文是意义对应的。

#17 行初始化verticle,#19 行启动这个verticle。还记得我们的例子中实现的start方法吗,它会在这里被调用。

这样,多个verticle实例被发布了。

2 线程模型

首先插播一下Netty的线程模型,不感兴趣的可以略过。

2.1 Netty的线程模型

虽然Vert.x底层籍由Netty实现,但是它的处理方式与Netty NIO的线程模型是不同的。

(以下谈论的Netty线程模型是指NIO的情况)

比如下面的Netty代码片段:

EventLoopGroup parentGroup = new NioEventLoopGroup(1);

EventLoopGroup childGroup = new NioEventLoopGroup(50);

try {

ServerBootstrap b = new ServerBootstrap();

b.group(parentGroup, childGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer<SocketChannel>(){……});

Channel ch = b.bind("0.0.0.0",8080).sync().channel();

ch.closeFuture().sync();

} finally {

parentGroup.shutdownGracefully();

childGroup.shutdownGracefully();

}

NioEventLoopGroup代表一组EventLoop,每个EventLoop映射一个线程,每个Channel注册一个EventLoop,但是一个EventLoop可以关联多个Channel。

parentGroup用来处理Accept事件,而childGroup用来处理其余的IO事件。当有并发连接的时候,Handler会在childGroup线程池中执行。你可以指定childGroup的线程数量,如果没有指定,则从系统属性中读取”io.netty.eventLoopThreads”,如果这个属性没有设置,则使用CPU核数 2 (Runtime.getRuntime().availableProcessors() 2))。一般parentGroup设置为1,我们只需要一个Acceptor处理客户端的连接即可。

当有多个并发连接的时候,每个连接/Channel被分配到一个EventLoop上。EventLoop选择是均匀地 (如果线程数是2的n次方,可以用比较快的选择方法PowerOfTwoEventExecutorChooser):

private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {

@Override

public EventExecutor next() {

return children[childIndex.getAndIncrement() & children.length - 1];

}

}

private final class GenericEventExecutorChooser implements EventExecutorChooser {

@Override

public EventExecutor next() {

return children[Math.abs(childIndex.getAndIncrement() % children.length)];

}

}

因此一旦如果某个EventLoop处理慢了,则这个线程上的event可能出现堆积。

比如下面的代码故意在某个线程上处理慢一些,导致这个EventLoop上出现堆积,Netty并没有根据压力将时间分配到其它处理快的EventLoop上。

public class HelloServerHandler extends ChannelInboundHandlerAdapter {

……

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) {

String name = Thread.currentThread().getName();

System.out.println(name);

if (name.endsWith("-5")) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

……

输出结果可以看到nioEventLoopGroup-3-5处理了同样多的请求,而且都堆积在后面了。

……

nioEventLoopGroup-3-19

nioEventLoopGroup-3-18

nioEventLoopGroup-3-19

nioEventLoopGroup-3-18

nioEventLoopGroup-3-20

nioEventLoopGroup-3-20

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

nioEventLoopGroup-3-5

因此,我们可以了解到,当启动一个NIO方式的Netty实例的时候,它会使用一个线程池来处理http请求。

Netty 4.0的线程模型被很好的重定义,一个ChannelHandler实例的方法不会被并发的调用,除非它被@Sharable标记,因此你不应该增加一个ChannelHandler 实例多次。当你增加一个handler到ChannelPipeline中时,你可以指定一个特定的EventExecutorGroup来执行这个handler。如果没有指定,则使用Channel注册的EventLoop来执行。如果两个Handler被指定不同的EventExecutorGroup,则它们会并发执行,因此如果它们会访问共享数据的化,你需要关注并发控制的问题。更多内容可以查看 Netty的文档。

2.2 Vert.x的线程模型

Vert.x如何在线程中处理事件的呢,还是以我们的例子分析。

回顾一下我们实现的Verticle的start方法。

@Override

public void start() {

Handler<HttpServerRequest> handler = e -> {

HttpServerResponse response = e.response();

response.putHeader("content-type", "application/json").end("Hello world");

};

vertx.createHttpServer().requestHandler(handler).listen(8080);

}

在这个start方法中,我们创建了一个http server,让它监听 8080端口, http request的处理交给handler执行。 那么监听线程是哪一个?handler又是在哪个线程池中执行的呢?调用多个Verticle实例的方法为什么没有出现”地址/端口被占用”的异常呢?

首先vertx.createHttpServer()会创建一个HttpServerImpl对象,可以通过HttpServerOptions配置更多的参数,每个Verticle实例都会创建一个HttpServerImpl对象。requestHandler(handler)方法设置处理器,你还可以使用Vert.x-Web设置路由的功能。

listen(8080)启动http 服务器,它实际调用netty实现的。

我将listen方法简化,去除一些检查代码和回调处理,只保留关键代码如下:

public synchronized HttpServer listen(int port, String host, Handler<AsyncResult<HttpServer>> listenHandler) {

listenContext = vertx.getOrCreateContext();

listening = true;

synchronized (vertx.sharedHttpServers()) {

id = new ServerID(port, host);

HttpServerImpl shared = vertx.sharedHttpServers().get(id);

if (shared == null) {

serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE);

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers);

bootstrap.channelFactory(new VertxNioServerChannelFactory());

bootstrap.childHandler(new ChannelInitializer<Channel>() {

@Override

protected void initChannel(Channel ch) throws Exception {

……

pipeline.addLast("handler", new ServerHandler());

}

});

addHandlers(this, listenContext);

vertx.sharedHttpServers().put(id, this);

actualServer = this;

} else {

// Server already exists with that host/port - we will use that

actualServer = shared;

addHandlers(actualServer, listenContext);

metrics = vertx.metricsSPI().createMetrics(this, new SocketAddressImpl(port, host), options);

}

}

return this;

}

#6 行可以看到它会检查使用这个IP地址和端口的http server是否存在,如果存在的化直接跳到# 27行。因此回答上面的问题,多个Verticle实例不会引起冲突,因为它们会共享同一个http server。

这个http server通过netty ServerBootstrap创建。#10 行可以看到acceptor是一个单线程执行的,acceptorEventLoopGroup在VertxImpl中定义。

acceptorEventLoopGroup = new NioEventLoopGroup(1, acceptorEventLoopThreadFactory);

#10 行还显示,netty的IO worker线程池由availableWorkers确定,它是一个VertxEventLoopGroup对象。VertxEventLoopGroup类扩展AbstractEventExecutorGroup,实现了EventLoopGroup接口:

……

@Override

public synchronized EventLoop next() {

if (workers.isEmpty()) {

throw new IllegalStateException();

} else {

EventLoop worker = workers.get(pos).worker;

pos++;

checkPos();

return worker;

}

}

public synchronized void addWorker(EventLoop worker) {

EventLoopHolder holder = findHolder(worker);

if (holder == null) {

workers.add(new EventLoopHolder(worker));

} else {

holder.count++;

}

}

……

线程的数量由worker的数量决定,worker的类型是EventLoop,对应一个线程,有多少worker就会有多少线程。

通过addWorker可以增加线程的数量,worker不会重复。

回到刚才的listen方法, #21 行addHandlers方法会配置handler在哪一个event loop中执行:

private void addHandlers(HttpServerImpl server, ContextImpl context) {

if (requestStream.handler() != null) {

server.reqHandlerManager.addHandler(requestStream.handler(), context);

}

if (wsStream.handler() != null) {

server.wsHandlerManager.addHandler(wsStream.handler(), context);

}

}

server.reqHandlerManager.addHandler方法如下:

public synchronized void addHandler(Handler<T> handler, ContextImpl context) {

EventLoop worker = context.nettyEventLoop();

availableWorkers.addWorker(worker);

Handlers<T> handlers = new Handlers<>();

Handlers<T> prev = handlerMap.putIfAbsent(worker, handlers);

if (prev != null) {

handlers = prev;

}

handlers.addHandler(new HandlerHolder<>(context, handler));

hasHandlers = true;

}

#2 行得到这个上下文的EventLoop。 还记得上下文的EventLoop怎么创建出来的吗?每个Verticle实例关联一个上下文,因此一个Verticle实例只会创建一个worker。

把这个worker加入到availableWorkers,这样就增加了一个事件处理线程。

因此我们可以看出正常情况下Vert.x的每个Verticle实例只会用一个线程处理请求,在多核情况下一定要配置instance的数量。

如果配置的instance的数量大于eventLoopPoolSize数量,那么就会有一个Event Loop处理多个instance的情况。 线程配置的过多有时不会带来性能的提升,由于线程也有context swicthing,反而会带来性能的降低。

时间: 2024-10-02 15:31:11

Vert.x 线程模型揭秘的相关文章

多线程——线程模型

什么是程序? 安装在磁盘上的一段指令集合,它是静态的概念. 什么是进程? 它是运行中的程序,是动态的概念,每个进程都有独立的资源空间. 什么是线程? 线程,又称为轻量级进程,是程序执行流的最小单元,是程序中一个单一的顺序控制流程.线程是进程的一个实体,是被系统独立调度和分派的基本单位. 什么是多线程? 多线程则指的是在单个程序中可以同时运行多个不同的线程执行不同的任务. 多线程的特点 ①   一个进程可以包含一个或多个线程. ②   一个程序实现多个代码同时交替运行就需要产生多个线程. ③  

JS线程模型&amp;Web Worker

js线程模型 客户端javascript是单线程,浏览器无法同时运行两个事件处理程序 设计为单线程的理论是,客户端的javascript函数必须不能运行太长时间,否则会导致web浏览器无法对用户输入做出响应.这也是为什么Ajax的API都是异步的,以及为什么客户端Javascript不能使用一个简单的异步load()或者require()函数来加载javascript库 如果应用程序不得不执行太多的计算而导致明显的延迟,应该允许文档在执行这个计算之前完全载入,并且确保告诉用户正在进行计算并且浏览

线程模型的综述

本文首先介绍了一些线程基础,比如并发.并行.内存分配.系统调用.POSIX线程.接着通过strace分析了线程与进程的区别.最后以Android.Golang等线程模型进行了分析. 基础 1. 什么是并发(Concurrent),什么是并行(Parallels)? 并发指同时进行多个计算任务. 并行指通过切换时间片模拟进行多个计算任务. 详细可以参考Difference between concurrent programming and parallel programming - stack

Netty线程模型

一.Reactor模型 1.单线程模型 Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程的职责如下: 1)作为NIO服务端,接收客户端的TCP连接: 2)作为NIO客户端,向服务端发起TCP连接: 3)读取通信对端的请求或者应答消息: 4)向通信对端发送消息请求或者应答消息 Reactor单线程模型示意图如下所示: 由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞,理论上一个线程可以独立处理所有IO相关的操作.从架构层面看,一个NI

Netty系列之Netty线程模型

1. 背景 1.1. Java线程模型的演进 1.1.1. 单线程 时间回到十几年前,那时主流的CPU都还是单核(除了商用高性能的小机),CPU的核心频率是机器最重要的指标之一. 在Java领域当时比较流行的是单线程编程,对于CPU密集型的应用程序而言,频繁的通过多线程进行协作和抢占时间片反而会降低性能. 1.1.2. 多线程 随着硬件性能的提升,CPU的核数越来越越多,很多服务器标配已经达到32或64核.通过多线程并发编程,可以充分利用多核CPU的处理能力,提升系统的处理效率和并发性能. 相关

MyCat线程模型分析

参考MyCat权威指南,对MyCat-Server里面的线程模型做简要分析: 1. 线程模型图 根据MyCat权威指南,做出以下线程模型图: MyCat的线程模型主要分为三部分(: 网络通讯线程.业务线程和定时任务线程,下面分别介绍这些线程的使用: [温馨提示] 这里排除JVM自身使用的线程,只关注MyCat服务所使用的线程,如果需要详细了解MyCat里面使用的所有线程,请参考<MyCat权威指南>-> 开发篇 -> MyCat线程模型分析 2. 网络通讯线程 MyCat-Serv

看我是如何处理自定义线程模型---java

看过我之前文章的园友可能知道我是做游戏开发,我的很多思路和出发点是按照游戏思路来处理的,所以和web的话可能会有冲突,不相符合. 来说说为啥我要自定义线程模型呢? 按照我做的mmorpg或者mmoarpg游戏划分,线程被划分为,主线程,全局同步线程,聊天线程,组队线程,地图线程,以及地图消息分发派送线程等: 一些列,都需要根据我的划分,以及数据流向做控制. 游戏服务器,主要要做的事情,肯定是接受玩家的 命令请求 -> 相应的操作 -> 返回结果: 在服务器端所有的消息都会注册到消息管理器里,然

【转】netty线程模型

Netty服务器线程模型概览 博客分类: netty java 一切从ServerBootstrap开始 ServerBootstrap 负责初始话netty服务器,并且开始监听端口的socket请求. Java代码   bootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(),//boss线程池 Executors.newCached

Mina的线程模型

在Mina的NIO模式中有三种I/O工作线程(这三种线程模型只在NIOSocket中有效,在NIO数据包和虚拟管道中没有,也不需要配置): IoAcceptor/IoConnector线程 IoProcessor线程 IoHandler线程 一.Acceptor  thread 该线程的作用是接收客户端的连接,并将客户端的连接导入到I/O processor线程模型中.所谓的I/O processor线程模型就是Mina的I/O processor thread.Acceptor thread在