手动搭建I/O网络通信框架2:BIO编程模型实现群聊

第一章:手动搭建I/O网络通信框架1:Socket和ServerSocket入门实战,实现单聊

  在第一章中运用Socket和ServerSocket简单的实现了网络通信。这一章,利用BIO编程模型进行升级改造,实现群聊聊天室。

  

  如图:当一个客户端请求进来时,接收器会为这个客户端分配一个工作线程,这个工作线程专职处理客户端的操作。在上一章中,服务器接收到客户端请求后就跑去专门服务这个客户端了,所以当其他请求进来时,是处理不到的。

  看到这个图,很容易就会想到线程池,BIO是一个相对简单的模型,实现它的关键之处也在于线程池。

  在上代码之前,先大概说清楚每个类的作用,以免弄混淆。更详细的说明,都写在注释当中。

  服务器端:

  ChatServer:这个类的作用就像图中的Acceptor。它有两个比较关键的全局变量,一个就是存储在线用户信息的Map,一个就是线程池。这个类会监听端口,接收客户端的请求,然后为客户端分配工作线程。还会提供一些常用的工具方法给每个工作线程调用,比如:发送消息、添加在线用户等。

  ChatHandler:这个类就是工作线程的类。在这个项目中,它的工作很简单:把接收到的消息转发给其他客户端,当然还有一些小功能,比如添加\移除在线用户。

  客户端:

  相较于服务器,客户端的改动较小,主要是把等待用户输入信息这个功能分到其他线程做,不然这个功能会一直阻塞主线程,导致无法接收其他客户端的消息。

  ChatClient:客户端启动类,也就是主线程,会通过Socket和服务器连接。也提供了两个工具方法:发送消息和接收消息。

  UserInputHandler:专门负责等待用户输入信息的线程,一旦有信息键入,就马上发送给服务器。

  首先创建两个包区分一下客户端和服务器,client和server

  服务器端ChatServer:

public class ChatServer {
    private int DEFAULT_PORT = 8888;
    /**
     * 创建一个Map存储在线用户的信息。这个map可以统计在线用户、针对这些用户可以转发其他用户发送的消息
     * 因为会有多个线程操作这个map,所以为了安全起见用ConcurrentHashMap
     * 在这里key就是客户端的端口号,但在实际中肯定不会用端口号区分用户,如果是web的话一般用session。
     * value是IO的Writer,用以存储客户端发送的消息
     */
    private Map<Integer, Writer> map=new ConcurrentHashMap<>();
    /**
     * 创建线程池,线程上限为10个,如果第11个客户端请求进来,服务器会接收但是不会去分配线程处理它。
     * 前10个客户端的聊天记录,它看不见。当有一个客户端下线时,这第11个客户端就会被分配线程,服务器显示在线
     * 大家可以把10再设置小一点,测试看看
     * */
    private ExecutorService executorService= Executors.newFixedThreadPool(10);
    //客户端连接时往map添加客户端
    public void addClient(Socket socket) throws IOException {
        if (socket != null) {
            BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );
            map.put(socket.getPort(), writer);
            System.out.println("Client["+socket.getPort()+"]:Online");
        }
    }

    //断开连接时map里移除客户端
    public void removeClient(Socket socket) throws Exception {
        if (socket != null) {
            if (map.containsKey(socket.getPort())) {
                map.get(socket.getPort()).close();
                map.remove(socket.getPort());
            }
            System.out.println("Client[" + socket.getPort() + "]Offline");
        }
    }

    //转发客户端消息,这个方法就是把消息发送给在线的其他的所有客户端
    public void sendMessage(Socket socket, String msg) throws IOException {
        //遍历在线客户端
        for (Integer port : map.keySet()) {
            //发送给在线的其他客户端
            if (port != socket.getPort()) {
                Writer writer = map.get(port);
                writer.write(msg);
                writer.flush();
            }
        }
    }

    //接收客户端请求,并分配Handler去处理请求
    public void start() {
        try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
            System.out.println("Server Start,The Port is:"+DEFAULT_PORT);
            while (true){
                //等待客户端连接
                Socket socket=serverSocket.accept();
                //为客户端分配一个ChatHandler线程
                executorService.execute(new ChatHandler(this,socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ChatServer server=new ChatServer();
        server.start();
    }
}

  服务器端ChatHandler:

public class ChatHandler implements Runnable {
    private ChatServer server;
    private Socket socket;

    //构造函数,ChatServer通过这个分配Handler线程
    public ChatHandler(ChatServer server, Socket socket) {
        this.server = server;
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            //往map里添加这个客户端
            server.addClient(socket);
            //读取这个客户端发送的消息
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream())
            );
            String msg = null;
            while ((msg = reader.readLine()) != null) {
                //这样拼接是为了让其他客户端也能看清是谁发送的消息
                String sendmsg = "Client[" + socket.getPort() + "]:" + msg;
                //服务器打印这个消息
                System.out.println(sendmsg);
                //将收到的消息转发给其他在线客户端
                server.sendMessage(socket, sendmsg + "\n");
                if (msg.equals("quit")) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            //如果用户退出或者发生异常,就在map中移除该客户端
            try {
                server.removeClient(socket);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

  客户端ChatClient:

public class ChatClient {
    private BufferedReader reader;
    private BufferedWriter writer;
    private Socket socket;
    //发送消息给服务器
    public void sendToServer(String msg) throws IOException {
        //发送之前,判断socket的输出流是否关闭
        if (!socket.isOutputShutdown()) {
            //如果没有关闭就把用户键入的消息放到writer里面
            writer.write(msg + "\n");
            writer.flush();
        }
    }
    //从服务器接收消息
    public String receive() throws IOException {
        String msg = null;
        //判断socket的输入流是否关闭
        if (!socket.isInputShutdown()) {
            //没有关闭的话就可以通过reader读取服务器发送来的消息。注意:如果没有读取到消息线程会阻塞在这里
            msg = reader.readLine();
        }
        return msg;
    }

    public void start() {
        //和服务创建连接
        try {
            socket = new Socket("127.0.0.1", 8888);
            reader=new BufferedReader(
                    new InputStreamReader(socket.getInputStream())
            );
            writer=new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );
            //新建一个线程去监听用户输入的消息
            new Thread(new UserInputHandler(this)).start();
            /**
             * 不停的读取服务器转发的其他客户端的信息
             * 记录一下之前踩过的小坑:
             * 这里一定要创建一个msg接收信息,如果直接用receive()方法判断和输出receive()的话会造成有的消息不会显示
             * 因为receive()获取时,在返回之前是阻塞的,一旦接收到消息才会返回,也就是while这里是阻塞的,一旦有消息就会进入到while里面
             * 这时候如果输出的是receive(),那么上次获取的信息就会丢失,然后阻塞在System.out.println
             * */
            String msg=null;
            while ((msg=receive())!=null){
                System.out.println(msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
               if(writer!=null){
                   writer.close();
               }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new ChatClient().start();
    }
}

  客户端UserInputHandler:

public class UserInputHandler implements Runnable {
    private ChatClient client;

    public UserInputHandler(ChatClient client) {
        this.client = client;
    }

    @Override
    public void run() {
        try {
            //接收用户输入的消息
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(System.in)
            );
            //不停的获取reader中的System.in,实现了等待用户输入的效果
            while (true) {
                String input = reader.readLine();
                //向服务器发送消息
                client.sendToServer(input);
                if (input.equals("quit"))
                    break;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

  运行测试:

  通过打开终端,通过javac编译。如果大家是在IDEA上编码的话可能会报编码错误,在javac后面加上-encoding utf-8再接java文件就好了。

  编译后运行,通过java运行时,又遇到了一个坑。会报找不到主类的错误,原来是因为加上两个包,要在class文件名前面加上包名。比如当前在src目录,下面有client和server两个包,要这么运行:java client.XXXX。可我之前明明在client文件夹下运行的java,也是不行,不知道为什么。

  接着测试:

  1.首先在一个终端里运行ChatServer,打开服务器

  2.在第二个终端里打开ChatClient,暂且叫A,此时服务器的终端显示:

  3.类似的,在第三个终端里打开ChatClient,暂且叫B,此时服务器显示:

  4.A中输入hi,除了服务器会打印hi外,B中也会显示,图片中的端口号和前面的不一样,是因为中间出了点小问题,前三张截图和后面的不是同时运行的。实际中同一个客户端会显示一样的端口号:

  5.当客户端输入quit时就会断开连接,最后,服务器的显示为:

原文地址:https://www.cnblogs.com/lbhym/p/12681787.html

时间: 2024-11-09 03:08:38

手动搭建I/O网络通信框架2:BIO编程模型实现群聊的相关文章

Mycat开发实践---Mycat的网络通信框架

1从一个测试说起 网上有人对Cobar和MyCAT做了一个简单的比较测试,过程如下: 1 测试环境 利用A.B.C三大类服务器,在A台上面安装配置MyCAT及Cobar,这样保证了硬件方面的一致性.B类服务器上安装Apache这一web服务,使用PHP语言.C类安装MySQL数据库,其中B类与C类均不止一台,主要目的是为了作压力的均分.C类服务器安装了4台,存放了相同的数据库,对其中一个表进行分片存储. 测试软件使用的是loadRunner.在对两个中间件分别进行测试的过程中,采用的web服务器

.NET完全手动搭建三层B/S架构

简介:三层架构(3-tier application) 通常意义上的三层架构就是将整个业务应用划分为:表现层(WebUI).业务逻辑层(BusinessLogicLayer).数据访问层(DataAccessLayer),公共层(ModelLayer).区分层次的目的即为了“高内聚,低耦合”的思想. 一.应用三层架构的优点 三层结构适合群体开发,每人可以有不同的分工,协同工作使效率倍增:各做各的模块,降低开发人员能力要求:方便系统功能的扩展以及后期的维护工作:最大优点是它的安全性.用户端只能通过

NetworkComms网络通信框架序言

03年大学毕业,主要做Web开发,大家可以看看networkcomms中文站: www.networkcomms.cn  自己基于网上开源程序二次开发的:) 从06年开始,便把主要的学习精力放到网络通信上, 主要使用C#语言,WinForm框架,sql Server数据库. 工作于大企业的IT部门,平时有较多的时间用于技术研究,即便这样,在学习的初期,几年的时间内,都无法开发出比较稳定的CS系统,网络通信中需要调试和考虑的地方太多,能开发稳定的可复用的通信系统,我想只有传说中的高手才能做到,而我

c#网络通信框架networkcomms内核解析 序言

networkcomms是我遇到的写的最优美的代码,很喜欢,推荐给大家:) 基于networkcomms2.3.1开源版本( gplv3)协议,写了一些文章,希望大家喜欢,个人水平有限,不足之处难免. networkcommsc#通信框架来自于美丽的英国剑桥,由大洋彼岸的两位工程师 Marc Fletcher, Matthew Dean开发. c#网络通信框架networkcomms内核解析之一 消息传送 c#网络通信框架networkcomms内核解析之二 消息处理流程 c#网络通信框架net

JavaWeb之搭建自己的MVC框架(二)

1. 前言 在 JavaWeb之搭建自己的MVC框架(一) 中我们完成了URL到JAVA后台方法的最基本跳转.但是实际操作中会发现有一个不方便的地方,现在在com.mvc.controller包中只有一个SayController类,如果我们想增加一个新的***Controller类,我们还需要到UrlMappingCollection中修改controllerList属性,这样是不合理的. 所以我们在这一节中要将这种耦合解除掉.我们要将UrlMappingCollection中controll

Linux手动搭建LAMP环境

当你看到标题里的“手动搭建”,你是不是会想,难不成还有“自动搭建”?当然......不是,这里的“手动搭建”是指按部就班的搭建Apache.MySQL.PHP环境,是相对于集成软件包而言的.所以你是不是能够猜到,我后续还会整理一篇通过集成软件包搭建LAMP环境的文章呢? 其实关于LAMP环境,我到现在都没有用过,好多东西也都不懂为什么要这么做,当初只是心血来潮,想自己搭建一个wiki,所以才着手研究的.我不是搞PHP的,也不是搞后端的,额......是不是暴露的太多了,仅仅是为了搭建环境而搭建环

net搭建热插拔式web框架

net搭建热插拔式web框架(重造Controller) 由于.net MVC 的controller 依赖于HttpContext,而我们在上一篇中的沙箱模式已经把一次http请求转换为反射调用,并且http上下文不支持跨域,所以我们要重造一个controller. 我们在写mvc项目的时候经常会用到ViewBag.ViewData,那我们就先声明这两个变量: 1 2 public dynamic ViewBag = new DynamicViewBag(); public ViewDataD

c#网络通信框架networkcomms内核解析之十 支持优先级的自定义线程池

本例基于networkcomms2.3.1开源版本  gplv3协议 如果networkcomms是一顶皇冠,那么CommsThreadPool(自定义线程池)就是皇冠上的明珠了,这样说应该不夸张的,她那么优美,简洁,高效. 在 <c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据>中我们曾经提到,服务器收到数据后,如果是系统内部保留类型数据或者是最高优先级数据,系统会在主线程中处理,其他的会交给自定义线程池进行处理. 作为服务器,处理成千上万的连接及数据,单线程性能

c#网络通信框架networkcomms内核解析之八 数据包的核心处理器

我们先回顾一个 c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据 中,主程序把PacketBuilder 中的数据交给核心处理器处理的过程 //创建优先级队列项目 PriorityQueueItem item = new PriorityQueueItem(priority, this, topPacketHeader, packetBuilder.ReadDataSection(packetHeaderSize, topPacketHeader.PayloadPac