netty(五) channel

问题

  • channel 是如何处理发送一半中断后继续重发的
  • channel 具体作用是什么

概述

这一节我们将介绍 Channel 和内部接口 Unsafe .其中Unsafe 是内部接口,聚合在Channel 中协助网络读写操作相关的操作,设计初衷就是 Channel 的内部辅助类,不应该被用户使用。

继承类分析

继承关系链 :

AbstractChannel -> AbstractNioChannel -> AbstractNioByteChannel -> NioSocketChannel 如下图

从以上的类结构我们也要学习一下类的构建,各个类实现应该实现的功能,最后生成的具体类具有不同的功能。
AbstractChannel ,保存以下重要的字段 ,主要
- EventLoop
- localAddress
- remoteAddress
- unsafe
- DefaultChannelPipleline
- Future类 和 Promise类 等

AbstractNioChannel,从类名可以看出和nio 中 Channel 相关,注册,监听

    private final SelectableChannel ch;
    protected final int readInterestOp;
    private volatile SelectionKey selectionKey;
    private volatile boolean inputShutdown;

    /**
     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;
 AbstractNioByteChannel 这个类是Channel对Byte进行操作,对ByteBuff的读写。

源码分析

AbstractChannel

AbstractChannel 的读写方法都是交由 ChannelPiple 来解决的
    @Override
    public Channel read() {
        pipeline.read();
        return this;
    }

    @Override
    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }

eventLoop方法,直接返回持有的 eventloop对象

    @Override
    public EventLoop eventLoop() {
        return eventLoop;
    }

AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(AbstractNioChannel.class);

    // No.1 注册监听相关的字段
    private final SelectableChannel ch;
    protected final int readInterestOp;
    private volatile SelectionKey selectionKey;
    private volatile boolean inputShutdown;

    // No.2 异步执行的字段,或是回调相关的字段
    /**
     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;

    ...

    //核心方法
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
            	//拿父类的channel对象(父类的channel对象是java原生channel 对象)
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

    //开始read的操作
    @Override
    protected void doBeginRead() throws Exception {
        if (inputShutdown) {
            return;
        }

        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        //就是改变监听的事件
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

AbstractNioByteChannel

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1;

        //循环
        for (;;) {
            Object msg = in.current(true);

            if (msg == null) {
                // Wrote all messages.写完了(发送完了)所有的消息,清除标志,结束
                clearOpWrite();
                break;
            }

            if (msg instanceof ByteBuf) {
                //加入是ByteBuf类型
                ByteBuf buf = (ByteBuf) msg;
                int readableBytes = buf.readableBytes();
                //判断当前的可读字节是否为 0 ,为 0 丢弃掉
                if (readableBytes == 0) {
                    in.remove();
                    continue;
                }

                boolean setOpWrite = false;
                boolean done = false;
                long flushedAmount = 0;
                //循环发送次数
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    //doWriteBytes 子类实现
                    int localFlushedAmount = doWriteBytes(buf);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    //一直到不可读
                    if (!buf.isReadable()) {
                        done = true;
                        break;
                    }
                }
                //发送完,更新发送的进度(有可能没发完)
                in.progress(flushedAmount);

                if (done) {
                    in.remove();
                } else {
                    //没发完,设置写半包标识,启动刷新线程继续发送之前没有发送完成的半包消息
                    incompleteWrite(setOpWrite);
                    break;
                }
            } else if (msg instanceof FileRegion) {
                FileRegion region = (FileRegion) msg;
                boolean setOpWrite = false;
                boolean done = false;
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }

                //循环发送
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    long localFlushedAmount = doWriteFileRegion(region);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (region.transfered() >= region.count()) {
                        done = true;
                        break;
                    }
                }
                //发送完(有可能发送了一半)更新进度
                in.progress(flushedAmount);

                if (done) {
                    in.remove();
                } else {
                    //没法完,创建一个任务扔到EventLoop
                    incompleteWrite(setOpWrite);
                    break;
                }
            } else {
                throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
            }
        }
    }

    //没写完(没发送完)
    protected final void incompleteWrite(boolean setOpWrite) {
        // Did not write completely.
        if (setOpWrite) {
            setOpWrite();
        } else {
            // Schedule flush again later so other tasks can be picked up in the meantime
            //创建任务扔到 eventLoop执行
            Runnable flushTask = this.flushTask;
            if (flushTask == null) {
                flushTask = this.flushTask = new Runnable() {
                    @Override
                    public void run() {
                        flush();
                    }
                };
            }
            eventLoop().execute(flushTask);
        }
    }
循环发送次数是指一次发送没有完成时(写半包),程序就继续尝试循环写操作,此时IO线程是不能处理其他事件的,例如读新的消息或者执行定时任务和 NioTask 等, 如果网络IO阻塞或者对方接收消息太慢,可能会导致线程假死,于是就要循环发送。

AbstractNioMessageChannel

我们再来看一下AbstractNioChannel 的另外一个子类 AbstractNioMessageChannel,直接看doWrite方法
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        final SelectionKey key = selectionKey();
        final int interestOps = key.interestOps();

        for (;;) {
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                    key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                }
                break;
            }

            boolean done = false;
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                if (doWriteMessage(msg, in)) {
                    done = true;
                    break;
                }
            }

            if (done) {
                in.remove();
            } else {
                // Did not write all messages.
                //没发送完,设置标志,交给 select 多路复用器轮询对应的channel重新发送尚未发送完成的半包信息
                if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                    key.interestOps(interestOps | SelectionKey.OP_WRITE);
                }
                break;
            }
        }
    }
AbstractNioMessageChannel 和 AbstractNioByteChannel的区别在于

NioServerSocketChannel 和 NioServerChannel 的分析

NioSocketChannel 和 NioServerSocketChannel 的区别到底是什么?后者是服务端当中负责绑定端口,读取数据功能,连接和断开,写消息都不支持,这些功能都在NioSocketChannel中实现

AbstractNioMessageServerChannel 的具体子类是 NioServerSocketChannel(该类是服务器端接受处理客户端的channel),它的doReadMessages方法(被对应的unsafe类read方法,这里可能有点饶,具体看代码实现)分析如下
	@Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();

        try {
            if (ch != null) {
            	//构建一个NioSocketChannel放进数组中
                buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

NioServerChannel的源码分析

public class NioServerSocketChannel extends AbstractNioMessageServerChannel
                                 implements io.netty.channel.socket.ServerSocketChannel {

    private static final ChannelMetadata METADATA = new ChannelMetadata(false);

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);

    private static ServerSocketChannel newSocket() {
        try {
            return ServerSocketChannel.open();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

    private final ServerSocketChannelConfig config;

    /**
     * Create a new instance
     */
    public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
        super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
        config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
    }

    @Override
    public InetSocketAddress localAddress() {
        return (InetSocketAddress) super.localAddress();
    }

    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }

    @Override
    public ServerSocketChannelConfig config() {
        return config;
    }

    @Override
    public boolean isActive() {
        return javaChannel().socket().isBound();
    }

    @Override
    public InetSocketAddress remoteAddress() {
        return null;
    }

    @Override
    protected ServerSocketChannel javaChannel() {
        return (ServerSocketChannel) super.javaChannel();
    }

    @Override
    protected SocketAddress localAddress0() {
        return javaChannel().socket().getLocalSocketAddress();
    }

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }

    @Override
    protected void doClose() throws Exception {
        javaChannel().close();
    }

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

    // Unnecessary stuff
    @Override
    protected boolean doConnect(
            SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected void doFinishConnect() throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected SocketAddress remoteAddress0() {
        return null;
    }

    @Override
    protected void doDisconnect() throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
        throw new UnsupportedOperationException();
    }
}
可以看到 NioServerChannel 的主要都是 override 父类的方法,即是说大部分的逻辑都在父类 Abstract中进行了一层层的封装,给我们一个启发,好的类结构在
在一开始就已经设计好,最终的具体实现交由尾端实现。

总结

本文介绍了channel的主要功能作用。

参考资料

  • 《Netty权威指南》

原文地址:https://www.cnblogs.com/Benjious/p/11634871.html

时间: 2024-10-08 19:35:19

netty(五) channel的相关文章

学习 java netty (三) -- Channel

学习 java netty (三) – Channel 前言:netty封装的channel,看一下官网的定义 A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind. 可以I/O操作(如读,写,连接和绑定)的连网套接字或组件 A channel provides a user: All I/O operations a

netty解决channel管理,可广播消息

在Netty中提供了ChannelGroup接口,该接口继承Set接口,因此可以通过ChannelGroup可管理服务器端所有的连接的Channel,然后对所有的连接Channel广播消息. Server端: public class BroadCastServer { public static void run(int port) { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioE

关于Netty对于Channel超时机制缺陷的一点想法

我们看看ReadTimeoutHandler下面这个初始化方法,在初始化的时候做的一些事情,Netty的2个改进点我认为都在这里可以体现出来(下面红体): private void initialize(ChannelHandlerContext ctx) { // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/1

Netty的Channel

Channel是一个网络端口连接,或者是可以进行读,写,链接,绑定端口的组件的连接.  Channel就是一个链接,它提供了如下的功能. 1:获取当前链接的状态 2:配置当前链接参数 3:进行read,write,connect,bind等通道支持的操作. 4:该Channel关联的ChannelPipeLine处理所有的IO事件和绑定在这个channel的请求 Netty中所有的IO请求都是异步的,都立即返回一个ChannelFuture对象,而不包装调用结束完成,可以通过ChannelFut

Netty:Channel

我们通过一个简单的Netty代码了解到了Netty中的核心组件,这一篇我们将围绕核心组件中的Channel来展开学习. Channel的简介 Channel代表着与网络套接字或者能够进行IO操作(read.write.connect或者bind)的组件的联系,一个Channel向用户提供了如下内容: 1.Channel当前的状态,比如是否打开.是否连接: 2.Channel的配置参数,比如接收缓冲区的大小: 3.Channel支持的IO操作(read.write.connect或者bind):

3. Netty源码阅读之Channel

一.Channel介绍 Channel中的NioServerSocketChannel 和 NioSocketChannel 分别于 NIO中的 ServerSocketChannel.SocketChannel对应.不同的是,Netty的Channel将NIO中的Channel聚合在自己对象内部,并提供其他的功能操作. 二.Channel源码介绍 1. 常用方法介绍 eventLoop() Channel需要注册到EventLoop上的多路复用器上,通过该方法可获取到Channel注册的Eve

atitit.软件开发--socket框架选型--netty vs mina j

atitit.软件开发--socket框架选型--netty vs mina j . Netty是由JBOSS提供的一个java开源框架 Apache mina 三.文档比较 mina文档多,,, 好几倍... 作者:: 老哇的爪子 Attilax 艾龙,  EMAIL:[email protected] 转载请注明来源: http://blog.csdn.net/attilax 四.UDP协议传输 1. netty将UDP无连接的特性暴露出来:而mina对UDP进行了高级层次的抽象,可以把UD

netty实现客户端服务端心跳重连

使用netty实现客户端服务端心跳重连 前言: 公司的加密机调度系统一直使用的是http请求调度的方式去调度,但是会出现网络故障导致某个客户端或者服务端断线的情况,导致很多请求信息以及回执信息丢失的情况,接着我们抛弃了http的方式,改为Tcp的方式去建立客户端和服务器之间的连接,并且要去实现断线重连的功能,经过讨论后决定使用java中成熟的nio框架 – netty去解决这一系列的问题. 1.       netty简单介绍: 在百度中对netty的解释是: Netty是由JBOSS提供的一个

Netty中的那些坑

Netty中的那些坑(上篇) 最近开发了一个纯异步的redis客户端,算是比较深入的使用了一把netty.在使用过程中一边优化,一边解决各种坑.儿这些坑大部分基本上是Netty4对Netty3的改进部分引起的. 注:这里说的坑不是说netty不好,只是如果这些地方不注意,或者不去看netty的代码,就有可能掉进去了. 坑1: Netty 4的线程模型转变 在Netty 3的时候,upstream是在IO线程里执行的,而downstream是在业务线程里执行的.比如netty从网络读取一个包传递给

Netty 3.7.0 Final 第一篇

netty简介: 做游戏的都知道,netty和mina是网络层通讯流行的两种socket框架.mina出现的早些,属于apache,netty目前更新比较稳定,5暂时还处于alpha版本,本系列netty前四篇主要是介绍目前比较用的多的3系列,后面会简要的说到netty4和mina. netty是网络通讯框架,为什么要使用netty?使用netty有什么好处?基于Java nio的编程的难度其实可以参考Reactor模型,多路复用这些,监听端口,建立线程,关闭连接等等,这些想着都觉得很复杂,而我