Netty5 echo server练习

Netty5的架构比Netty4的架构及代码上都有很多的改进,看起来很清晰。

下面的根据官网例子改进的Echo Server例子代码。

代码中增加的有netty自带的hander,LineBasedFrameDecoder主要解决网络传输的粘包/拆包问题。StringDecoder和StringEncoder主要解决netty ByteBuf和string之间的转换。

Netty in action 上也提到了几个注意事项:读取消息后,记得要释放,必须在finally块释放(父类ChannelHandlerAdapter),和锁的释放位置是一样的,为的是确保释放操作必须执行,那为什么写消息不用显示释放的原因是Netty帮我们做了。当然,你的hander父类是SimpleChannelInboundHandler就不用显示释放了,因为Netty又帮你做了。

下面代码主要是给字符串添加delimiter(分割符)。

5.0.0.Alpha2版本:

/*
 * Copyright (C) 2014- now() The  Netty5-2015  Authors
 *
 *
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.doctor.netty5.ebook.netty_in_action05.common;

import java.util.stream.Stream;

/**
 * @author doctor
 *
 * @time 2015年7月6日
 */
public final class NettyUtil {
	public static final String END_OF_LINE = "\n";

	public static String appenEndOfLine(String... message) {
		StringBuilder stringBuilder = new StringBuilder(256);
		Stream.of(message).forEachOrdered(stringBuilder::append);
		stringBuilder.append(END_OF_LINE);
		return stringBuilder.toString();
	}
}

服务端代码:

/*
 * Copyright (C) 2014-present  The  Netty5-2015  Authors
 *
 *
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.doctor.netty5.example.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.nio.charset.StandardCharsets;

import com.doctor.netty5.ebook.netty_in_action05.common.NettyUtil;

/**
 * @author doctor
 *
 * @time 2015年7月6日
 */
public class EchoServer {
	private final int port;

	public EchoServer() {
		this(8989);
	}

	public EchoServer(int port) {
		this.port = port;
	}

	/**
	 *
	 * @param args
	 * @throws InterruptedException
	 */
	public static void main(String[] args) throws InterruptedException {
		new EchoServer().start();

	}

	public void start() throws InterruptedException {
		ServerBootstrap bootstrap = new ServerBootstrap();
		NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
		NioEventLoopGroup workerGroup = new NioEventLoopGroup();

		try {
			bootstrap.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)
					.localAddress(port)
					.childHandler(new ChannelInitializer<SocketChannel>() {

						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
							ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8));
							ch.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8));
							ch.pipeline().addLast(new EchoServerHandler());
						}
					});

			ChannelFuture channelFuture = bootstrap.bind().sync();
			System.out.println(EchoServer.class.getName() + " started and listen on port:" + channelFuture.channel().localAddress());

			channelFuture.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}

	private static class EchoServerHandler extends ChannelHandlerAdapter {

		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
			cause.printStackTrace();
			ctx.close();
		}

		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
			System.out.println(msg);
			ctx.write(NettyUtil.appenEndOfLine("服务器返回:", (String) msg));

		}

		@Override
		public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
			ctx.flush();
		}
	}

}

客户端代码:

/*
 * Copyright (C) 2014-present  The  Netty5-2015  Authors
 *
 *
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.doctor.netty5.example.echo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.ReferenceCountUtil;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

import com.doctor.netty5.ebook.netty_in_action05.common.NettyUtil;

/**
 * @author doctor
 *
 * @time 2015年7月6日
 */
public class EchoClient {

	private final String host;
	private final int port;

	public EchoClient() {
		this("localhost", 8989);
	}

	public EchoClient(String host) {
		this(host, 8989);
	}

	public EchoClient(String host, int port) {
		this.host = host;
		this.port = port;
	}

	/**
	 * @param args
	 * @throws InterruptedException
	 */
	public static void main(String[] args) throws InterruptedException {
		new EchoClient().start();

	}

	public void start() throws InterruptedException {
		NioEventLoopGroup workersGroup = new NioEventLoopGroup(1);

		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(workersGroup)
					.channel(NioSocketChannel.class)
					.remoteAddress(host, port)
					.handler(new ChannelInitializer<SocketChannel>() {

						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
							ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8));
							ch.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8));
							ch.pipeline().addLast(new EchoClientHandler());
						}
					});

			ChannelFuture channelFuture = bootstrap.connect().sync();
			channelFuture.channel().closeFuture().sync();

		} finally {
			workersGroup.shutdownGracefully();
		}
	}

	private static class EchoClientHandler extends ChannelHandlerAdapter {

		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
			cause.printStackTrace();
			ctx.close();
		}

		@Override
		public void channelActive(ChannelHandlerContext ctx) throws Exception {
			ctx.writeAndFlush(NettyUtil.appenEndOfLine("我要连接...."));
			new Thread(new Hander(ctx)).start();
		}

		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
			try {
				System.out.println(msg);
			} finally {
				// 读完消息记得释放,那写消息为什么不这样操作呢,因为写完消息netty自动释放。
				// 其操作见:DefaultChannelHandlerInvoker L331-332,不过有这个注释-> promise cancelled
				// 是不少netty5正式发布的时候会取消呢。
				// 我们可以使用SimpleChannelInboundHandler作为父类,因为释放操作已实现。
				ReferenceCountUtil.release(msg);
			}

		}
	}

	private static class Hander implements Runnable {
		private ChannelHandlerContext ctx = null;
		private Scanner scanner = new Scanner(System.in);

		public Hander(ChannelHandlerContext ctx) {
			this.ctx = ctx;
		}

		@Override
		public void run() {
			while (true) {
				String nextLine = scanner.nextLine();
				ctx.writeAndFlush(NettyUtil.appenEndOfLine(nextLine));
			}

		}
	}

}

客户端启动了一个线程,异步操作从控制台获取输入字符串(回车结束)。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-22 07:40:36

Netty5 echo server练习的相关文章

libevent入门教程:Echo Server based on libevent - Blog of Felix021 - 日,泯然众人矣。

a:focus { outline: thin dotted #333; outline: 5px auto -webkit-focus-ring-color; outline-offset: -2px; } a:hover { outline: 0; } a:active { outline: 0; } a:hover { color: #005580 !important; text-decoration: underline !important; } blockquote small:b

[z]libevent入门教程:Echo Server based on libevent 不指定

[z]https://www.felix021.com/blog/read.php?2068 花了两天的时间在libevent上,想总结下,就以写简单tutorial的方式吧,貌似没有一篇简单的说明,让人马上就能上手用的. 首先给出官方文档吧: http://libevent.org ,首页有个Programming with Libevent,里面是一节一节的介绍libevent,但是感觉信息量太大了,而且还是英文的-.-(当然,如果想好好用libevent,看看还是很有必要的),还有个Ref

[UMU 学 golang](5) HTTP Echo Server

做加速时经常需要用到 HTTP Echo Server 来测试加速有没有成功,如果成功了,是显示请求来自加速代理服务器.原来用 node.js 写了一个,代码如下: var http = require('http'); http.createServer( function (req, res) { res.writeHead(200, {'Content-Type': 'text/plain'}); var ip = req.headers['x-forwarded-for'] || req

[UMU 学 golang](3) TCP Echo Server

测试需要,以前用 C + libevent 写了一个 TCP Echo Server,返回服务器时间.客户端地址信息和客户端发送的原内容.为了水一篇,现在改为 golang 实现. package main import ( "fmt" "io" "net" "os" "time" ) const BUFFER_SIZE = 1024 * 4 var buffer = make([]byte, BUFFER

libevent 入门教程:Echo Server based on libevent(转)

下面假定已经学习过基本的socket编程(socket, bind, listen, accept, connect, recv, send, close),并且对异步/callback有基本的认识. 基本的socket编程是阻塞/同步的,每个操作除非已经完成或者出错才会返回,这样对于每一个请求,要使用一个线程或者单独的进程去处理,系统资源没法支撑大量的请求.Posix定义了可以使用异步的select系统调用,但是因为其采用了轮询的方式来判断某个fd是否变成active,效率不高o(n).于是各

分别使用Java IO、NIO、Netty实现的一个Echo Server示例

分别使用Java IO.Java NIO.Netty来实现一个简单的EchoServer(即原样返回客户端的输入信息). Java IO int port = 9000; ServerSocket ss = new ServerSocket(port); while (true) { final Socket socket = ss.accept(); new Thread(new Runnable() { public void run() { while (true) { try { Buf

TCP 多线程并发 echo server

参考了一些开源实现: 1 //multi_thread_server.c 2 // gcc -o multi_thread_server multi_thread_server.c -lpthread 3 4 5 #include <netinet/in.h> // for sockaddr_in 6 #include <sys/types.h> // for socket 7 #include <sys/socket.h> // for socket 8 #inclu

Akka学习笔记(2)-- Echo Server

EchoServer 上篇文章里,我们用Akka写了一个简单的HelloWorld例子,对Akka(以及Actor模式)有了初步的认识.本文将用Akka写一个EchoServer,看看在Actor的世界里,如何使用TCP协议. Github项目 照例,EchoServer的代码被放在了Github上.EchoServer比HelloWorld稍微复杂一点,一共有三个类,如下图所示: Main 这次先从主类入手: main()方法的第一行创建了一个Actor系统,名字为mySystem.接下来的四

tcp echo server libuv

#include <stdio.h>#include <stdlib.h>#include <string.h>#include <uv.h> #define DEFAULT_PORT 7000#define DEFAULT_BACKLOG 128 uv_loop_t *loop;struct sockaddr_in addr; void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_