Channel是客户端和服务端得以传输信息的通道,它维护这套接字或者可进行IO操作的组件。
下面我们以客户端及NioSocketChannel为例,看看他是怎么和socket发生联系的。
当客户端初始化时,首先要进行这样的操作
Bootstrap b = new Bootstrap(); ...... b.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);通过channel方法去创建通道,流程如下:
首先执行AbstractBootstrap中的channel方法
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }它将Channel的具体类型传递给了ReflectiveChannelFactory类的Class<? extends T> clazz属性,ReflectiveChannelFactory类中实现了ChannelFactory接口的newChannel方法,用来创建Channel实例,后面会用到。
当客户端执行connect动作时,执行Bootstrap的connect代码
public Bootstrap remoteAddress(InetAddress inetHost, int inetPort) { remoteAddress = new InetSocketAddress(inetHost, inetPort); return this; }继而执行doResolveAndConnect方法
private ChannelFuture doResolveAndConnect(SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); if (regFuture.cause() != null) { return regFuture; } final Channel channel = regFuture.channel(); final EventLoop eventLoop = channel.eventLoop(); ...... }这里执行了父类AbstractBootstrap中的initAndRegister方法
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel); ...... }上面的代码我看到了newChannel方法,他执行的就是ReflectiveChannelFactory的newChannel
@Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }这里创建了一个Channel的实例对象。
在initAndRegister方法中,还有一个重要的方法就是init方法,它里面创建了一个重要的元素,就是ChannelPipeline,关于ChannelPipeline,我们后续介绍。
经过上面一系列的动作,Channel就被创建好了。下面就看看他是怎样通过socket连接的。
我们在Netty:EventLoopGroup中提到,服务端和客户端通过bind和connect开启一个线程,去监听通道的数据操作。客户端通过Bootstrap类的doConnect0开始这个过程。
private static void doConnect0( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture regFuture, final ChannelPromise connectPromise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. final Channel channel = connectPromise.channel(); channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { if (localAddress == null) { channel.connect(remoteAddress, connectPromise); } else { channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { connectPromise.setFailure(regFuture.cause()); } } }); }上面代码中,channel.eventLoop().execute启动线程,channel.connect开始连接。
这里的channel是NioSocketChannel,它的继承关系大致如下
NioSocketChannel extends AbstractNioByteChannel extends AbstractNioChannel extends AbstractNioChannel extends AbstractChannel extends DefaultAttributeMapchannel.connect执行AbstractNioChannel中的connect方法@Override public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { ...... if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { ...... } }doConnect是abstract方法,由NioSocketChannel的doConnect实现
@Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { ...... boolean success = false; try { boolean connected = javaChannel().connect(remoteAddress); ...... return connected; } finally { if (!success) { doClose(); } } }javaChannel在AbstractNioChannel类中,返回SelectableChannel对象,这个对象是在NioSocketChannel初始化时,通过newSocket静态方法,使用默认的SelectorProvider的openSocketChannel得到的,然后通过它的socket()方法,获取套接字进行和服务端的连接,这样服务端和客户端就通过套接字联通在一起了。
服务端分析一样的思路。
时间: 2024-11-08 22:21:42