Java Netty (2)

通过一个实例来说明Netty的使用。用1个服务器连接5个客户端线程,客户端连接上服务器以后就向服务器发送消息,服务器接收到消息后向客户端返回消息,客户端接收到消息以后,等待随机的时间,再向服务端发送消息,这样一直循环下去。

项目结构:

NettyServer.java:

package Server;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;

import Util.Constant;

public class NettyServer {

    public static String host = "127.0.0.1";

    // 创建1个线程池
    static ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));

    public static void main(String[] args) {
        // ChannelFactory
        final ChannelFactory channelFactory = new NioServerSocketChannelFactory(
                // Boss线程池,处理Socket请求
                Executors.newCachedThreadPool(),
                // Worker线程池,由于使用的是NIO,1个Worker线程可以管理多个Channel
                Executors.newCachedThreadPool());
        // ServerBootstrap
        ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);

        ServerPipelineFactory serverPipelineFactory = new ServerPipelineFactory(executionHandler);
        bootstrap.setPipelineFactory(serverPipelineFactory); 

        // 禁用nagle算法
        bootstrap.setOption("child.tcpNoDelay", true);
        // 启用TCP保活检测
        bootstrap.setOption("child.keepAlive", true); 

        // 监听5个端口
        bootstrap.bind(new InetSocketAddress(Constant.p1));
        System.out.println("Listening port " + Constant.p1 + "...");
        bootstrap.bind(new InetSocketAddress(Constant.p2));
        System.out.println("Listening port " + Constant.p2 + "...");
        bootstrap.bind(new InetSocketAddress(Constant.p3));
        System.out.println("Listening port " + Constant.p3 + "...");
        bootstrap.bind(new InetSocketAddress(Constant.p4));
        System.out.println("Listening port " + Constant.p4 + "...");
        bootstrap.bind(new InetSocketAddress(Constant.p5));
        System.out.println("Listening port " + Constant.p5 + "...");
    }

}

ServerPipelineFactory.java:

package Server;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.execution.ExecutionHandler;

public class ServerPipelineFactory implements ChannelPipelineFactory {

    private final ExecutionHandler executionHandler; 

    public ServerPipelineFactory(ExecutionHandler executionHandler){
        this.executionHandler = executionHandler;
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {
        // TODO Auto-generated method stub
        return Channels.pipeline(
                new StringEncoder(),
                new StringDecoder(),
                // 多个pipeline之间必须共享同一个ExecutionHandler,放在业务逻辑handler之前
                executionHandler,
                // 业务逻辑handler
                new MyServerHandler());
    } 

}

MyServerHandler.java:

package Server;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import Util.Tool;

public class MyServerHandler extends SimpleChannelHandler{

    @SuppressWarnings("static-access")
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("Server received:" + e.getMessage());
        // 休息随机秒后发送消息
        Thread th = Thread.currentThread();
        int interval = Tool.getInterval(100);
        th.sleep(interval*1000);
        e.getChannel().write("from Server: Hello!");
        super.messageReceived(ctx, e);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        e.getCause().printStackTrace();
        Channel ch = e.getChannel();
        ch.close();
        super.exceptionCaught(ctx, e);
    } 

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("A client connected!");
        super.channelConnected(ctx, e);
    }

}

NettyClient.java:

package Client;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;

import Util.Constant;

public class NettyClient extends Thread{

    public static String host = "127.0.0.1";
    ClientBootstrap bootstrap;
    int port;

    // 创建1个线程池
    static ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));

    public NettyClient(int port) {
        this.port = port;
        // ChannelFactory
        final ChannelFactory channelFactory = new NioClientSocketChannelFactory(
                // Boss线程池
                Executors.newCachedThreadPool(),
                // Worker线程池
                Executors.newCachedThreadPool());
        // ServerBootstrap
        bootstrap = new ClientBootstrap(channelFactory);

        ClientPipelineFactory clientPipelineFactory = new ClientPipelineFactory(executionHandler);
        bootstrap.setPipelineFactory(clientPipelineFactory);
        bootstrap.setOption("tcpNoDelay" ,true);
        bootstrap.setOption("keepAlive", true);
        bootstrap.connect(new InetSocketAddress(port));

    }

    public void run(){
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
        // 开始试图连接
        System.out.println("Connecting to port " + port + "...");
        // 等待直到连接关闭或失败
        future.getChannel().getCloseFuture().awaitUninterruptibly();
        // 关闭线程池准备退出
        bootstrap.releaseExternalResources();
    }

    public static void main(String[] args) {
        NettyClient nc1 = new NettyClient(Constant.p1);
        NettyClient nc2 = new NettyClient(Constant.p2);
        NettyClient nc3 = new NettyClient(Constant.p3);
        NettyClient nc4 = new NettyClient(Constant.p4);
        NettyClient nc5 = new NettyClient(Constant.p5);

        nc1.start();
        nc2.start();
        nc3.start();
        nc4.start();
        nc5.start();
    }

}

ClientPipelineFactory.java:

package Client;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.execution.ExecutionHandler;

public class ClientPipelineFactory implements ChannelPipelineFactory {

    private final ExecutionHandler executionHandler; 

    public ClientPipelineFactory(ExecutionHandler executionHandler){
        this.executionHandler = executionHandler;
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {
        // TODO Auto-generated method stub
        return Channels.pipeline(
                new StringEncoder(),
                new StringDecoder(),
                // 多个pipeline之间必须共享同一个ExecutionHandler,放在业务逻辑handler之前
                executionHandler,
                // 业务逻辑handler
                new MyClientHandler());
    }
}

MyClientHandler.java:

package Client;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

import Util.Tool;

public class MyClientHandler extends SimpleChannelHandler{

    // 连接到服务端时,发出消息
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("Connected to Server!");
        e.getChannel().write("from Client: Hello! " + System.currentTimeMillis());
        super.channelConnected(ctx, e);
    }  

    @SuppressWarnings("static-access")
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("Client Received:" + e.getMessage());
        // 休息随机秒后发送消息
        Thread th = Thread.currentThread();
        int interval = Tool.getInterval(5);
        th.sleep(interval*1000);
        e.getChannel().write("from Client: Hello! "  + System.currentTimeMillis());
        super.messageReceived(ctx, e);
    }  

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        e.getCause().printStackTrace();
        Channel ch = e.getChannel();
        ch.close();
        super.exceptionCaught(ctx, e);
    } 

}

Constant.java:

package Util;

public class Constant {
    final static int start = 10000;
    public static int p1 = start + 1;
    public static int p2 = start + 2;
    public static int p3 = start + 3;
    public static int p4 = start + 4;
    public static int p5 = start + 5;
}

Tool.java:

package Util;

import java.util.Random;

public class Tool {

    static Random rand = new Random();

    public static int getInterval(int max){
        return rand.nextInt(max);
    }
}
时间: 2024-10-08 21:38:39

Java Netty (2)的相关文章

java netty socket库和自定义C#socket库利用protobuf进行通信完整实例

之前的文章讲述了socket通信的一些基本知识,已经本人自定义的C#版本的socket.和java netty 库的二次封装,但是没有真正的发表测试用例. 本文只是为了讲解利用protobuf 进行C# 和 java的通信.以及完整的实例代码 java 代码 svn 地址,本人开发工具是NetBeans 8.0.2 使用 maven 项目编译 http://code.taobao.org/svn/flynetwork_csharp/trunk/BlogTest c# 代码 svn 地址 使用的是

学习 java netty (一) -- java nio

前言:最近在研究java netty这个网络框架,第一篇先介绍java的nio. java nio在jdk1.4引入,其实也算比较早的了,主要引入非阻塞io和io多路复用.内部基于reactor模式. nio核心: - buffer - channel - selector buffer: 类似网络编程中的缓冲区,有 ByteBuffer 字节 CharBuffer 字符 IntBuffer DoubleBuffer- 常用的有ByteBuffer和CharBuffer java nio buf

学习 java netty (三) -- Channel

学习 java netty (三) – Channel 前言:netty封装的channel,看一下官网的定义 A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind. 可以I/O操作(如读,写,连接和绑定)的连网套接字或组件 A channel provides a user: All I/O operations a

学习 java netty (二) -- ServerBootstrap

前言:我们自己使用java nio开发网络程序是非常繁琐的,netty为我们做好了一切,其中ServerBootstrap是一个启动辅助类,了解它我们就能开发出简单的nio 服务端程序. 不理解Nio中channel和handler等可参考上一篇文章 学习 java netty (一) – java nio ServerBootstrap(): //创建一个ServerBootstrap对象 ServerBootstrap server = new ServerBootstrap; Server

Java Netty (1)

Netty是由JBOSS提供的一个java开源框架,本质上也是NIO,是对NIO的封装,比NIO更加高级,可以说发展的路线是IO->NIO->Netty. ServerBootstrap和ClientBootstrap是Netty中两个比较重要的类,分别用来进行服务器和客户端的初始化. 服务器: // ChannelFactory final ChannelFactory channelFactory = new NioServerSocketChannelFactory( // Boss线程

100万并发连接服务器笔记之Java Netty处理1M连接会怎么样

前言 每一种该语言在某些极限情况下的表现一般都不太一样,那么我常用的Java语言,在达到100万个并发连接情况下,会怎么样呢,有些好奇,更有些期盼.  这次使用经常使用的顺手的     netty NIO框架(netty-3.6.5.Final),封装的很好,接口很全面,就像它现在的域名   netty.io,专注于网络IO.  整个过程没有什么技术含量,浅显分析过就更显得有些枯燥无聊,准备好,硬着头皮吧. 测试服务器配置 运行在VMWare Workstation 9中,64位Centos 6

基于Java Netty框架构建高性能的Jt808协议的GPS服务器(转)

原文地址:http://www.jt808.com/?p=971 使用Java语言开发一个高质量和高性能的jt808 协议的GPS通信服务器,并不是一件简单容易的事情,开发出来一段程序和能够承受数十万台车载接入是两码事,除去开发部标jt808协议的固有复杂性和几个月长周期的协议Bug调试,作为大批量794车载终端接入的服务端,需要能够处理网络的闪断.客户端的重连.安全认证和消息的编解码.半包处理等.如果没有足够的网络编程经验积累和深入了解部标jt808协议文档,自研的GPS服务器往往需要半年甚至

JAVA netty 简单使用

实现一个功能,客户端和服务器 轮流对一个数加+1 服务器 public class Server { public static void main(String[] args) { NioEventLoopGroup boss=new NioEventLoopGroup(1); NioEventLoopGroup worker=new NioEventLoopGroup(3); try { final ServerBootstrap server=new ServerBootstrap();

Java netty的option(ChannelOption.SO_BACKLOG,1024 )的意思

ChannelOption.SO_BACKLOG, 1024 BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度.如果未设置或所设置的值小于1,Java将使用默认值50. ChannelOption.SO_KEEPALIVE, true 是否启用心跳保活机制.在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活. Chann