Netty Client和Server端实现

本文基于Nett4.0.26.Final版本浅析Client与Server端通讯,先看服务器端:

public class Server {

    public static void run(int port) {
        /**Netty创建ServerSocketChannel,默认SelectionKey.OP_ACCEPT*/
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                     .channel(NioServerSocketChannel.class)                // 设置Channel Type
                     .option(ChannelOption.SO_BACKLOG, 1024)            // 设置Channel属性
                     .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                            pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new SimpleChannelHandler());
                        }
                    });
            /**服务器端绑定监听端口并对Channel进行初始化
             * 1-ChannelConfig由ChannelOption初始化
             * 2-ChannelPipeline(默认DefaultChannelPipeline)添加ChannelHandler
             * 3-注册Channel并添加监听器ChannelFutureListener.CLOSE_ON_FAILURE
             * 以异步的方式等待上述操作的完成
             * */
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            if (channelFuture.isDone()) {
                System.out.println(String.format("server bind port %s sucess", port));
            }
            /**CloseFuture异步方式关闭*/
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String []args) {
        Server.run(8080);
    }
}

public class SimpleChannelHandler implements ChannelInboundHandler {

    private static final Gson GSON = new GsonBuilder().create();

    /**
     * the method called when new connect come
     * */
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println(String.format("last channel handler [%s] add", ctx.pipeline().last().getClass().getSimpleName()));
    }

    /**
     * the method called when client close connect
     * */
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        ctx.disconnect(ctx.newPromise());
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    /**
     * register port for connect channel
     * */
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        String connect = ctx.channel().remoteAddress().toString().substring(1);
        System.out.println(String.format("remote connecter address %s", connect));
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request req = GSON.fromJson(String.valueOf(msg), Request.class);
        String json = GSON.toJson(new Response(String.format("server get client status [%s]", req.getStatus()), new Random().nextInt(10)));
        ctx.write(json);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {

    }

}

服务器端的ChannelHandler的handlerRemoved方法是当客户端关闭链接时该方法被触发,服务器应当关闭当前与客户端的连接,完成TCP的四次挥手过程。

客户端的实现:

public class Client {

	public static void run(String host, int port) {
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(group)
					 .channel(NioSocketChannel.class)
					 .option(ChannelOption.TCP_NODELAY, true)
					 .handler(new ChannelInitializer<SocketChannel>() {

						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							ChannelPipeline pipeline = ch.pipeline();
							pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
							pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
							pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
							pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
							pipeline.addLast(new SimpleClientChannelHandler());
						}

					});
			/**客户端向服务器发起连接请求
			 * 1-ChannelConfig由ChannelOption初始化
			 * 2-ChannelPipeline(默认DefaultChannelPipeline)添加ChannelHandler
			 * 3-注册Channel并添加监听器ChannelFutureListener.CLOSE_ON_FAILURE
			 * 以异步的方式等待上述操作的完成
			 * */
			ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
			if (channelFuture.isSuccess()) {
				System.out.println(String.format("connect server(%s:%s) sucess", host, port));
			}
			channelFuture.channel().closeFuture().sync();
			System.out.println("client close sucess");
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			group.shutdownGracefully();
		}
	}

	public static void main(String []args) {
		for (int i = 0 ; i < 3 ; ++i) {
			Client.run("127.0.0.1", 8080);
			System.out.println();
		}
//		Client.run("127.0.0.1", 8080);
	}
}

public class SimpleClientChannelHandler implements ChannelInboundHandler{

	private static final Gson GSON = new GsonBuilder().create();

	/**
	 * the method called when client add channel handler(1)
	 * */
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
		ChannelHandler channelHandler = ctx.channel().pipeline().last();
		System.out.println("client last channel handle " + channelHandler.getClass().getSimpleName());
	}

	/**
	 * the method called when server disconnect
	 * */
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
		Channel ch = ctx.channel();
		SocketAddress local = ch.localAddress();
		SocketAddress remote = ch.remoteAddress();
		System.out.println(String.format("server(%s) diconnect and client(%s) close connect", remote.toString().substring(1), local.toString().substring(1)));
		ctx.close();
	}

	/**
	 * the method called for register port before connect server(2)
	 * */
	public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
		System.out.println("client start to register port");
	}

	@Override
	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

	}

	/**
	 * the method called when channel active(3)
	 * */
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		String json = GSON.toJson(new Request("client status", new Random().nextInt(10)));
		ctx.writeAndFlush(json);
		System.out.println(String.format("connect established and send to server message [%s]", json));
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {

	}

	/**
	 * close after receive response from server(server also should close connect)
	 * */
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println(String.format("client receive message [%s]", String.valueOf(msg)));
		ctx.disconnect(ctx.newPromise());
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		System.out.println("77777");
	}

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		System.out.println("88888");
	}

	@Override
	public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
		System.out.println("99999");
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
	}

}

  在客户端的ChannelHandler中有几个关键方法:

channelActive方法:客户端与服务器建立连接且Channel被激活时该方法被调用,本文在客户端与服务器端建立连接就绪时向服务器发送数据

channelRead方法:当服务器端有数据发送时方法被调用,本文在收到服务器端响应时关闭当前连接(此时服务器端的handlerRemoved方法被调用)

handlerRemoved方法:当服务器确认断开连接时该方法被调用,客户端应关闭Channel(TCP四次挥手结束)

时间: 2024-08-10 21:30:33

Netty Client和Server端实现的相关文章

搜集的一些RTMP项目,有Server端也有Client端

查询一些RTMP的协议封装时找到了一些RTMP开源项目,在这里列举一下,以后有时间或是有兴趣可以参考一下: just very few of them. Red5 only contains a server-implementation (in java). The python project rtmpy aims to be a freesoftware implementation of an RTMP library, whilst Tape intends to be a fulls

微服务学习三:springboot与springcloud集成之Eurake的使用(server端,client端)

这个多亏了网站上的一个大神的博客: http://blog.csdn.net/forezp/article/details/70148833 强烈推荐学习: 1.springcloud是什么,这个大家百度吧,我一会也说不明白,但是比dubbo更强大,包含了dubbo的内容,也包含了比dubbo更多的内容. 2.什么是Eurake?关于这个我觉得如果大家知道zookeeper就明白了Eurake的优势. springcloud集成了Eurake,所以对于服务的治理注册更方便简洁了,而且不需要安装,

应用java多线程实现server端与多client之间的通信

应用多线程来实现server与多线程之间的通信的基本步骤 1.server端创建ServerSocket,循环调用accept()等待client链接 2.client创建一个Socket并请求和server端链接 3.server端接受client请求,创建socekt与该client建立专线链接 4.建立链接的socket在一个单独的线程上对话 5.server继续等待新的链接 server端Server.java package test.concurrent.socket; import

Socket编程】使用C++实现Server端和Client端

我是在Visual Stdio 2013上建立了两个工程,分别编译运行下面的两个main文件,然后进行测试的 服务端:Server.cpp #include #include using std::cout; using std::cin; using std::endl; #include using std::string; #pragma comment(lib,"ws2_32.lib") void main() { //创建套接字 WORD myVersionRequest;

Fms3中client端与server端交互方式汇总

系列文章导航 Flex,Fms3相关文章索引 Flex和Fms3打造在线聊天室(利用NetConnection对象和SharedObject对象) Fms3和Flex打造在线视频录制和回放 Fms3和Flex打造在线多人视频会议和视频聊天(附原代码) Fms3中client端与server端交互方式汇总 免费美女视频聊天,多人视频会议功能加强版本(Fms3和Flex开发(附源码)) 免费网络远程视频会议系统,免费美女多人视频聊天(附源码下载)(Flex和Fms3开发) 开源Flex Air版免费

怎么在yar的server端任何地方获得client请求调用的方法

先说下碰到的问题吧:上周调查个问题发现,在rpc server端解析client上传上来的post数据,解包,找函数,执行都在Yar_Server的函数handle中执行了.没有向后面的系统或者服务传递上下文的方法.为了调查问题我们只能在函数调用里面记录哪个方法被调用了. 那么是不是可以在Yar_Server里面试着增加个静态变量保存内容,限于自身能力,现只增加了一个方法,返回了调用的method,用于server端向后面传递. 如下, 小改之后就可以通过 Yar_Server::getCall

BluetoothLE-Multi-Library 一个能够连接多台蓝牙设备的库,它可以作为client端,也可以为server端。支持主机/从机,外围设备连接。

github地址:https://github.com/qindachang/BluetoothLE-Multi-Library BluetoothLE-Multi-Library 一个能够连接多台蓝牙设备的库,它可以作为client端,也可以为server端.支持主机/从机,外围设备连接.在发送消息时,它内部支持队列控制,避免因蓝牙间隔出现操作失败的情况. 开始使用 1. 主机client 扫描 BluetoothLeScannerCompat scannerCompat = Bluetoot

UDP也需要现有Server端,然后再有Client端

UDP编程: DatagramSocket(邮递员):对应数据报的Socket概念,不需要创建两个socket,不可使用输入输出流. DatagramPacket(信件):数据包,是UDP下进行传输数据的单位,数据存放在字节数组中. UDP也需要现有Server端,然后再有Client端. 两端都是DatagramPacket(相当于电话的概念),需要NEW两个DatagramPacket. InetAddress:网址 这种信息传输方式相当于传真,信息打包,在接受端准备纸. 模式: 发送端:S

用同一台PC的两个网口实现Iperf的server端和client端

用同一台PC的两个网口实现Iperf的server端和client端 2015年10月20日 20:35:11 阅读数:2943 有时候需要发包,仅仅需要一定速率的流量,并不需要关心收到报文的大小,一个好用的开源软件发包工具并不好找,iperf发包很方便,但是一般需要两台电脑,分别作为server端和client端,如果使用一个PC的两个端口分别作为Iperf的Server端和Client端,只需要一台电脑,作为一个可携带的发包工具,会大大方便携带与使用. 将一台电脑的两个端口分别配置为不同的网