netty源码分析

  1、Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

  2、目前netty有3个版本netty3、netty4、netty5。3个版本的内容有所不同。neety3是核心的代码介绍。相对于netty4、和netty5的复杂性来说。netty3的源码是值得学习的。我这里解析了netty3的一些源码,仅供大家理解,也是为了方便大家理解做了很多简化。不代表作者的开发思路。

  3、我们先来看一张图(这张图是我在学习源码的时候扣的,哈哈)

  一、传统NIO流

  

  1)一个线程里面,存在一个selector,当然这个selector也承担起看大门和服务客人的工作。

  2)这里不管多少客户端进来,都是这个selector来处理。这样就就加大了这个服务员的工作量

  3)为了加入线程池,让多个selector同时工作,当时目的性都是一样的。

  4)虽然看大门的和服务客人的都是服务员,但是还是存在差别的。为了更好的处理多个线程的问题。所以这里netty就诞生了。

二、netty框架

  

  理解:

  1)netty3的框架也是基于nio流做出来的。所以这里会详细介绍netty3框架的思路

  2)将看门的服务员和服务客人的服务员分开。形成两块(也就是2个线程池,也就是后面的boss和worker)

  3)当一个客人来的时候,首先boss,进行接待。然后boss分配工作给worker,这个,在两个线程池的工作下,有条不乱。

  4)原理:就是将看大门的selector和服务客人的selector分开。然后通过boss线程池,下发任务给对应的worker

  4、netty3源码分析

  1)加入对应的jar包。我这里为了了解源码用的是netty3的包。

    <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty</artifactId>
            <version>3.10.6.Final</version>
        </dependency>

  2)目录结构

  

  说明:

  a、NettyBoss、NettyWork是针对于selector做区分。虽然他们很多共性,我这里为了好理解,并没有做抽象类(忽略开发思路)。

  b、ThreadHandle是用来初始化线程池和对应的接口。

  c、Start为启动类

  3)NettyBoss(看大门的服务员,第一种线程selector)

package com.troy.application.netty;

import java.io.IOException;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public class NettyBoss {

    //线程池
    public final Executor executor;
    //boss选择器
    protected Selector selector;
    //原子变量,主要是用来保护线程安全。当本线程执行的时候,排除其他线程的执行
    protected final AtomicBoolean wakenUp = new AtomicBoolean();
    //队列,线程安全队列。
    public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    //线程处理,这里主要是拿到work的线程池
    protected ThreadHandle threadHandle;

    //初始化
    public NettyBoss(Executor executor,ThreadHandle threadHandle) {
        //赋值
        this.executor = executor;
        this.threadHandle = threadHandle;
        try {
            //每一个线程选择器
            this.selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        //从线程中获取一个线程执行以下内容
        executor.execute(() -> {
            while (true) {
                try {
                    //这里的目前就是排除其他线程同事执行,false因为这里处于阻塞状态,不用开启
                    wakenUp.set(false);
                    //选择器阻塞
                    selector.select();
                    //运行队列中的任务
                    while (true) {
                        final Runnable task = taskQueue.poll();
                        if (task == null) {
                            break;
                        }
                        //如果任务存在开始运行
                        task.run();
                    }
                    //对进来的进行处理
                    this.process(selector);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey key = i.next();
            i.remove();
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            // 新客户端
            SocketChannel channel = server.accept();
            // 设置为非阻塞
            channel.configureBlocking(false);
            // 获取一个worker
            NettyWork nextworker = threadHandle.workeres[Math.abs(threadHandle.workerIndex.getAndIncrement() % threadHandle.workeres.length)];
            // 注册新客户端接入任务
            Runnable runnable = () -> {
                try {
                    //将客户端注册到selector中
                    channel.register(nextworker.selector, SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            };
            //添加到work的队列中
            nextworker.taskQueue.add(runnable);
            if (nextworker.selector != null) {
                //这里的目前就是开启执行过程
                if (nextworker.wakenUp.compareAndSet(false, true)) {
                    //放开本次阻塞,进行下一步执行
                    nextworker.selector.wakeup();
                }
            } else {
                //任务完成移除线程
                taskQueue.remove(runnable);
            }
            System.out.println("新客户端链接");
        }
    }
}

  解释:

  a、初始化的时候,赋值线程池,和线程处理类(线程处理类目的是获取worker的工作线程)

  b、executor为线程池的执行过程。

  c、selector.select()为形成阻塞,wakenUp为了线程安全考核。在接入客户端的时候用selector.wakeup()来放开本次阻塞(很重要)。

  d、然后在worker安全队列中执行对应工作。(taskQueue的目前在boss和worker中的作用都是为了考虑线程安全,这里采用线程安全队列的目的是为了不直接操作其他线程)

  e、wakenUp.compareAndSet(false, true),这里是考虑并发问题。在本线程运行的时候,其他线程处于等待状态。这里也是为了线程安全考虑。

  4)NettyWork(服务客人的服务员,第二种selector)

package com.troy.application.netty;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public class NettyWork {
    //线程池
    public final Executor executor;
    //boss选择器
    protected Selector selector;
    //原子变量,主要是用来保护线程安全。当本线程执行的时候,排除其他线程的执行
    protected final AtomicBoolean wakenUp = new AtomicBoolean();
    //队列,线程安全队列。
    public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();

    //初始化
    public NettyWork(Executor executor) {
        this.executor = executor;
        try {
            //每一个work也需要一个选择器用来管理通道
            this.selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        //从线程池中获取一个线程开始执行
        executor.execute(() -> {
            while (true) {
                try {
                    //阻塞状态排除问题
                    wakenUp.set(false);
                    //阻塞
                    selector.select();
                    //处理work任务
                    while (true) {
                        final Runnable task = taskQueue.poll();
                        if (task == null) {
                            break;
                        }
                        //存在work任务开始执行
                        task.run();
                    }
                    //处理任务
                    this.process(selector);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
        while (ite.hasNext()) {
            SelectionKey key = (SelectionKey) ite.next();
            // 移除,防止重复处理
            ite.remove();
            // 得到事件发生的Socket通道
            SocketChannel channel = (SocketChannel) key.channel();
            // 数据总长度
            int ret = 0;
            boolean failure = true;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //读取数据
            try {
                ret = channel.read(buffer);
                failure = false;
            } catch (Exception e) {
                // ignore
            }
            //判断是否连接已断开
            if (ret <= 0 || failure) {
                key.cancel();
                System.out.println("客户端断开连接");
            }else{
                System.out.println("收到数据:" + new String(buffer.array()));
                //回写数据
                ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
                channel.write(outBuffer);// 将消息回送给客户端
            }
        }
    }
}

  解释:

  a、worker的执行方式基本上面和boss的方式是一样的,只不够是处理方式不一样

  b、这里需要注意的是,都是考虑线程队列执行。

  3)ThreadHandle(线程处理,这里主要是启动需要的东西)

package com.troy.application.netty;

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadHandle {

    public final AtomicInteger bossIndex = new AtomicInteger();
    public static NettyBoss[] bosses;
    public final AtomicInteger workerIndex = new AtomicInteger();
    public static NettyWork[] workeres;

    public ThreadHandle(ExecutorService boss,ExecutorService work) {
        this.bosses = new NettyBoss[1];
        //初始化boss线程池
        for (int i = 0; i < bosses.length; i++) {
            bosses[i] = new NettyBoss(boss,this);
        }
        this.workeres = new NettyWork[Runtime.getRuntime().availableProcessors() * 2];
        //初始化work线程池
        for (int i = 0; i < workeres.length; i++) {
            workeres[i] = new NettyWork(work);
        }
    }

    public void bind(InetSocketAddress inetSocketAddress) {
        try {
            // 获得一个ServerSocket通道
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            // 设置通道为非阻塞
            serverChannel.configureBlocking(false);
            // 将该通道对应的ServerSocket绑定到port端口
            serverChannel.socket().bind(inetSocketAddress);
            //获取一个boss线程
            NettyBoss nextBoss = bosses[Math.abs(bossIndex.getAndIncrement() % workeres.length)];
            //向boss注册一个ServerSocket通道
            Runnable runnable = () -> {
                try {
                    //注册serverChannel到selector
                    serverChannel.register(nextBoss.selector, SelectionKey.OP_ACCEPT);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            };
            //加入任务队列
            nextBoss.taskQueue.add(runnable);
            if (nextBoss.selector != null) {
                //排除其他任务处理
                if (nextBoss.wakenUp.compareAndSet(false, true)) {
                    //放开阻塞
                    nextBoss.selector.wakeup();
                }
            } else {
                //移除任务
                nextBoss.taskQueue.remove(runnable);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  解释:

  a、这里采用数组的形式,主要目的是考虑多个看门的,和多个服务客人的线程。为了好控制,好选择,哪一个来执行。

  b、端口的注册,在NettyBoss里面进行初始化的的原理都是一样的。

  4)start

package com.troy.application.netty;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Start {

    public static void main(String[] args) {
        //声明线程池
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService work = Executors.newCachedThreadPool();
        //初始化线程池
        ThreadHandle threadHandle = new ThreadHandle(boss,work);
        //声明端口
        threadHandle.bind(new InetSocketAddress(9000));
        System.out.println("start");
    }
}

  说明一下流程

  a、初始化boss和work。让boss线程池加入设定第一种boss的selector,并且处于阻塞状态。work的初始化也基本上是一样的,只不过换成了第二种selector线程池,处于阻塞状态。

  b、当线程处理类初始化监听端口的时候。就是选择boss中其中一个selector。声明一个线程先监听,加入boss的线程安全队列中。然后放开boss阻塞,向下执行。线程执行会监听对应端口并阻塞。

  c、当一个客户端接入的时候,boss中的selector会监听到对应端口。然后选择work线程中的一个selector给work分派任务。

  d、最后work中的selector来处理事务。

  4、源码下载:https://pan.baidu.com/s/1pKIxuMf

  5、本代码只是用于理解netty的实现过程,不代表开发思路。其中我为了简化代码,做了很多调整。目的就是压缩代码,方便理解。

时间: 2024-08-03 10:18:07

netty源码分析的相关文章

netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程: 1.NioServerSocketChannel 管道生成, 2.NioServerSocketChannel 管道完成初始化, 3.NioServerSocketChannel注册至Selector选择器, 4.NioServerSocketChannel管道绑定到指定端口,启动服务 5.NioServerSocketChannel接受客户端的连接,进行相应IO操作 Ps:netty内部过程远比这复杂,简略记录下方便以后回忆

netty源码分析之揭开reactor线程的面纱(二)

如果你对netty的reactor线程不了解,建议先看下上一篇文章netty源码分析之揭开reactor线程的面纱(一),这里再把reactor中的三个步骤的图贴一下 reactor线程 我们已经了解到netty reactor线程的第一步是轮询出注册在selector上面的IO事件(select),那么接下来就要处理这些IO事件(process selected keys),本篇文章我们将一起来探讨netty处理IO事件的细节 我们进入到reactor线程的 run 方法,找到处理IO事件的代

Netty源码分析第2章(NioEventLoop)----&gt;第7节: 处理IO事件

Netty源码分析第二章: NioEventLoop 第七节:处理IO事件 上一小节我们了解了执行select()操作的相关逻辑, 这一小节我们继续学习select()之后, 轮询到io事件的相关逻辑: 回到NioEventLoop的run()方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case Sele

Netty源码分析第2章(NioEventLoop)----&gt;第6节: 执行selector操作

Netty源码分析第二章: NioEventLoop 第六节: 执行select操作 分析完了selector的创建和优化的过程, 这一小节分析select相关操作 跟到跟到NioEventLoop的run方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE

Netty源码分析第3章(客户端接入流程)----&gt;第5节: 监听读事件

Netty源码分析第三章: 客户端接入流程 第五节: 监听读事件 我们回到AbstractUnsafe的register0()方法: private void register0(ChannelPromise promise) { try { //省略代码 //做实际的注册 doRegister(); neverRegistered = false; registered = true; //触发事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetS

Netty源码分析第3章(客户端接入流程)----&gt;第4节: NioSocketChannel注册到selector

Netty源码分析第三章: 客户端接入流程 第四节: NioSocketChannel注册到selector 我们回到最初的NioMessageUnsafe的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel

Netty源码分析第3章(客户端接入流程)----&gt;第3节: NioSocketChannel的创建

Netty源码分析第三章: 客户端接入流程 第三节: NioSocketChannel的创建 回到上一小结的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPi

Netty源码分析第4章(pipeline)----&gt;第4节: 传播inbound事件

Netty源码分析第四章: pipeline 第四节: 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chan

Netty源码分析第4章(pipeline)----&gt;第7节: 前章节内容回顾

Netty源码分析第四章: pipeline 第七节: 前章节内容回顾 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法 2.客户端handler是什么时候被添加

Netty源码分析第5章(ByteBuf)----&gt;第1节: AbstractByteBuf

Netty源码分析第五章: ByteBuf 概述: 熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到channel中 但是jdk的byteBuffer是使用起来有诸多不便, 比如只有一个标记位置的指针position, 在进行读写操作时要频繁的通过flip()方法进行指针位置的移动, 极易出错, 并且by