java网络编程——多线程数据收发并行

基本介绍与思路

收发并行

前一篇博客中,完成了客户端与服务端的简单TCP交互,但这种交互是触发式的:客户端发送一条消息,服务端收到后再回送一条。没有做到收发并行。收发并行的字面意思很容易理解,即数据的发送与接收互相不干扰,相互独立。当然,要保证服务端和客户端都能做到收发并行。

业务逻辑

脱离业务逻辑的实践是毫无意义的,先描述一下本实践中的业务逻辑:一个服务端接受多个客户端的连接,连接后,向各个客户端定时发送时间戳数据,同时在并行条件下,接受各个客户端发送来的数据并显示;客户端键盘输入字符串,发送给服务端,同时在并行条件下,接收服务器发来的时间戳数据并显示。

实现思路

实现发送与接收并行,思路其实非常直观,即建立两个线程,分别用来实现输入流和输出流。我的代码的设计方案如下图所示:

  • 服务端:创建一个监听客户端连接的线程,线程中一旦接收到请求,创建一个对应该客户端收发处理的对象,对象中创建输入流线程,并使用单例线程池创建输出流线程。主线程使用键盘输入流System.in来进行阻塞。同时主线程中创建Timer定时器,定时向输出流发送数据。
  • 客户端:主线程发送连接请求,与服务器建立连接。使用键盘输入流System.in来阻塞主线程,同时作为输出流使用;创建一个输入流线程,异步运行,接收服务器数据。

代码分析

源代码文件结构如下图所示

服务端

服务器端分为三个部分,分别是Server.java,TCPServer.java和ClientHandler.java

Server.java
package Server;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.TimerTask;
import java.util.Timer;
import java.util.Date;

public class Server {
    private static SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd_HH:mm:ss");
    public static void main(String[] args){
        try {
            TCPServer.accept();
            new Timer("Timer").schedule(new TimerTask() {
                @Override
                public void run() {
                    TCPServer.broadcast(df.format(new Date()));
                }
            }, 1000,5000);
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            String str;
            //因为ClientListen是异步线程,使用键盘输入流将主线程阻塞住,保证跟ClientListen线程同步,同时可控制ClientListen服务的退出
            do{
                str = bufferedReader.readLine();
            }while (str.equalsIgnoreCase("serverExit"));
        }catch (Exception e){
            System.out.println("监听请求过程中异常退出");
        }

        try {
            TCPServer.stop();
        } catch (IOException e) {
            System.out.println("关闭套接字过程中出现异常");
        } finally {
            System.out.println("服务器端套接字已关闭!");
        }
    }

}
TCPServer.java
package Server;

import java.io.IOException;
import java.net.*;
import java.util.ArrayList;
import java.util.UUID;

class TCPServer {
    private static int LOCAL_PORT = 3001;
    private static ClientListenHandle clientListenHandle;
    private static ArrayList<ClientHandler> clientHandlerList = new ArrayList<ClientHandler>();

    static void accept() throws IOException {
        //创建服务器端套接字
        ServerSocket serverSocket = createSocket();
        InitSocket(serverSocket);
        System.out.println("服务器准备就绪 addr: " + Inet4Address.getLocalHost() + "  /port: " + LOCAL_PORT);
        System.out.println("开始监听客户端连接...");
        //创建线程监听客户端请求
        clientListenHandle = new ClientListenHandle(serverSocket);
        clientListenHandle.start();

    }

    static void stop() throws IOException {
        for (ClientHandler clientHandler : clientHandlerList) {
            clientHandler.socketClose();
        }
        clientHandlerList.clear();
        clientListenHandle.exit();
    }

    private static ServerSocket createSocket() throws IOException {
        ServerSocket socket = new ServerSocket(LOCAL_PORT, 50);
        return socket;
    }

    private static void InitSocket(ServerSocket socket) throws SocketException {
        // 是否复用未完全关闭的地址端口
        socket.setReuseAddress(true);

        // 等效Socket#setReceiveBufferSize
        socket.setReceiveBufferSize(64 * 1024 * 1024);

        // 设置serverSocket#accept超时时间,不设置即永久等待
        // serverSocket.setSoTimeout(2000);

        // 设置性能参数:短链接,延迟,带宽的相对重要性
        socket.setPerformancePreferences(1, 1, 1);
    }

    static void broadcast(String msg) {
        for (ClientHandler clientHandler : clientHandlerList) {
            clientHandler.write(msg);
        }

    }
    /**
     * 监听客户端请求的线程
     */
    static class ClientListenHandle extends Thread {
        private final ServerSocket serverSocket;
        private Boolean done = false;

        ClientListenHandle(ServerSocket serverSocket) {
            this.serverSocket = serverSocket;
        }

        @Override
        public void run() {
            super.run();
            try {
                do {
                    Socket client;
                    try {
                        client = serverSocket.accept();
                    } catch (Exception e) {
                        continue;//某一个客户端连接失败,要保证其它客户端能正常连接
                    }
                    String uuid = UUID.randomUUID().toString();//为客户端生成唯一标识
                    System.out.println("已接受连接client:"+uuid+" /Addr:"+client.getInetAddress()+" /Port:"+client.getPort());
                    //为该客户端实例化一个ClientHandler对象,注入对象删除操作的lambda表达式
                    ClientHandler clientHandle = new ClientHandler(client, handler -> clientHandlerList.remove(handler), uuid);
                    clientHandle.read();
                    clientHandlerList.add(clientHandle);
                } while (!done);
            } catch (Exception e) {
                if (!done) {
                    System.out.println("异常退出!");
                }
            }
        }

        void exit() throws IOException {
            done = true;
            serverSocket.close();
        }
    }
}
ClientHandler.java
package Server;

import java.io.*;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ClientHandler {
    private final Socket client;
    private final ReadHandler readHandler;
    private final WriteHandle writeHandler;
    private final Removable removable;
    private final String uid;

    ClientHandler(Socket socket, Removable removable, String uid) throws IOException {
        this.client = socket;
        this.readHandler = new ReadHandler(socket.getInputStream());
        this.writeHandler = new WriteHandle(socket.getOutputStream());
        this.removable = removable;
        this.uid = uid;
    }

    void read() {
        readHandler.start();
    }

    void write(String msg) {
        System.out.println("Server -->> " + uid + " : " + msg);
        writeHandler.write(msg);
    }

    /**
     * 把输入输出流和套接字都关闭
     */
    void socketClose(){
        try {
            readHandler.exit();
            writeHandler.exit();
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            System.out.println("客户端:"+uid+" 套接字连接已关闭");
        }
    }
    /**
     * 把自身从对象列表中清除掉,具体方法是使用lambda表达式来注入的
     */
    void removeClientHandler() {
        removable.removeClientHandle(this);
    }

    /**
     * 定义一个接口,接收lambda表达式
     */
    interface Removable {
        void removeClientHandle(ClientHandler clientHandler);
    }

    /**
     * 输入流操作线程
     */
    class ReadHandler extends Thread {
        private final InputStream inputStream;
        private Boolean flag = true;

        ReadHandler(InputStream inputStream) {
            this.inputStream = inputStream;
        }

        @Override
        public void run() {
            super.run();

            BufferedReader socketInput = null;
            try {
                socketInput = new BufferedReader(new InputStreamReader(inputStream));
                do {

                    String str = socketInput.readLine();
                    //不知道为什么,客户端关闭时,这里直接报异常,获取不到null
                    if (str.equalsIgnoreCase("exit")) {
                        System.out.println("已无法读取客户端数据!");
                        throw new Exception();
                    }
                    System.out.println(uid + " -->> server : " + str);
                } while (flag);
            } catch (Exception e) {
                if (flag) {
                    System.out.println("读取客户端过程中异常退出");
                    ClientHandler.this.removeClientHandler();
                    ClientHandler.this.socketClose();
                }
            }
        }

        void exit() throws IOException {
            flag = false;
            inputStream.close();
        }
    }

    /**
     * 输出流操作线程,使用单例线程池,可以自动等待任务并处理,无需人工添加阻塞操作
     */
    class WriteHandle {
        private final OutputStream outputStream;
        private final ExecutorService executorService;

        WriteHandle(OutputStream outputStream) {
            this.outputStream = outputStream;
            this.executorService = Executors.newSingleThreadExecutor();
        }
        private void write(String msg){
            executorService.execute(new WriteRunnable(msg,outputStream));
        }
        void exit() throws IOException{
            outputStream.close();
            executorService.shutdown();
        }
        class WriteRunnable implements Runnable{
            private final String msg;
            private final PrintStream printStream;

            WriteRunnable(String msg, OutputStream outputStream) {
                this.msg = msg;
                this.printStream = new PrintStream(outputStream);
            }

            @Override
            public void run() {
                try {
                    printStream.println(msg);
                } catch (Exception e) {
                    System.out.println("打印输出异常!");
                }

            }
        }
    }
}

客户端

Client.java
package Client;

import java.io.*;
import java.util.UUID;

import Client.bean.ServerInfo;
public class Client {
    public static void main(String[] args)throws IOException {
        ServerInfo serverInfo = new ServerInfo(UUID.randomUUID().toString(),"127.0.2.16",3001);
        System.out.println("准备发起服务器连接...");
        System.out.println("服务器信息:Addr:"+serverInfo.getAddress()+" /Port:"+serverInfo.getPort());

        try {
            TCPClient.connect(serverInfo);
        }catch (Exception e){
            System.out.println("连接失败,退出");
        }
    }
}
TCPClient.java
package Client;

import Client.bean.ServerInfo;

import java.io.*;
import java.net.*;

class TCPClient {
    static void connect(ServerInfo serverInfo) throws IOException {
        Socket clientSocket = createSocket();//建立套接字

        InitSocket(clientSocket);//初始化套接字
        //连接远程服务器
        clientSocket.connect(new InetSocketAddress(serverInfo.getAddress(), serverInfo.getPort()), 3000);
        System.out.println("已连接server");
        try {
            //输入流线程
            ReadHandle readHandle = new ReadHandle(clientSocket.getInputStream());
            readHandle.start();

            //输出流
            write(clientSocket);
            //当输出流结束时,关闭输入流
            readHandle.exit();
        } catch (Exception e) {
            System.out.println("出现异常!");
        } finally {
            clientSocket.close();
            System.out.println("客户端结束");
        }
    }

    private static Socket createSocket() throws IOException {
        Socket socket = new Socket();
        return socket;
    }

    private static void InitSocket(Socket socket) throws SocketException {
        // 设置读取超时时间为2秒,超过2秒未获得数据时readline报超时异常;不设置即进行永久等待
        //socket.setSoTimeout(2000);
        // 是否复用未完全关闭的Socket地址,对于指定bind操作后的套接字有效
        socket.setReuseAddress(true);

        // 是否开启Nagle算法
        socket.setTcpNoDelay(true);

        // 是否需要在长时无数据响应时发送确认数据(类似心跳包),时间大约为2小时
        socket.setKeepAlive(true);

        // 对于close关闭操作行为进行怎样的处理;默认为false,0
        // false、0:默认情况,关闭时立即返回,底层系统接管输出流,将缓冲区内的数据发送完成
        // true、0:关闭时立即返回,缓冲区数据抛弃,直接发送RST结束命令到对方,并无需经过2MSL等待
        // true、200:关闭时最长阻塞200毫秒,随后按第二情况处理
        socket.setSoLinger(true, 20);

        // 是否让紧急数据内敛,默认false;紧急数据通过 socket.sendUrgentData(1);发送
        socket.setOOBInline(true);

        // 设置接收发送缓冲器大小
        socket.setReceiveBufferSize(64 * 1024 * 1024);
        socket.setSendBufferSize(64 * 1024 * 1024);

        // 设置性能参数:短链接,延迟,带宽的相对重要性
        socket.setPerformancePreferences(1, 1, 1);
    }

    /**
     * 输出流方法
     */
    private static void write(Socket socket) throws IOException {
        //构建键盘输入流
        InputStream in = System.in;
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
        //得到socket输出流并转化为打印流
        OutputStream outputStream = socket.getOutputStream();
        PrintStream printStream = new PrintStream(outputStream);

        for(;;){
            String str = bufferedReader.readLine();//从键盘输入获取内容
            printStream.println(str);//通过打印流输出
            if(str.equalsIgnoreCase("exit")){
                break;
            }
        }

        printStream.close();
        System.out.println("输出流关闭");
    }

    /**
     * 输入流线程
     */
    static class ReadHandle extends Thread {
        private final InputStream inputStream;
        private Boolean done = false;

        ReadHandle(InputStream inputStream) {
            this.inputStream = inputStream;
        }

        @Override
        public void run() {
            super.run();
            try {
                //获取输入流
                BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
                do {
                    String str;
                    str = socketInput.readLine();
                    if (str==null) {
                        break;
                    }
                    System.out.println("From server: "+ str);
                } while (!done);
            } catch (Exception e) {
                if (!done) {
                    System.out.println("异常断开,或者输入异常");
                }
            }
        }

        void exit() {
            done = true;
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                System.out.println("输入流关闭");
            }
        }
    }
}

关于代码的具体分析,由于代码已有很多注释,博文中便不再赘述。

运行结果

运行结果如下所示

  • 服务端

    连接成功后,服务端每隔5秒向各个客户端发送时间戳信息,同时接收两个客户端发来的信息
  • 客户端1

    输入“I am client1”并向服务端发送,同时接收服务端发来的时间戳信息
  • 客户端2

    输入“I am client2”并向服务端发送,同时接收服务端发来的时间戳信息

本篇博客记录一次实践学习,使用多线程+socket编程,实现了单服务器与多客户端之间的数据收发并行,除此之外,通过思维流程图,整理了代码的设计思路并展示出来。

原文地址:https://www.cnblogs.com/buptleida/p/12514450.html

时间: 2024-10-19 06:32:15

java网络编程——多线程数据收发并行的相关文章

Java网络编程(tcp在服务器上应用多线程)

package org.tcp; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.Socket; public class EchoThread implements Runnable { private Socket client = null; public EchoThread(Socket client){ this.c

java网络编程serversocket

转载:http://www.blogjava.net/landon/archive/2013/07/24/401911.html Java网络编程精解笔记3:ServerSocket详解ServerSocket用法详解 1.C/S模式中,Server需要创建特定端口的ServerSocket.->其负责接收client连接请求. 2.线程池->包括一个工作队列和若干工作线程->工作线程不断的从工作队列中取出任务并执行.-->java.util.concurrent->线程池

java网络编程socket\server\TCP笔记(转)

java网络编程socket\server\TCP笔记(转) 2012-12-14 08:30:04|  分类: Socket |  标签:java  |举报|字号 订阅 1 TCP的开销 a  连接协商三次握手,c->syn->s,s->syn ack->c, c->ack->s b  关闭协商四次握手,c->fin->s, s->ack-c,s->fin->c,c->ack->s c  保持数据有序,响应确认等计算开销 d

Java网络编程基础(六)— 基于TCP的NIO简单聊天系统

在Java网络编程基础(四)中提到了基于Socket的TCP/IP简单聊天系统实现了一个多客户端之间护法消息的简单聊天系统.其服务端采用了多线程来处理多个客户端的消息发送,并转发给目的用户.但是由于它是基于Socket的,因此是阻塞的. 本节我们将通过SocketChannel和ServerSocketChannel来实现同样的功能. 1.客户端输入消息的格式 username:msg    username表示要发送的的用户名,msg为发送内容,以冒号分割 2.实现思路 实现思路与Java网络

java网络编程socket解析

转载:http://www.blogjava.net/landon/archive/2013/07/02/401137.html Java网络编程精解笔记2:Socket详解 Socket用法详解 在C/S通信模式中,client需要主动创建于server连接的Socket(套接字).服务器端收到了客户端的连接请求,也会创建与客户连接的Socket.Socket可看做是通信两端的收发器.server与client都通过Socket来收发数据. 1.构造Socket 1.Socket() 2.So

Java网络编程(一)

Java网络编程: 1.1: 网络编程:对于我这个“研究不深”的网络菜鸟来说,我觉得网络编程就是实现计算机与计算机之间通信的编程.写些能够实现计算机与计算机之间的通信就行了(目前来说). 1.2:一台计算机跟另外计算机通讯. 计算机与计算机通讯的三大要素: 1:ip地址---电脑 1.1:作用:唯一标识一台计算机. 回环地址:127.0.0.1==主机:localhost 主机地址作用:测试网卡是否正常. 2:找到相应的应用程序----端口号 端口号-----具有网络功能的应用程序的标识号,没有

【Java】Java网络编程菜鸟进阶:TCP和套接字入门

Java网络编程菜鸟进阶:TCP和套接字入门 JDK 提供了对 TCP(Transmission Control Protocol,传输控制协议)和 UDP(User Datagram Protocol,用户数据报协议)这两个数据传输协议的支持.本文开始探讨 TCP. TCP 基础知识 在“服务器-客户端”这种架构中,服务器和客户端各自维护一个端点,两个端点需要通过网络进行数据交换.TCP 为这种需求提供了一种可靠的流式连接,流式的意思是传出和收到的数据都是连续的字节,没有对数据量进行大小限制.

java 网络编程复习(转)

好久没有看过Java网络编程了,现在刚好公司有机会接触,顺便的拾起以前的东西 参照原博客:http://www.cnblogs.com/linzheng/archive/2011/01/23/1942328.html 一.网络编程  通过使用套接字来达到进程间通信目的的编程就是网络编程. 二.网络编程中常见的问题 一个是如何准确的定位网络上一台或多台主机,另一个就是找到主机后如何可靠高效的进行数据传输. 在TCP/IP协议中IP层主要负责网络主机的定位,数据传输的路由,由IP地址可以唯一地确定I

关于Java网络编程

一,网络编程中两个主要的问题 一个是如何准确的定位网络上一台或多台主机,另一个就是找到主机后如何可靠高效的进行数据传输. 在TCP/IP协议中IP层主要负责网络主机的定位,数据传输的路由,由IP地址可以唯一地确定Internet上的一台主机. 而TCP层则提供面向应用的可靠(tcp)的或非可靠(UDP)的数据传输机制,这是网络编程的主要对象,一般不需要关心IP层是如何处理数据的. 目前较为流行的网络编程模型是客户机/服务器(C/S)结构.即通信双方一方作为服务器等待客户提出请求并予以响应.客户则