Java NIO 读数据处理过程

这两天仿hadoop 写java RPC框架,使用PB作为序列号工具,在写读数据的时候遇到一个小坑。之前写过NIO代码,恰好是错误的代码产生正确的逻辑,误以为自己写对了。现在简单整理一下。

使用NIO,select()到读事件时,要处理4种情况:

1. channel还有数据,继续读。

2. channel中暂时没数据,但channel还没断开,这是读取到的数据个数为0,结束读,继续到select()处阻塞等待数据。

3. 另一端channel.close()关闭连接,这时候读channel返回的读取数是-1,表示已经到末尾,跟读文件到末尾时是一样的。既然已经结束了,就把对应的SelectionKey给cancel掉,表示selector不再监听这个channel上的读事件。并且关闭连接,本端channel.close()。

4. 另一端被强制关闭,也就是channel没有close()就被强制断开了,这时候本端会抛出一个IOException异常,要处理这个异常。

之前对 另一端channel.close()关闭连接 没有细究,不清楚 读channel返回的读取数-1 是什么意思。然后没有cancel对应的SelectionKey,也没关闭连接,结果就是selector.select()一直返回读事件,但是没有数据。

直接贴服务器和客户端代码:

Server:

package socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer2 {

    private void startServer() throws IOException {
        Selector selector = Selector.open();

        {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ServerSocket ss = ssc.socket();
            InetSocketAddress address = new InetSocketAddress(9000);
            ss.bind(address);

            System.out.println("ssc 0 : " + ssc);
            System.out.println("ss 0 : " + ss);

            SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("acceptKey: " + acceptKey);
            printKeyInfo(acceptKey);
            System.out.println("Going to listen on 9000");
        }

        while (true) {
            System.out.println("===================================\nstart select...");
            int num = selector.select();
            System.out.println("NIOServer: Number of keys after select operation: " + num);

            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectionKeys.iterator();

            while (it.hasNext()) {
                SelectionKey key = it.next();
                System.out.println("key: " + key);
                printKeyInfo(key);

                it.remove();

                if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                    System.out.println("select ACCEPT");
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);

                    System.out.println("ssc 1 : " + ssc);
                    System.out.println("sc 1 : " + sc);

                    SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);
                    System.out.println("new key:" + newKey);
                    printKeyInfo(newKey);
                }
                else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
//                    System.out.println("select READ");
//                    System.out.print("before cancel:");printKeyInfo(key);
//                    key.cancel();
//                    System.out.println("after cancel:");printKeyInfo(key);
                    SocketChannel sc = (SocketChannel) key.channel();
                    System.out.println("sc 2 : " + sc);

                    //echo data
                    //下面的处理是正确的,count<0则cancel key。count=0则进入下一轮select()阻塞等待数据。
//                    try {
//                        int count = doRead(key);
//                        if (count < 0) {
//                            key.cancel();
//                            System.out.println("cancel key for < 0");
//                            sc.read(ByteBuffer.allocate(2));
//                        }
//                    } catch(IOException e) {
//                        e.printStackTrace();
//                        key.cancel();
//                        System.out.println("cancel key");
//                    }

                    //下面的处理过程是错误的,偶然情况下会出现正确逻辑。在客户端连续写,写完马上关闭连接,这时下面代码能打印出客户端的输出,
                    //客户端关闭连接,下面的代码马上爆出异常,是这行代码。java.io.IOException: 您的主机中的软件中止了一个已建立的连接。
//                    int nbytes = 0;
//                    ByteBuffer echoBuffer = ByteBuffer.allocate(16);
//                    while (true) {
//                        echoBuffer.clear();
//                        int r = sc.read(echoBuffer);
//                        System.out.println(new String(echoBuffer.array()));
//                        if (r <= 0) break;
//                        echoBuffer.flip();
//                        sc.write(echoBuffer);
//                        nbytes += r;
//                    }
//                    System.out.println("echoed " + nbytes + " from " + sc);

                    //下面的是处理过程是正确的。正确的做法就是对读取到n,0,-1分别处理,还要对客户端强制关闭的异常做处理
                    while (true) {
                        ByteBuffer buffer = ByteBuffer.allocate(2);
                        buffer.clear();
                        int r;
                        try {
                            r = sc.read(buffer);
                            System.out.println("r = " + r);
                            System.out.println(new String(buffer.array()));
                            if (r < 0) {
                                //客户端socket.close()会到这里,读取数r=-1
                                key.cancel();
                                System.out.println("cancel key for < 0");
                                break;
                            } else if (r == 0) {
                                //客户端socket没有关闭,而channel没有数据,数据数r=0。
                                //有时候select()返回了,但channel不一定有数据。可能select()是被其他方法唤醒
                                break;
                            }
                        } catch (IOException e) {
                            //客户端强制关闭会来这里报异常
                            e.printStackTrace();
                            key.cancel();
                            System.out.println("cancel key for Exception");
                            break;
                        }
                    }//while
                }// if ... else if
//                try {
//                    Thread.sleep(500);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
            }//while
        }//while
    }

    private int doRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        while (true) {
            int count = -1;
            ByteBuffer buffer = ByteBuffer.allocate(2);
            if (buffer.remaining() > 0) {
                count = channel.read(buffer);
                System.out.println("count = " + count);
                if (count <= 0) return count;
            }
        }
    }

    private static void printKeyInfo(SelectionKey sk) {
        String s = new String();

        s = "Att: " + (sk.attachment() == null ? "no" : "yes");
        s += ", Read: " + sk.isReadable();
        s += ", Acpt: " + sk.isAcceptable();
        s += ", Cnct: " + sk.isConnectable();
        s += ", Wrt: " + sk.isWritable();
        s += ", Valid: " + sk.isValid();
        s += ", interestOps: " + sk.interestOps();
        s += ", readyOps: " + sk.readyOps();
        System.out.println(s);
    }

    public static void main(String[] args) {
        try {
            new NIOServer2().startServer();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Client:

package socket;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;

public class SocketClient {

    public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException {
        Socket socket = new Socket("localhost", 9000);
        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
        byte[] bytes = "fdfd".getBytes();
//        System.out.println("send fdfd");
        out.write(bytes);
        out.flush();

//        Thread.sleep(15*1000);

//        System.out.println("send loll");
        out.write("loull".getBytes());
        out.flush();

//        Thread.sleep(1*1000);
        socket.close();
        System.out.println("client socket close");
    }
}

浪费了一些时间,一方面因为自己对网络编程不够熟悉,比如不清楚-1什么意思。另一方面Java NIO的API还是略显难用。

时间: 2025-01-05 04:27:47

Java NIO 读数据处理过程的相关文章

Java NIO中的读和写

一.概述 读和写是I/O的基本过程.从一个通道中读取只需创建一个缓冲区,然后让通道将数据读到这个缓冲区.写入的过程是创建一个缓冲区,用数据填充它,然后让通道用这些数据来执行写入操作. 二.从文件中读取 1.原始I/O读文件 如果使用原来的I/O,那么只需要创建一个FileInputStream并从它那里读取,示例代码如下: public class BioTest { public static void main(String[] args) throws IOException { File

scala文件读取报错“java.nio.charset.MalformedInputException: Input length = 1”

今天写spark程序的时候遇到了一个问题就是,读取文件的时候报了一个错:"Exception in thread "main" java.nio.charset.MalformedInputException: Input length = 1" 读取文件的代码如下: 一看这个这个错"nio"错误,第一感觉就是读文件方法这里出了问题,于是点击去看了一下Source.fromFile这个方法的源码: 果然,这个方法的重载有好几个,但是每一都直接或间

[转载] Java NIO与IO

原文地址:http://tutorials.jenkov.com/java-nio/nio-vs-io.html 作者:Jakob Jenkov   译者:郭蕾    校对:方腾飞 当学习了Java NIO和IO的API后,一个问题马上涌入脑海: 我应该何时使用IO,何时使用NIO呢?在本文中,我会尽量清晰地解析Java NIO和IO的差异.它们的使用场景,以及它们如何影响您的代码设计. Java NIO和IO的主要区别 下表总结了Java NIO和IO之间的主要差别,我会更详细地描述表中每部分

Java NIO 系列教程(转)

原文中说了最重要的3个概念,Channel 通道Buffer 缓冲区Selector 选择器其中Channel对应以前的流,Buffer不是什么新东西,Selector是因为nio可以使用异步的非堵塞模式才加入的东西.以前的流总是堵塞的,一个线程只要对它进行操作,其它操作就会被堵塞,也就相当于水管没有阀门,你伸手接水的时候,不管水到了没有,你就都只能耗在接水(流)上.nio的Channel的加入,相当于增加了水龙头(有阀门),虽然一个时刻也只能接一个水管的水,但依赖轮换策略,在水量不大的时候,各

Java NIO 系列教程

转载于http://www.iteye.com/magazines/132-Java-NIO Java NIO(New IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API.本系列教程将有助于你学习和理解Java NIO.感谢并发编程网的翻译和投递. (关注ITeye官微,随时随地查看最新开发资讯.技术文章.) Java NIO提供了与标准IO不同的IO工作方式: Channels and Buffers(通道和缓冲区):标准的IO基于字节流和字符流

转:Java NIO

Java NIO(New IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API.本系列教程将有助于你学习和理解Java NIO.感谢并发编程网的翻译和投递. (关注ITeye官微,随时随地查看最新开发资讯.技术文章.) Java NIO提供了与标准IO不同的IO工作方式: Channels and Buffers(通道和缓冲区):标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通

Java NIO与IO

当学习了Java NIO和IO的API后,一个问题立即涌入脑海: 我应该何时使用IO,何时使用NIO呢?在本文中,我会尽量清晰地解析Java NIO和IO的差异.它们的使用场景,以及它们怎样影响您的代码设计. Java NIO和IO的主要差别 下表总结了Java NIO和IO之间的主要区别,我会更具体地描写叙述表中每部分的差异. IO                NIO 面向流            面向缓冲 堵塞IO           非堵塞IO 无 选择器 面向流与面向缓冲 Java N

Java NIO系列教程(十二) Java NIO与IO

当学习了Java NIO和IO的API后,一个问题马上涌入脑海: 我应该何时使用IO,何时使用NIO呢?在本文中,我会尽量清晰地解析Java NIO和IO的差异.它们的使用场景,以及它们如何影响您的代码设计. Java NIO和IO的主要区别 下表总结了Java NIO和IO之间的主要差别,我会更详细地描述表中每部分的差异. IO                NIO 面向流            面向缓冲 阻塞IO           非阻塞IO 无 选择器 面向流与面向缓冲 Java NIO

java nio 与io区别

转自:http://blog.csdn.net/keda8997110/article/details/19549493 当学习了Java NIO和IO的API后,一个问题马上涌入脑海: 我应该何时使用IO,何时使用NIO呢?在本文中,我会尽量清晰地解析Java NIO和IO的差异.它们的使用场景,以及它们如何影响您的代码设计. Java NIO和IO的主要区别 下表总结了Java NIO和IO之间的主要差别,我会更详细地描述表中每部分的差异. IO                NIO 面向流