每个通道关联一个Pipeline,在流水线中拦截处理各种事件的对象就是ChannelHandler,它处理ChannelEvent而后进行传递。
接口ChannelHandler没有提供任何方法,有两个子接口分别用来规范处理上行和下行的通道事件。
ChannelHandler是随ChannelHandlerContext对象提供的,handler通过这个context对象参与这个Pipeline的交互管理,通过它所属的context对象,一个handler可以传递上行或下行事件,动态改变流水线,存储信息到attachment。
状态管理:一个ChannelHandler通常需要存一些状态信息,最简单推荐的方式就是利用成员变量(放在特定的Handler类中),向下面这样:
public class DataServerHandler extends SimpleChannelHandler
{
private boolean loggedIn ; //
存储状态信息
@Override
public void messageReceived(ChannelHandlerContext
ctx, MessageEvent e) {
Channel ch = e.getChannel();
Object o = e.getMessage();
if (o instanceof LoginMessage)
{
authenticate((LoginMessage )
o);
loggedIn = true ;
} else (o instanceof GetDataMessage) {
if (loggedIn)
{
ch.write( fetchSecret((GetDataMessage )
o));
} else {
fail();
}
}
}
}
因为每个Handler实例是服务于一个连接的,所以对于每个新的Channel都应该创建一个新的ChannelHandler实例。
//Create a new handler instance per channel.
// See Bootstrap.setPipelineFactory(ChannelPipelineFactory).
public class DataServerPipelineFactory implements ChannelPipelineFactory
{
public ChannelPipeline
getPipeline() {
return Channels.pipeline( new DataServerHandler());//注意这里
}
}
但是有时候并不需要很多冗余Handler,做着相同的工作,不需要为每个连接(或Channel)都创建一个这样的Handler,所以可以利用attachment存储状态信息,(或许可以认为这样的Handler是安全的,可重入的)。向下面这样:
@Sharable
public class DataServerHandler2 extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
{
Channel ch = e.getChannel();
Object o = e.getMessage();
if (o instanceof LoginMessage)
{
authenticate((LoginMessage )
o);
ctx.setAttachment( true );
} else (o instanceof GetDataMessage) {
if (Boolean.TRUE.equals(ctx.getAttachment()))
{
ch.write( fetchSecret((GetDataMessage )
o));
} else {
fail();
}
}
}
}
这样就可以为不同的Pipeline增加同一个Handler,如下:
public class DataServerPipelineFactory2 implements ChannelPipelineFactory
{
private static final ChannelHandler SHARED = new DataServerHandler2();
public ChannelPipeline
getPipeline() {
return Channels.pipeline( new ChannelHandler[]
{ SHARED });
}
}
使用ChannelLocal:如果有的状态变量需要从其他Handler或者Handler之外来访问,就需要用
ChannelLocal这个Iterable,相当于从属于这个Channel的全局变量,可以联想TreadLocal。
public final class DataServerState
{
public static final ChannelLocal<Boolean> loggedIn = new ChannelLocal<Boolean>()
{
protected Boolean
initialValue(Channel channel) {
return false ;
}
};
}
Handler此时这样写:
@ Sharable
public class DataServerHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
{
Channel ch = e.getChannel();
Object o = e.getMessage();
if (o instanceof LoginMessage)
{
authenticate((LoginMessage )
o);
DataServerState.loggedIn.set(ch, true );//
更新这个ChannelLocal
} else (o instanceof GetDataMessage) {
if (DataServerState.loggedIn.get(ch))
{
ctx.getChannel().write(fetchSecret(( GetDataMessage)
o));
} else {
fail();
}
}
}
}
根据每个Channel的状态进行逻辑处理,使用场景:
// Print the remote addresses of the authenticated clients:
ChannelGroup allClientChannels = ...;
for (Channel ch:
allClientChannels) {
if (DataServerState.loggedIn.get(ch))
{
System.out.println(ch.getRemoteAddress());
}
}
@Sharable注解:如果一个ChannelHandler前面有该注解,意味着可以将该对象的一个实例分配给多个Pipeline,而不会发生竟态条件。否则的话就要为每个Pipeline单独创建一个实例,因为存在非共享的状态如成员变量。