java异步通信

在Merlin之前,编写Socket程序是比较繁琐的工作.因为输入输出都必须同步.这样,对于多客户端客户/服务器模式,不得不使用多线程.即为每个连接的客户都分配一个线程来处理输入输出.由此而带来的问题是可想而知的.程序员不得不为了避免死锁,线程安全等问题,进行大量的编码和测试.很多人都在抱怨为什么不在Java中引入异步输入输出机制.比较官方的解释是,任何一种应用程序接口的引入,都必须兼容任何操作平台.因为Java是跨平台的.而当时支持异步输入输出机制的操作平台显然不可能是全部.自Java 2 Platform以后,分离出J2SE,J2ME,J2EE三种不同类型的应用程序接口,以适应不同的应用开发.Java标准的制订者们意识到了这个问题,并且支持异步输入输出机制的操作平台在当今操作平台中处于主流地位.于是,Jdk(J2SE) 的第五次发布中引入了异步输入输出机制.

以前的Socket进程通信程序设计中,一般客户端和服务器端程序设计如下:

1.服务器端:

//服务器端监听线程

while (true) {

  .............

  Socket clientSocket;

  clientSocket = socket.accept(); //取得客户请求Socket,如果没有//客户请求连接,线程在此处阻塞

  //用取得的Socket构造输入输出流

  PrintStream os = new PrintStream(new  BufferedOutputStream(clientSocket.getOutputStream(),

    1024), false);

  BufferedReader is = new BufferedReader(new  InputStreamReader(clientSocket.getInputStream()));

  //创建客户会话线程,进行输入输出控制,为同步机制  new ClientSession();

  .......

}

1.客户端:

............

clientSocket = new Socket(HOSTNAME, LISTENPORT);//连接服务器套接字

//用取得的Socket构造输入输出流

PrintStream os = new PrintStream(new  BufferedOutputStream(clientSocket.getOutputStream(),  1024), false);

BufferedReader is = new BufferedReader(new  InputStreamReader(clientSocket.getInputStream()));  //进行输入输出控制  .......

1.以上代码段只是用同步机制编写Socket进程通信的一个框架,实际上要考虑的问题要复杂的多(有兴趣的读者可以参考我的一篇文章《Internet 实时通信系统设计与实现》)。将这样一个框架列出来,只是为了与用异步机制实现的Socket进程通信进行比较。下面将介绍使用异步机制的程序设计。

用异步输入输出流编写Socket进程通信程序

在Merlin中加入了用于实现异步输入输出机制的应用程序接口包:

java.nio(新的输入输出包,定义了很多基本类型缓冲(Buffer)),

java.nio.channels(通道及选择器等,用于异步输入输出),

java.nio.charset(字符的编码解码)。通道 (Channel)首先在选择器(Selector)中注册自己感兴趣的事件,当相应的事件发生时,选择器便通过选择键(SelectionKey)通知已注册的通道。然后通道将需要处理的信息,通过缓冲(Buffer)打包,编码/解码,完成输入输出控制。

通道介绍:

这里主要介绍ServerSocketChannel和 SocketChannel.它们都是可选择的(selectable)通道,分别可以工作在同步和异步两种方式下

(注意,这里的可选择不是指可以选择两种工作方式,而是指可以有选择的注册自己感兴趣的事件)。可以用channel.configureBlocking(Boolean )来设置其工作方式。与以前版本的API相比较,ServerSocketChannel就相当于ServerSocket (ServerSocketChannel封装了ServerSocket),而SocketChannel就相当于Socket (SocketChannel封装了Socket)。当通道工作在同步方式时,编程方法与以前的基本相似,这里主要介绍异步工作方式。

所谓异步输入输出机制,是指在进行输入输出处理时,不必等到输入输出处理完毕才返回。所以异步的同义语是非阻塞(None Blocking)。在服务器端,ServerSocketChannel通过静态函数open()返回一个实例serverChl。然后该通道调用 serverChl.socket().bind()绑定到服务器某端口,并调用register(Selector sel, SelectionKey.OP_ACCEPT)注册OP_ACCEPT事件到一个选择器中(ServerSocketChannel只可以注册 OP_ACCEPT事件)。当有客户请求连接时,选择器就会通知该通道有客户连接请求,就可以进行相应的输入输出控制了;在客户端,clientChl实例注册自己感兴趣的事件后(可以是OP_CONNECT,OP_READ,OP_WRITE的组合),调用clientChl.connect (InetSocketAddress )连接服务器然后进行相应处理。注意,这里的连接是异步的,即会立即返回而继续执行后面的代码。

选择器和选择键介绍:

选择器(Selector)的作用是:将通道感兴趣的事件放入队列中,而不是马上提交给应用程序,等已注册的通道自己来请求处理这些事件。换句话说,就是选择器将会随时报告已经准备好了的通道,而且是按照先进先出的顺序。那么,选择器是通过什么来报告的呢?选择键(SelectionKey)。选择键的作用就是表明哪个通道已经做好了准备,准备干什么。你也许马上会想到,那一定是已注册的通道感兴趣的事件。不错,例如对于服务器端serverChl来说,可以调用key.isAcceptable()来通知serverChl有客户端连接请求。相应的函数还有: SelectionKey.isReadable(),SelectionKey.isWritable()。一般的,在一个循环中轮询感兴趣的事件(具体可参照下面的代码)。如果选择器中尚无通道已注册事件发生,调用Selector.select()将阻塞,直到有事件发生为止。另外,可以调用 selectNow()或者select(long timeout)。前者立即返回,没有事件时返回0值;后者等待timeout时间后返回。一个选择器最多可以同时被63个通道一起注册使用。

应用实例:

下面是用异步输入输出机制实现的客户/服务器实例程序?D?D程序清单1(限于篇幅,只给出了服务器端实现,读者可以参照着实现客户端代码):

程序类图  程序清单1

public class NBlockingServer {

  int port = 8000;

  int BUFFERSIZE = 1024;

  Selector selector = null;

  ServerSocketChannel serverChannel = null;

  HashMap clientChannelMap = null;//用来存放每一个客户连接对应的套接字和通道

  public NBlockingServer( int port ) {

    this.clientChannelMap = new HashMap();

    this.port = port;

  }

  public void initialize() throws IOException {

    //初始化,分别实例化一个选择器,一个服务器端可选择通道

    this.selector = Selector.open();

    this.serverChannel = ServerSocketChannel.open();

    this.serverChannel.configureBlocking(false);

    InetAddress localhost = InetAddress.getLocalHost();

    InetSocketAddress isa = new InetSocketAddress(localhost, this.port );

    this.serverChannel.socket().bind(isa);//将该套接字绑定到服务器某一可用端口

  }

  //结束时释放资源

  public void finalize() throws IOException {

  this.serverChannel.close();

  this.selector.close();

  }

  //将读入字节缓冲的信息解码

  public String decode( ByteBuffer byteBuffer ) throws  CharacterCodingException {

  Charset charset = Charset.forName( "ISO-8859-1" );

  CharsetDecoder decoder = charset.newDecoder();

  CharBuffer charBuffer = decoder.decode( byteBuffer );

  String result = charBuffer.toString();  return result;

  }

  //监听端口,当通道准备好时进行相应操作

  public void portListening() throws IOException, InterruptedException {

  //服务器端通道注册OP_ACCEPT事件

  SelectionKey acceptKey =this.serverChannel.register( this.selector,  SelectionKey.OP_ACCEPT );

  //当有已注册的事件发生时,select()返回值将大于0

  while (acceptKey.selector().select() > 0 ) {

    System.out.println("event happened");

    //取得所有已经准备好的所有选择键

    Set readyKeys = this.selector.selectedKeys();

    //使用迭代器对选择键进行轮询

    Iterator i = readyKeys.iterator();

    while (i.hasNext()) {

      SelectionKey key = (SelectionKey)i.next();

      i.remove();//删除当前将要处理的选择键

      if ( key.isAcceptable() ) {//如果是有客户端连接请求

      System.out.println("more client connect in!");

      ServerSocketChannel nextReady =  (ServerSocketChannel)key.channel();

      //获取客户端套接字

      Socket s = nextReady.accept();

        //设置对应的通道为异步方式并注册感兴趣事件

      s.getChannel().configureBlocking( false );

      SelectionKey readWriteKey =  s.getChannel().register( this.selector,  SelectionKey.OP_READ|SelectionKey.OP_WRITE );

      //将注册的事件与该套接字联系起来

      readWriteKey.attach( s );

      //将当前建立连接的客户端套接字及对应的通道存放在哈希表//clientChannelMap中

      this.clientChannelMap.put( s, new  ClientChInstance( s.getChannel() ) );

    }  else if ( key.isReadable() ) {//如果是通道读准备好事件

    System.out.println("Readable");

    //取得选择键对应的通道和套接字

    SelectableChannel nextReady =  (SelectableChannel) key.channel();

    Socket socket = (Socket) key.attachment();

    //处理该事件,处理方法已封装在类ClientChInstance中

    this.readFromChannel( socket.getChannel(),  (ClientChInstance)  this.clientChannelMap.get( socket ) );  }

    else if ( key.isWritable() ) {//如果是通道写准备好事件

    System.out.println("writeable");

    //取得套接字后处理,方法同上

    Socket socket = (Socket) key.attachment();

    SocketChannel channel = (SocketChannel)  socket.getChannel();

    this.writeToChannel( channel,"This is from server!");

  }

}

}

}

//对通道的写操作

public void writeToChannel( SocketChannel channel, String message )

  throws IOException {

  ByteBuffer buf = ByteBuffer.wrap( message.getBytes() );

  int nbytes = channel.write( buf );

}

//对通道的读操作

public void readFromChannel( SocketChannel channel, ClientChInstance clientInstance )  throws IOException, InterruptedException {

  ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE );

  int nbytes = channel.read( byteBuffer );

}

时间: 2024-10-22 10:21:21

java异步通信的相关文章

JAVA NIO 类库的异步通信框架netty和mina

Netty 和 Mina 我究竟该选择哪个? 根据我的经验,无论选择哪个,都是个正确的选择.两者各有千秋,Netty 在内存管理方面更胜一筹,综合性能也更优.但是,API 变更的管理和兼容性做的不是太好.相比于 Netty,Mina 的前向兼容性.内聚的可维护性功能更多,例如 JMX 的集成.性能统计.状态机等. Netty 是业界最流行的 NIO 框架之一,它的健壮性.功能.性能.可定制性和可扩展性在同类框架中都是首屈一指的,它已经得到成百上千的商用项目验证,例如 Hadoop 的 RPC 框

c#与JAVA利用SOCKET实现异步通信的SanNiuSignal.DLL已开源

大家好,前段时间C#的SanNiuSignal.DLL已开源;因部分用户特需要JAVA版的SanNiuSignal;现在只能把半成品先拿出来暂时给他们用了,以后再慢慢改进; JAVA版目前已实现跟C#进行文本双向通信;下面就来详细介绍这种跨语言的通信技术............ 各版本的使用说明和下载地址:1 C#之SanNiuSignal.DLL具体地址:http://www.cnblogs.com/SanNiuSignal/p/4179068.html  2:JAVA版因不完善只能加入QQ群

Java基础知识强化之网络编程笔记24:Android网络通信之 AndroidAsync(基于nio的异步通信库)

1. AndroidAsync   AndroidAsync 是一个基于nio的异步socket ,http(客户端服务器端),websocket,socket.io库,AndroidAsync 是一个底层的网络协议库,如果你想要一个容易使用,高级的,http请求库,请使用Ion(它是基于AndroidAsync 的),正常来说开发者更倾向于使用  Ion. 如果你需要一个未被封装的Android的raw Socket, HTTP client/server, WebSocket, and So

Java面试18|关于进程、线程与协程

1.IPC(Inter-Process Communication,进程间通信)与线程通信的几种方式 # 管道( pipe ):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用.进程的亲缘关系通常是指父子进程关系.# 有名管道 (named pipe) : 有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信.# 信号量( semophore ) : 信号量是一个计数器,可以用来控制多个进程对共享资源的访问.它常作为一种锁机制,防止某进程正在访问共享资源

序列化之protobuf与avro对比(Java)

最近在做socket通信中用到了关于序列化工具选型的问题,在调研过程中开始趋向于用protobuf,可以省去了编解码的过程.能够实现快速开发,且只需要维护一份协议文件即可. 但是调研过程中发现了protobuf的一些弊端,比如需要生成相应的文件类,和业务绑定太紧密,所以在看了AVRO之后发现它完美解决了这个问题. 下面记录下对这两种序列化工具的入门与测评. 一.protobuf基本操作 protobuf简介: Protocol Buffers (a.k.a., protobuf) are Goo

JAVA基础知识之网络编程——-基于AIO的异步Socket通信

异步IO 下面摘子李刚的<疯狂JAVA讲义> 按照POSIX标准来划分IO,分为同步IO和异步IO.对于IO操作分为两步,1)程序发出IO请求. 2)完成实际的IO操作. 阻塞IO和非阻塞IO都是针对第一步来划分的,如果发出IO请求会阻塞线程,就是阻塞IO,否则就是非阻塞IO. 同步IO和非同步IO是针对第二步来说的,如果实际IO操作是由操作系统完成,再返回给程序,就是异步IO. 如果实际的IO需要程序本身去执行,会阻塞线程,就是同步IO. JAVA7的NIO.2提供了异步的channel,

转Java 回调函数的理解

所谓回调,就是客户程序C调用服务程序S中的某个函数A,然后S又在某个时候反过来调用C中的某个函数B,对于C来说,这个B便叫做回调函数.例如Win32下的窗口过程函数就是一个典型的回调函数.一般说来,C不会自己调用B,C提供B的目的就是让S来调用它,而且是C不得不提供.由于S并不知道C提供的B姓甚名谁,所以S会约定B的接口规范(函数原型),然后由C提前通过S的一个函数R告诉S自己将要使用B函数,这个过程称为回调函数的注册,R称为注册函数.Web Service以及Java的RMI都用到回调机制,可

android开发之网络棋牌类在线游戏开发心得(服务器端、Java) 好文章值得收藏

标签: android服务器 2013-10-09 17:28 3618人阅读 评论(0) 收藏 举报 分类: android(11) 转自:http://blog.csdn.net/bromon/article/details/253330 Bromon原创 请尊重版权 一个多人在线的棋牌类网络游戏的项目临近尾声,我参与了该项目的整个设计流程,并且完成了90%的核心代码.关于这个项目,有很多地方值得聊一聊.本系列不打算把这个项目将得多么详细规范,那是设计文档应该描述的,我打算只说说一些值得注意

ActiveMq整理之java应用

一.JMS 更多介绍参考 "http://baike.baidu.com/link?url=LNCEOGgqEX-uSKuRJooyG1RSfS7CTWDKYT8OOouhxLk_yWNN-0wNSWq7KjNQ259a9pfL95janJi8v8-drvdHqa" 1.1背景 当前,CORBA.DCOM.RMI等RPC中间件技术已广泛应用于各个领域.但是面对规模和复杂度越来越高的分布式系统,这些技术也显示出其局限性: 1.同步通信:客户发出调用后,必须等待服务对象完成处理并返回结果才