Java入门系列-25-NIO(实现非阻塞网络通信)

还记得之前介绍NIO时对比传统IO的一大特点吗?就是NIO是非阻塞式的,这篇文章带大家来看一下非阻塞的网络操作。

补充:以数组的形式使用缓冲区

package testnio;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class TestBufferArray {

    public static void main(String[] args) throws IOException {
        RandomAccessFile raf1=new RandomAccessFile("D:/1.txt","rw");

        //1.获取通道
        FileChannel channel1=raf1.getChannel();

        //2.创建缓冲区数组
        ByteBuffer buf1=ByteBuffer.allocate(512);
        ByteBuffer buf2=ByteBuffer.allocate(512);
        ByteBuffer[] bufs= {buf1,buf2};
        //3.将数据读入缓冲区数组
        channel1.read(bufs);

        for (ByteBuffer byteBuffer : bufs) {
            byteBuffer.flip();
        }
        System.out.println(new String(bufs[0].array(),0,bufs[0].limit()));
        System.out.println("-----------");
        System.out.println(new String(bufs[1].array(),0,bufs[1].limit()));

        //写入缓冲区数组到通道中
        RandomAccessFile raf2=new RandomAccessFile("D:/2.txt","rw");
        FileChannel channel2=raf2.getChannel();
        channel2.write(bufs);

    }
}

使用NIO实现阻塞式网络通信

TCP协议的网络通信传统实现方式是通过套接字编程(Socket和ServerSocket),NIO实现TCP网络通信需要用到 Channel 接口的两个实现类:SocketChannel和ServerSocketChannel

使用NIO实现阻塞式网络通信

客户端

package com.jikedaquan.blockingnio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class Client {

    public static void main(String[] args) {

        SocketChannel sChannel=null;

        FileChannel inChannel=null;
        try {
            //1、获取通道
            sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1666));
            //用于读取文件
            inChannel = FileChannel.open(Paths.get("F:/a.jpg"), StandardOpenOption.READ);

            //2、分配指定大小的缓冲区
            ByteBuffer buf=ByteBuffer.allocate(1024);

            //3、读取本地文件,发送到服务器端

            while(inChannel.read(buf)!=-1) {
                buf.flip();
                sChannel.write(buf);
                buf.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            //关闭通道
            if (inChannel!=null) {
                try {
                    inChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            if(sChannel!=null) {
                try {
                    sChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

new InetSocketAddress("127.0.0.1", 1666) 用于向客户端套接字通道(SocketChannel)绑定要连接地址和端口

服务端

package com.jikedaquan.blockingnio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class Server {

    public static void main(String[] args) {

        ServerSocketChannel ssChannel=null;

        FileChannel outChannel=null;

        SocketChannel sChannel=null;
        try {
            //1、获取通道
            ssChannel = ServerSocketChannel.open();
            //用于保存文件的通道
            outChannel = FileChannel.open(Paths.get("F:/b.jpg"), StandardOpenOption.WRITE,StandardOpenOption.CREATE);

            //2、绑定要监听的端口号
            ssChannel.bind(new InetSocketAddress(1666));
            //3、获取客户端连接的通道
            sChannel = ssChannel.accept();

            //4、分配指定大小的缓冲区
            ByteBuffer buf=ByteBuffer.allocate(1024);

            //5、接收客户端的数据,并保存到本地
            while(sChannel.read(buf)!=-1) {
                buf.flip();
                outChannel.write(buf);
                buf.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            //6、关闭通道
            if(sChannel!=null) {
                try {
                    sChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(outChannel!=null) {
                try {
                    outChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(ssChannel!=null) {
                try {
                    ssChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

服务端套接字仅绑定要监听的端口即可 ssChannel.bind(new InetSocketAddress(1666));

上面的代码使用NIO实现的网络通信,可能有同学会问,没有看到阻塞效果啊,确实是阻塞式的看不到效果,因为客户端发送一次数据就结束了,服务端也是接收一次数据就结束了。那如果服务端接收完成数据后,再向客户端反馈呢?

能够看到阻塞效果的网络通信

客户端

package com.jikedaquan.blockingnio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class Client {

    public static void main(String[] args) {
        SocketChannel sChannel=null;
        FileChannel inChannel=null;
        try {
            sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1666));
            inChannel = FileChannel.open(Paths.get("F:/a.jpg"), StandardOpenOption.READ);

            ByteBuffer buf=ByteBuffer.allocate(1024);

            while(inChannel.read(buf)!=-1) {
                buf.flip();
                sChannel.write(buf);
                buf.clear();
            }

            //sChannel.shutdownOutput();//去掉注释掉将不会阻塞

            //接收服务器端的反馈
            int len=0;
            while((len=sChannel.read(buf))!=-1) {
                buf.flip();
                System.out.println(new String(buf.array(),0,len));
                buf.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(inChannel!=null) {
                try {
                    inChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(sChannel!=null) {
                try {
                    sChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

服务端

package com.jikedaquan.blockingnio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class Server {

    public static void main(String[] args) {

        ServerSocketChannel ssChannel=null;
        FileChannel outChannel=null;
        SocketChannel sChannel=null;
        try {
            ssChannel = ServerSocketChannel.open();
            outChannel = FileChannel.open(Paths.get("F:/a.jpg"),StandardOpenOption.WRITE,StandardOpenOption.CREATE);

            ssChannel.bind(new InetSocketAddress(1666));
            sChannel = ssChannel.accept();
            ByteBuffer buf=ByteBuffer.allocate(1024);

            while(sChannel.read(buf)!=-1) {
                buf.flip();
                outChannel.write(buf);
                buf.clear();
            }

            //发送反馈给客户端
            buf.put("服务端接收数据成功".getBytes());
            buf.flip();
            sChannel.write(buf);
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(sChannel!=null) {
                try {
                    sChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(outChannel!=null) {
                try {
                    outChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(ssChannel!=null) {
                try {
                    ssChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

服务端将向客户端发送两次数据

选择器(Selector)

想要实现非阻塞的IO,必须要先弄懂选择器。Selector 抽象类,可通过调用此类的 open 方法创建选择器,该方法将使用系统的默认选择器提供者创建新的选择器。

将通道设置为非阻塞之后,需要将通道注册到选择器中,注册的同时需要指定一个选择键的类型 (SelectionKey)。

选择键(SelectionKey)可以认为是一种标记,标记通道的类型和状态。

SelectionKey的静态字段:

OP_ACCEPT:用于套接字接受操作的操作集位

OP_CONNECT:用于套接字连接操作的操作集位

OP_READ:用于读取操作的操作集位

OP_WRITE:用于写入操作的操作集位

用于检测通道状态的方法:

方法名称 说明
isAcceptable() 测试此键的通道是否已准备好接受新的套接字连接
isConnectable() 测试此键的通道是否已完成其套接字连接操作
isReadable() 测试此键的通道是否已准备好进行读取
isWritable() 测试此键的通道是否已准备好进行写入

将通道注册到选择器:

ssChannel.register(selector, SelectionKey.OP_ACCEPT);

IO操作准备就绪的通道大于0,轮询选择器

while(selector.select()>0) {
    //获取选择键,根据不同的状态做不同的操作
}

实现非阻塞式TCP协议网络通信

非阻塞模式:channel.configureBlocking(false);

客户端

package com.jikedaquan.nonblockingnio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Scanner;

public class Client {

    public static void main(String[] args) {
        SocketChannel sChannel=null;
        try {
            //1、获取通道
            sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",1666));

            //2、切换非阻塞模式
            sChannel.configureBlocking(false);

            //3、分配指定大小的缓冲区
            ByteBuffer buf=ByteBuffer.allocate(1024);
            //4、发送数据给服务端
            Scanner scanner=new Scanner(System.in);
            //循环从控制台录入数据发送给服务端
            while(scanner.hasNext()) {

                String str=scanner.next();
                buf.put((new Date().toString()+"\n"+str).getBytes());
                buf.flip();
                sChannel.write(buf);
                buf.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            //5、关闭通道
            if(sChannel!=null) {
                try {
                    sChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

服务端

package com.jikedaquan.nonblockingnio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class Server {

    public static void main(String[] args) throws IOException {

        //1、获取通道
        ServerSocketChannel ssChannel=ServerSocketChannel.open();
        //2、切换非阻塞模式
        ssChannel.configureBlocking(false);
        //3、绑定监听的端口号
        ssChannel.bind(new InetSocketAddress(1666));
        //4、获取选择器
        Selector selector=Selector.open();
        //5、将通道注册到选择器上,并指定“监听接收事件”
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);

        //6、轮询式的获取选择器上已经 “准备就绪”的事件
        while(selector.select()>0) {
            //7、获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
            Iterator<SelectionKey> it=selector.selectedKeys().iterator();
            while(it.hasNext()) {
                //8、获取准备就绪的事件
                SelectionKey sk=it.next();
                //9、判断具体是什么事件准备就绪
                if(sk.isAcceptable()) {
                    //10、若“接收就绪”,获取客户端连接
                    SocketChannel sChannel=ssChannel.accept();
                    //11、切换非阻塞模式
                    sChannel.configureBlocking(false);
                    //12、将该通道注册到选择器上
                    sChannel.register(selector, SelectionKey.OP_READ);
                }else if(sk.isReadable()) {
                    //13、获取当前选择器上“读就绪”状态的通道
                    SocketChannel sChannel=(SocketChannel)sk.channel();
                    //14、读取数据
                    ByteBuffer buf=ByteBuffer.allocate(1024);
                    int len=0;
                    while((len=sChannel.read(buf))>0) {
                        buf.flip();
                        System.out.println(new String(buf.array(),0,len));
                        buf.clear();
                    }
                }
                //15、取消选择键 SelectionKey
                it.remove();
            }

        }
    }
}

服务端接收客户端的操作需要在判断 isAcceptable() 方法内将就绪的套接字通道以读操作注册到 选择器中

在判断 isReadable() 内从通道中获取数据

实现非阻塞式UDP协议网络通信

发送端

package com.jikedaquan.nonblockingnio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Scanner;

public class TestDatagramSend {

    public static void main(String[] args) throws IOException {
        //获取通道
        DatagramChannel dChannel=DatagramChannel.open();
        //非阻塞
        dChannel.configureBlocking(false);
        ByteBuffer buf=ByteBuffer.allocate(1024);
        Scanner scanner=new Scanner(System.in);
        while(scanner.hasNext()) {
            String str=scanner.next();
            buf.put(str.getBytes());
            buf.flip();
            //发送数据到目标地址和端口
            dChannel.send(buf,new InetSocketAddress("127.0.0.1", 1666));
            buf.clear();
        }
        dChannel.close();
    }
}

接收端

package com.jikedaquan.nonblockingnio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

public class TestDatagramReceive {
    public static void main(String[] args) throws IOException {
        //获取通道
        DatagramChannel dChannel=DatagramChannel.open();
        dChannel.configureBlocking(false);
        //绑定监听端口
        dChannel.bind(new InetSocketAddress(1666));
        //获取选择器
        Selector selector=Selector.open();
        //读操作注册通道
        dChannel.register(selector, SelectionKey.OP_READ);
        while(selector.select()>0) {
            Iterator<SelectionKey> it=selector.selectedKeys().iterator();
            //迭代选择键
            while(it.hasNext()) {
                SelectionKey sk=it.next();
                //通道可读
                if(sk.isReadable()) {
                    ByteBuffer buf=ByteBuffer.allocate(1024);
                    //接收数据存入缓冲区
                    dChannel.receive(buf);
                    buf.flip();
                    System.out.println(new String(buf.array(),0,buf.limit()));
                    buf.clear();
                }
            }

            it.remove();
        }
    }
}

原文地址:https://www.cnblogs.com/AIThink/p/9925961.html

时间: 2024-10-03 20:27:12

Java入门系列-25-NIO(实现非阻塞网络通信)的相关文章

JAVA基础知识之网络编程——-基于NIO的非阻塞Socket通信

阻塞IO与非阻塞IO 通常情况下的Socket都是阻塞式的, 程序的输入输出都会让当前线程进入阻塞状态, 因此服务器需要为每一个客户端都创建一个线程. 从JAVA1.4开始引入了NIO API, NIO可以实现非阻塞IO, 这样就可以使用一个线程处理所有的客户请求. 基于NIO的非阻塞Socket通信 服务器将用来监听客户端请求的channel注册到selector上,启动一个线程,使用selector的select()获取求情的客户端的channel数量, 当监听到有客户端请求时,就通过Sel

4.NIO的非阻塞式网络通信

/*阻塞 和 非阻塞 是对于 网络通信而言的*/ /*原先IO通信在进行一些读写操作 或者 等待 客户机连接 这种,是阻塞的,必须要等到有数据被处理,当前线程才被释放*/ /*NIO 通信 是将这个阻塞的过程 丢给了选择器,客户端和 服务器端 之间建立的通道,都会注册到 选择器上,然后用选择器 实时监控 我们这些通道上的状况*/ /*当某一个通道上 某一个请求的事件 完全准备就绪时,那么选择器才会将 这个任务 分配到服务器上的一个 或多个线程中*/ /*阻塞 与 非阻塞*/ 传统的IO 流都是

Java NIO实现非阻塞式socket通信

博主知识水平有限,只能提供一个个人的狭隘的理解,如果有新人读到这儿,建议看一下其他教程或者API,如果不明白,再来看一下:如果有dalao读到这儿,希望能指出理解中的问题~谢谢 Java提供了用于网络通信的socket和serversocket包,然而实现方式是阻塞式的,同一时间点上只能进行一个连接,这会带来不好的体验.当然了,我们也可以通过不断创建线程的方式管理连接,但线程多了的话反而会降低效率.于是Java推出了非阻塞式IO--channel.并且channel提供关于网络通信的相关chan

java nio学习三:NIO 的非阻塞式网络通信

一.阻塞和非阻塞 传统的 IO 流都是阻塞式的.也就是说,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务.因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降.Java NIO 是非阻塞模式的.当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务.线程通常将非阻塞 IO 的空闲时间用于在其他通道上

《Java并发编程实战》笔记-非阻塞算法

如果在某种算法中,一个线程的失败或挂起不会导致其他线程也失败和挂起,那么这种算法就被称为非阻塞算法.如果在算法的每个步骤中都存在某个线程能够执行下去,那么这种算法也被称为无锁(Lock-Free)算法.如果在算法中仅将CAS用于协调线程之间的操作,并且能正确地实现,那么它既是一种无阻塞算法,又是一种无锁算法. 创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性. 非阻塞算法的所有特性:某项工作的完成具有不确定性,必须重新执行.

Java入门系列:实例讲解ArrayList用法

本文通过实例讲解Java中如何使用ArrayList类. Java.util.ArrayList类是一个动态数组类型,也就是说,ArrayList对象既有数组的特征,也有链表的特征.可以随时从链表中添加或删除一个元素.ArrayList实现了List接口. 大家知道,数组是静态的,数组被初始化之后,数组长度就不能再改变了.ArrayList是可以动态改变大小的.那么,什么时候使用Array(数组),什么时候使用ArrayList?答案是:当我们不知道到底有多少个数据元素的时候,就可使用Array

【Java入门系列】static关键字

static方法就是没有this的方法.在static方法内部不能调用非静态方法,反过来是可以的.而且可以在没有创建任何对象的前提下,仅仅通过类本身来调用static方法.这实际上正是static方法的主要用途.<java编程思想> 静态变量 static修饰的变量称作静态成员变量,也称作类变量,类变量的生命周期和类相同,在整个应用执行期间有效. 静态变量和普通变量的区别:静态变量被所有的对象所共享,在内存中只有一个副本,仅当在类初次加载时会被初始化:非静态变量是对象所拥有的,在创建对象的时候

Java入门系列-05-数据类型和类型转换

这篇文章为你搞懂2个问题 java 中有哪些数据类型可以用存储数据? java 中的数据类型是怎么转换的? 在上一篇文章中我们学会了如何使用变量,像这样存储一个整数 int age=10;,可以在开发工具中编写一行这样的代码 int age=10.5; 就会发现开发工具报错了,是因为变量中的数据类型也是不能随便用的. 数据类型 咱们先来看下面一组数据 如果每天花费2小时在交通上 1月=60小时=2.5天, 1年=730小时=30天, 50年=36500小时=1520天=4年 这段数据中可以分为以

Java入门系列-06-运算符

这篇文章为你搞懂2个问题 java 中的常用运算符有哪些?如何使用? 这些运算符的运算优先级是怎样的? 算数运算符 明显是做数学运算的,包括以下符号: + 加法运算 敲一敲: public class DemoAdd { public static void main(String[] args) { int sum=2+3; System.out.println(sum);//输出变量 sum 的值 int a=5; int b=3; int result=a+b;//将变量 a 的值与变量