深入了解Netty【六】Netty工作原理


引言

前面学习了NIO与零拷贝、IO多路复用模型、Reactor主从模型。
服务器基于IO模型管理连接,获取输入数据,又基于线程模型,处理请求。
下面来学习Netty的具体应用。

1、Netty线程模型

Netty线程模型是建立在Reactor主从模式的基础上,主从 Rreactor 多线程模型:

但是在Netty中,bossGroup相当于mainReactor,workerGroup相当于SubReactor与Worker线程池的合体。如:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
      .channel(NioServerSocketChannel.class);
  • bossGroup
    bossGroup线程池负责监听端口,获取一个线程作为MainReactor,用于处理端口的Accept事件。
  • workerGroup
    workerGroup线程池负责处理Channel(通道)的I/O事件,并处理相应的业务。

在启动时,可以初始化多个线程。

EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workerGroup = new NioEventLoopGroup(3);

2、Netty示例(客户端、服务器)

下面的例子演示了Netty的简单使用。

2.1、服务端

2.1.1、 EchoServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;

/**
 * EchoServerHandler
 */
// 标识这类的实例之间可以在 channel 里面共享
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received: "   in.toString(CharsetUtil.UTF_8));
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
2.1.2、 EchoServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
 * Echo服务端
 */
public class EchoServer {
    private final int port;
    private EchoServer(int port) {
        this.port = port;
    }
    private void start() throws Exception {
        //创建 EventLoopGroup
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();
        try {
            //创建 ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, work)
                    //指定使用 NIO 的传输 Channel
                    .channel(NioServerSocketChannel.class)
                    //设置 socket 地址使用所选的端口
                    .localAddress(new InetSocketAddress(port))
                    //添加 EchoServerHandler 到 Channel 的 ChannelPipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            //绑定的服务器;sync 等待服务器关闭
            ChannelFuture f = b.bind().sync();
            System.out.println(EchoServer.class.getName()   " started and listen on "   f.channel().localAddress());
            //关闭 channel 和 块,直到它被关闭
            f.channel().closeFuture().sync();
        } finally {
            //关机的 EventLoopGroup,释放所有资源。
            group.shutdownGracefully().sync();
        }
    }
    public static void main(String[] args) throws Exception {
        //设置端口值(抛出一个 NumberFormatException 如果该端口参数的格式不正确)
        int port = 9999;
        //服务器start()
        new EchoServer(port).start();
    }

}

2.2、客户端

2.2.1、EchoClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) {
        System.out.println("Client received: "   msg.toString(CharsetUtil.UTF_8));
    }
}
2.2.2、EchoClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class EchoClient {
    private final String host;
    private final int port;
    private EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    private void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建 Bootstrap
            Bootstrap b = new Bootstrap();
            //指定 EventLoopGroup 来处理客户端事件。
            //由于使用 NIO 传输,所以用到了 NioEventLoopGroup 的实现
            b.group(group)
                    //使用的 channel 类型是一个用于 NIO 传输
                    .channel(NioSocketChannel.class)
                    //设置服务器的 InetSocketAddress
                    .remoteAddress(new InetSocketAddress(host, port))
                    //当建立一个连接和一个新的通道时,创建添加到 EchoClientHandler 实例 到 channel pipeline
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            //连接到远程;等待连接完成
            ChannelFuture f = b.connect().sync();
            //阻塞直到 Channel 关闭
            f.channel().closeFuture().sync();
        } finally {
            //调用 shutdownGracefully() 来关闭线程池和释放所有资源
            group.shutdownGracefully().sync();
        }
    }
    public static void main(String[] args) throws Exception {
        //服务器地址及端口
        String host = "localhost";
        int port = 9999;
        new EchoClient(host, port).start();
    }
}

3、Netty工作原理

服务端包含了1个boss NioEventLoopGroup和1个work NioEventLoopGroup。
NioEventLoopGroup相当于1个事件循环组,组内包含多个事件循环(NioEventLoop),每个NioEventLoop包含1个Selector和1个事件循环线程。

3.1、boss NioEventLoop循环任务

  • 轮询Accept事件。
  • 处理Accept IO事件,与Client建立连接,生成NioSocketChannel,并将NioSocketChannel注册到某个work NioEventLoop的Selector上。
  • 处理任务队列中的任务。

3.2、work NioEventLoop循环任务

  • 轮询Read、Write事件。
  • 处理IO事件,在NioSocketChannel可读、可写事件发生时进行处理。
  • 处理任务队列中的任务。

3.3、任务队列中的任务

  1. 用户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
   @Override
   public void run() {
       //...
   }
});
  1. 非当前 Reactor 线程调用 Channel 的各种方法
    例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中后被异步消费。
  2. 用户自定义定时任务
ctx.channel().eventLoop().schedule(new Runnable() {
   @Override
   public void run() {
       //...
   }
}, 60, TimeUnit.SECONDS);

参考

这可能是目前最透彻的Netty原理架构解析
Netty 实战精髓篇
Netty入门教程?
Essential Netty in Action

原文地址:https://www.cnblogs.com/clawhub/p/11967401.html

时间: 2024-07-31 04:52:26

深入了解Netty【六】Netty工作原理的相关文章

Android 基于Netty的消息推送方案之概念和工作原理(二)

上一篇文章中我讲述了关于消息推送的方案以及一个基于Netty实现的一个简单的Hello World.为了更好的理解Hello World中的代码,今天我来解说一下关于Netty中一些概念和工作原理的内容,假设你认为本篇文章有些枯燥.请先去阅读<Android 基于Netty的消息推送方案之Hello World(一)> ChannelEvent Netty是基于事件驱动的,就是我们上文提到的.发生什么事.就通知"有关部门". 所以.不难理解.我们自己的业务代码中,一定有跟这

原理剖析-Netty之服务端启动工作原理分析(下)

一.大致介绍 1.由于篇幅过长难以发布,所以本章节接着上一节来的,上一章节为[原理剖析(第 010 篇)Netty之服务端启动工作原理分析(上)]: 2.那么本章节就继续分析Netty的服务端启动,分析Netty的源码版本为:netty-netty-4.1.22.Final: 二.三.四章节请看上一章节 四.源码分析Netty服务端启动 上一章节,我们主要分析了一下线程管理组对象是如何被实例化的,并且还了解到了每个线程管理组都有一个子线程数组来处理任务: 那么接下来我们就直接从4.6开始分析了:

Netty优雅退出机制和原理

1.进程的优雅退出 1.1.Kill -9 PID带来的问题 在Linux上通常会通过kill -9 pid的方式强制将某个进程杀掉,这种方式简单高效,因此很多程序的停止脚本经常会选择使用kill -9 pid的方式. 无论是Linux的Kill -9 pid还是windows的taskkill /f /pid强制进程退出,都会带来一些副作用:对应用软件而言其效果等同于突然掉电,可能会导致如下一些问题: 缓存中的数据尚未持久化到磁盘中,导致数据丢失: 正在进行文件的write操作,没有更新完成,

Android系统Recovery工作原理之使用update.zip升级过程分析(六)---Recovery服务流程细节【转】

本文转载自:http://blog.csdn.net/mu0206mu/article/details/7465439  Android系统Recovery工作原理之使用update.zip升级过程分析(六)---Recovery服务流程细节            Recovery服务毫无疑问是Recovery启动模式中最核心的部分.它完成Recovery模式所有的工作.Recovery程序对应的源码文件位于:/gingerbread0919/bootable/recovery/recovery

How Javascript works (Javascript工作原理) (六) WebAssembly 对比 JavaScript 及其使用场景

个人总结: 1.webassembly简介:WebAssembly是一种用于开发网络应用的高效,底层的字节码.允许在网络应用中使用除JavaScript的语言以外的语言(比如C,C++,Rust及其他)来编写应用程序,然后编译成(提早)WebAssembly. 这是 JavaScript 工作原理的第六章. 现在,我们将会剖析 WebAssembly 的工作原理,而最重要的是它和 JavaScript 在性能方面的比对:加载时间,执行速度,垃圾回收,内存使用,平台 API 访问,调试,多线程以及

【原创】源码角度分析Android的消息机制系列(六)——Handler的工作原理

ι 版权声明:本文为博主原创文章,未经博主允许不得转载. 先看Handler的定义: /** * A Handler allows you to send and process {@link Message} and Runnable * objects associated with a thread's {@link MessageQueue}. Each Handler * instance is associated with a single thread and that thre

说一下Dubbo 的工作原理?注册中心挂了可以继续通信吗?

面试题 说一下的 dubbo 的工作原理?注册中心挂了可以继续通信吗?说说一次 rpc 请求的流程? 面试官心理分析 MQ.ES.Redis.Dubbo,上来先问你一些思考性的问题.原理,比如 kafka 高可用架构原理.es 分布式架构原理.redis 线程模型原理.Dubbo 工作原理:之后就是生产环境里可能会碰到的一些问题,因为每种技术引入之后生产环境都可能会碰到一些问题:再来点综合的,就是系统设计,比如让你设计一个 MQ.设计一个搜索引擎.设计一个缓存.设计一个 rpc 框架等等. 那既

Mina工作原理分析

Mina是Apache社区维护的一个开源的高性能IO框架,在业界内久经考验,广为使用.Mina与后来兴起的高性能IO新贵Netty一样,都是韩国人Trustin Lee的大作,二者的设计理念是极为相似的.在作为一个强大的开发工具的同时,这两个框架的优雅设计和不俗的表现,有很多地方是值得学习和借鉴的.本文将从Mina工作原理的角度出发,对其结构进行分析. 总体结构 Mina的底层依赖的主要是Java NIO库,上层提供的是基于事件的异步接口.其整体的结构如下: IoService 最底层的是IOS

Worker的内部工作原理

一.Worker.Executor.Task 三者的关系 storm集群中的一台机器可能运行着一个或者多个worker进程,其从属于一个或者多个topology.一个worker进程运行着多个executor线程:每一个worker从属于一个topology:executor是单线程,每一个executor运行着相同组件(spout或者bolt)的1个或者多个task:1个task执行(spout或bolt)中的逻辑处理:用一句话来概括就是,一个worker运行着一个或者多个executor,每