nio client和netty server实例

花了一周时间,研究了java里面的nio和netty,其实nio很好理解,用过c语言的,都应该知道select和epoll,nio和select和epoll非常类似,使用方法和解决的问题也都是一样的。

至于netty,不得不钦佩java语言的框架技术,虽说这个框架研究起来非常费劲,但是对于上层使用者,使用这个netty框架,会帮我们解决很多性能、稳定性问题。同时,使用框架,也会大大提高开发效率。

这里,不想讲太多关于nio和netty的东西,所有最基本的知识点,都在如下学习资料中。目前我对这个netty框架研究的还不深入,想了半天其实真写不出啥有水平的文章,待今后深入研究后,将学习成果再和大家汇报。

学习资料:

NIO:http://www.iteye.com/magazines/132-Java-NIO#590

Netty:http://docs.jboss.org/netty/3.1/guide/html_single/

多线程:http://www.cnblogs.com/dolphin0520/p/3932921.html

这里给出一个用nio实现的tcp client、用netty实现的一个tcp server的例子。

处理过程为:client传递a、b两个整型数,server计算和,将结果返回给客户端。在服务端加入线程池,用来处理两个数的和。当然了从性能角度,目前这么简单的操作完全没有必要这样做。主要考虑到如果有更复杂的操作,一般的服务端的模型都是将任务传入一个消息队列,后端再用线程池从消息队列中取出任务进行处理,再返回处理结果。所以这个地方的线程池,可以认为是以后的扩展,也可以认为其就是个摆设。

client 代码

NioClient.java

package nio.client.test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class NioClient {
	private final static int MAX_BUF_SIZE 					= 1024;
	private InetSocketAddress serverAddr;
	private int clientCount;

	public NioClient(String ip, int port, int clientCount) {
		this.clientCount 	= clientCount;
		this.serverAddr 	= new InetSocketAddress(ip, port);
	}

	private void sendMessageToSrv(SocketChannel sockChnl, int clientNo, int index) throws IOException {
		// send data to server...
/*		ByteBuffer sendBuf = ByteBuffer.allocate(MAX_BUF_SIZE);
		String sendText = "Client " + clientNo + " say " + index + "\r\n";
		sendBuf.put(sendText.getBytes());
		sendBuf.flip();
		sockChnl.write(sendBuf);
		System.out.println(sendText);*/

		ByteBuffer sendBuf = ByteBuffer.allocate(4*4);
		sendBuf.putInt(clientNo);
		sendBuf.putInt(index);
		sendBuf.putInt(clientNo);
		sendBuf.putInt(index);
		sendBuf.flip();

		sockChnl.write(sendBuf);
		String out = String.format("client: %d send message, index: %d, a: %d, b: %d", clientNo, index, clientNo, index);
		System.out.println(out);
	}

	private void recvMessage(SocketChannel sockChnl, int clientNo) throws IOException {
		/*ByteBuffer recvBuf = ByteBuffer.allocate(MAX_BUF_SIZE);
		int bytesRead = sockChnl.read(recvBuf);
		while (bytesRead > 0) {
			recvBuf.flip(); // write mode to read mode, position to 0, // limit to position
			String recvText = new String(recvBuf.array(), 0, bytesRead);
			recvBuf.clear(); // clear buffer content, read mode to write mode, position to 0, limit to capacity
			System.out.println("Client " + clientNo + " receive: " + recvText);
			bytesRead = sockChnl.read(recvBuf);
		}*/

		ByteBuffer recvBuf = ByteBuffer.allocate(MAX_BUF_SIZE);
		int bytesRead = sockChnl.read(recvBuf);
		while (bytesRead > 0) {
			recvBuf.flip(); // write mode to read mode, position to 0, // limit to position
			int result = recvBuf.getInt();
			recvBuf.clear(); // clear buffer content, read mode to write mode, position to 0, limit to capacity
			String out = String.format("client: %d recv message, result: %d", clientNo, result);
			System.out.println(out);
			bytesRead = sockChnl.read(recvBuf);
		}
	}

	public void startNioClient() throws IOException, InterruptedException {
		Selector selector = Selector.open();

		for (int i = 0; i < clientCount; i++) {
			SocketChannel socketChannel = SocketChannel.open();
			socketChannel.configureBlocking(false);
			Map<String, Integer> clientInfo = new HashMap<String, Integer>();
			clientInfo.put("no", i);
			clientInfo.put("index", 0);
			socketChannel.register(selector, SelectionKey.OP_CONNECT, clientInfo);
			socketChannel.connect(this.serverAddr);
		}

		while (true) {
			int readyChannels = selector.select();
			if (0 == readyChannels) {
				continue;
			}

			Set<SelectionKey> selectionKeys = selector.selectedKeys();
			for (SelectionKey sk : selectionKeys) {
				Map clientInfo = (Map) sk.attachment();
				int clientNo = (Integer) clientInfo.get("no");
				SocketChannel socketchnl = (SocketChannel) sk.channel();

				if (sk.isConnectable()) {
					while(!socketchnl.finishConnect()) {
						Thread.sleep(5);
					}
					if (socketchnl.isConnected()) {
                        System.out.println("connect is finish...");
                        // send data to server...
    					sendMessageToSrv(socketchnl, clientNo, -1);
    					sk.interestOps(SelectionKey.OP_READ);
					}
				} else if (sk.isReadable()) {
					// read data from server...
					recvMessage(socketchnl, clientNo);

					// send data to server...
					int index = (Integer) clientInfo.get("index");
					index += 1;
					sendMessageToSrv(socketchnl, clientNo, index);
					clientInfo.put("index", index);
				}
			}
			selectionKeys.clear();
		}
	}

	public int getClientCount() {
		return clientCount;
	}

	public void setClientCount(int clientCount) {
		this.clientCount = clientCount;
	}
}

Main.java

package nio.client.test;

import java.io.IOException;

public class Main {
	public static void main(String[] args) {
		System.out.println("clients start..............");
		NioClient client = new NioClient("127.0.0.1", 8080, 5000);
		try {
			client.startNioClient();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

server端代码:

NettyServer.java

package com.bj58.nettyTest;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class NettyServer {

	static final ChannelGroup allChannels = new DefaultChannelGroup("time-server");

	public static void main(String[] args) throws Exception {
	    ExecutorService threadPool = Executors.newFixedThreadPool(5);

		ChannelFactory factory = new NioServerSocketChannelFactory(
				Executors.newCachedThreadPool(),
				Executors.newCachedThreadPool());

		ServerBootstrap bootstrap = new ServerBootstrap(factory);

		bootstrap.setPipelineFactory(new ServerPipelineFactory(threadPool));

		bootstrap.setOption("child.tcpNoDelay", true);
		bootstrap.setOption("child.keepAlive", true);

		System.out.println("Netty Server start...");
		Channel channel = bootstrap.bind(new InetSocketAddress(8080));

/*		allChannels.add(channel);
		System.out.println("1111111111111111111");
		Thread.sleep(2*60*1000);
		System.out.println("2222222222222222222");
        ChannelGroupFuture future = allChannels.close();
        future.awaitUninterruptibly();
        factory.releaseExternalResources();
        System.out.println("3333333333333333333");*/

	}
}

ServerDecoder.java

package com.bj58.nettyTest;

import java.util.ArrayList;
import java.util.List;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

public class ServerDecoder extends FrameDecoder{
    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
        if (buffer.readableBytes() < 8) {
            return null;
        }

        int clientNo = buffer.readInt();
        int index = buffer.readInt();
        int a = buffer.readInt();
        int b = buffer.readInt();

        List<Integer> data = new ArrayList<Integer>();
        data.add(clientNo);
        data.add(index);
        data.add(a);
        data.add(b);

        return data;
    }
}

ServerHandler.java

package com.bj58.nettyTest;

import java.util.List;
import java.util.concurrent.ExecutorService;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class ServerHandler extends SimpleChannelHandler {

    private ExecutorService threadPool;

    public ServerHandler(ExecutorService threadPool) {
        this.threadPool = threadPool;
    }
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws InterruptedException {
        /*
         * ChannelBuffer buf = (ChannelBuffer)e.getMessage(); byte[] des =
         * buf.array(); String recvText = new String(des, 0, des.length);
         * System.out.println(recvText); Channel ch = e.getChannel();
         * ch.write(e.getMessage());
         */

        List<Integer> data = (List<Integer>) e.getMessage();
        HandleTask ht = new HandleTask(e);
        threadPool.submit(ht);

        int clientNo = data.get(0);
        int index = data.get(1);
        int a = data.get(2);
        int b = data.get(3);

        String content = String.format("client: %d, index: %d, a: %d, b: %d", clientNo, index, a, b);
        System.out.println(content);

        Channel ch = e.getChannel();
        ChannelBuffer buf = ChannelBuffers.buffer(4);
        buf.writeInt(a+b);
        ch.write(buf);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();

        Channel ch = e.getChannel();
        ch.close();
    }
}

class HandleTask implements Runnable {
    MessageEvent e;

    public HandleTask(MessageEvent e) {
        this.e = e;
    }

    public void run() {
        List<Integer> data = (List<Integer>) e.getMessage();

        int clientNo = data.get(0);
        int index = data.get(1);
        int a = data.get(2);
        int b = data.get(3);

        String content = String.format("client: %d, index: %d, a: %d, b: %d", clientNo, index, a, b);
        System.out.println(content);

        Channel ch = e.getChannel();
        ChannelBuffer buf = ChannelBuffers.buffer(4);
        buf.writeInt(a+b);
        ch.write(buf);
    }
}

ServerPipelineFactory.java

package com.bj58.nettyTest;

import static org.jboss.netty.channel.Channels.pipeline;

import java.util.concurrent.ExecutorService;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;

public class ServerPipelineFactory implements ChannelPipelineFactory{   

    private ExecutorService threadPool;

    public ServerPipelineFactory(ExecutorService threadPool) {
        this.threadPool = threadPool;
    }

    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = pipeline();

        pipeline.addLast("framer", new ServerDecoder());
        pipeline.addLast("handler", new ServerHandler(threadPool));
        return pipeline;
    }
}

代码理解:

客户端可以同时启动N个tcp client,同时连接一个tcp server,传递a/b两个数,获取a/b之和。

当代码写完之后,发现通过java的序列化技术,可以直接传递一个java 对象,这样一来,发送和接收端,处理起来会更简单一些,而且实际项目中,传递的数据要比这个复杂的多。

接下来研究一下java的序列化技术、netty如何传递对象、以及Google protobuf,给出一个完整的rpc的代码例子。

时间: 2024-10-08 09:31:53

nio client和netty server实例的相关文章

Netty入门实例及分析

什么是netty?以下是官方文档的简单介绍: The Netty project  is an effort to provide an asynchronous event-driven network application framework and tools for rapid development of maintainable high performance and high scalability protocol servers and clients. In other

netty入门实例

TimeServer.java package netty.timeserver.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGro

android binder 机制二(client和普通server)

在讲它们之间的通信之前,我们先以MediaServer为例看看普通Server进程都在干些什么. int main() { -- // 获得ProcessState实例 sp<ProcessState> proc(ProcessState::self()); // 得到ServiceManager的Binder客户端实例 sp<IServiceManager> sm = defaultServiceManager(); -- // 通过ServiceManager的Binder客户

Spring+Netty+WebSocket实例

比较贴近生产,详见注释 一.pom.xml 具体太长,详见源码 </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.2.Final</version> </dependency> 二.目录结构 三.AfterSpringBegin 继承了After

解决Apache虚拟主机报错问题apache AH01630: client denied by server configuration错误解决方法

今天同事咨询通过Apache搭建创建虚拟主机,搭建好发现一直报错,提示 "apache AH01630: client denied by server configuration",在网上搜索了一下, 发现这个错误的原因是,apache2.4 与 apache2.2 的虚拟主机配置写法不同导致. apache2.2的写法: [plain] view plain copy 在CODE上查看代码片派生到我的代码片 <VirtualHost *:80> ServerName f

Java虚拟机6:内存溢出和内存泄露、并行和并发、Minor GC和Full GC、Client模式和Server模式的区别

http://www.cnblogs.com/xrq730/p/4839245.html 前言 之前的文章尤其是讲解GC的时候提到了很多的概念,比如内存溢出和内存泄露.并行与并发.Client模式和Server模式.Minor GC和Full GC,本文详细讲解下这些概念的区别. 内存溢出和内存泄露的区别 1.内存溢出 内存溢出指的是程序在申请内存的时候,没有足够大的空间可以分配了. 2.内存泄露 内存泄露指的是程序在申请内存之后,没有办法释放掉已经申请到内存,它始终占用着内存,即被分配的对象可

第8章 配置 SQL Server 实例

第8章  配置 SQL Server 实例 8.1  查看常规属性 8.2  配置内存属性 8.3  配置处理器属性 8.4  配置安全性属性 8.5  配置连接属性 8.6  配置数据库设置属性 8.7  配置高级属性 8.8  配置权限属性

解决apache AH01630: client denied by server configuration错误

昨天给公司配置了apache-2.4.9的版本,今天他们要求把虚拟主机配置起好放网站程序,在修改apache-2.4.9的配置文件中,我发现了2.4.x跟以前的2.2.x里面的很多配置都不一样了,比如配置这个虚拟主机都有一些不同,按照以前的配置方法,会报下面的错误:AH01630: client denied by server configuration: /usr/local/apache/htdocs/recx/ 先给大家看看我按照apache-2.2.x配置虚拟机的内容:NameVirt

SQL Server 2012笔记分享-4:理解SQL server实例

每个单独的SQL server实例都有一个windows进程:sqlservr.exe,一个windows下能安装多个实例,多个实例会有多个sqlservr.exe进程. 一个SQL实例在后台对应一个服务,如果多个应用程序放在一个实例里,如果某个应用开发的程序有问题,比如死循环,会导致服务停止,从而导致所有数据库无法工作.可以采用多实例分开方式. 一个服务器上可以装多个实例,标准版(16个)和企业版(50个)支持的实例数量不同. SQL server实例的类型 (一)默认实例和命名实例 1.服务