在netty开发过程中我遇到过长的消息被分割成多个小消息的问题。如下图所示:
其实这两条消息应该是一条消息,它们两个才是一个完整的json字符串。查看代码原来发现少了对象编码与解码类!
Netty的消息传递都是基于流,通过ChannelBuffer传递的,消息Object需要转换成ChannelBuffer来传递。Netty本身已经提供了工具类:ObjectEncoder和ObjectDecoder。
可惜开发人员没有加。原来的代码是这的:
EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ChannelFuture chFuture =new ServerBootstrap() .group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new PushingChannelHandler()); } }) .bind(PORT).sync(); if (chFuture.isSuccess()) { LGR.info("MessagePushingTCPServer started successfully!"); chFuture.channel().closeFuture().sync(); } } catch (Throwable e) { LGR.error("error occurred in MessagePushingTCPServer", e); } finally { Future<?> bossShutdownFuture = boss.shutdownGracefully(); Future<?> workerShutdownFuture = worker.shutdownGracefully(); try { bossShutdownFuture.await(); workerShutdownFuture.await(); } catch (InterruptedException e) { LGR.error("shutdown await interrupted by exception."); } }
注意其中红色代码部分并没有在ChannelPipeline注册ObjectEncoder和ObjectDecoder。
红色代码部分正确的代码是这样子的:
ch.pipeline() .addLast(new ObjectEncoder()) .addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))) .addLast(new PushingChannelHandler());
但这样改了子后会遇到我们以前的客户端兼容问题。我们已经发布的APP将全部不能收到推送,损失几乎不能接受。
后来我尝试更改ChannelOption终于可以支持长消息了,现将代码贴出,目前算是一种补救方案。
ChannelFuture chFuture =new ServerBootstrap() .group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(2048)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new PushingChannelHandler()); } }) .bind(PORT).sync();
增加了一行设置ChannelOption的代码:
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(2048)) 目前测试没有任何问题,可以满足我们公司的需求。但背后的原理目前我还不太明白。
时间: 2024-10-13 22:27:14