一个非常完善的基于Socket的多服务器通信框架

一共4个文件

  • XServerReceiver : Socket接受处理XServer返回工具类 XServerReceiver充当服务器
  • XServerSender : Socket连接发送XServer服务器工具类 XServerSender充当客户端
  • ClientTest : 客户端测试程序
  • ServerTest : 服务端测试程序

XServerReceiver.java

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentLinkedQueue;

class XServerReceiverThread extends Thread {
    private Socket m_socket = null;
    private InputStream m_input = null;
    private OutputStream m_output = null;
    private BufferedReader m_br = null;
    private String m_clientHost = "";// 客户端的socket地址

    public XServerReceiverThread(Socket socket) {
        try {
            this.m_socket = socket;
            this.m_input = socket.getInputStream();
            this.m_output = socket.getOutputStream();
            this.m_br = new BufferedReader(new InputStreamReader(this.m_input, "UTF-8"));
            this.m_clientHost = m_socket.getRemoteSocketAddress().toString();
            this.m_clientHost = this.m_clientHost.substring(1);//因为m_clientHost为  /192.168.0.83:52177
            this.m_socket.setSoTimeout(3000);

            System.out.println("connection established from " + m_clientHost);
        } catch (Exception ex) {
            ex.printStackTrace();
            closeHandles();
        }
    }

    public void run() {
        try {
            while (true) {
                String event = "";

                while (true) {
                    int one = this.m_br.read();
                    if (one == -1) {
                        closeHandles();
                        break;
                    }

                    event += String.valueOf((char) one);
                    if (one == ‘\1‘)// 一般消息尾部截止为一个不可见字符,这里假设为‘\1‘
                    {
                        XServerReceiver.PushEvent(event);
                        break;
                    }
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            closeHandles();
        }
    }

    private void closeHandles() {
        System.out.println("connection closed from " + m_clientHost);

        try {
            if (this.m_br != null) {
                this.m_br.close();
                this.m_br = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (this.m_input != null) {
                this.m_input.close();
                this.m_input = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (this.m_output != null) {
                this.m_output.close();
                this.m_output = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (this.m_socket != null) {
                this.m_socket.close();
                this.m_socket = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

class XServerListenThread extends Thread {
    private ServerSocket server_socket = null;
    private boolean m_pleaseWait = true;

    public void run() {
        while (true) {
            try {
                if (server_socket == null || server_socket.isClosed() || !server_socket.isBound()) {
                    this.server_socket = new ServerSocket(5600);// XServer.TCP_PORT_FOR_OTHER_SERVER,这里假设为5600
                    System.out.println("TCP Processor is listening on " + 5600);
                    m_pleaseWait = false;
                }

                Socket socket = this.server_socket.accept();
                XServerReceiverThread xt = new XServerReceiverThread(socket);
                xt.start();
            } catch (BindException bex) {
                bex.printStackTrace();

                try {
                    sleep(5000);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    public void Wait() {
        try {
            while (m_pleaseWait)
                Thread.sleep(10);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

class XServerHandlerThread extends Thread {
    public void run() {
        try {
            while (true) {
                String event = XServerReceiver.PopEvent();

                if (event == null) {
                    Thread.sleep(1000);
                    continue;
                }

                EventHandler(event);// 处理消息事件
                event = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private void EventHandler(String event) {
        System.out.println("Received event: " + event);
    }
}

/**
 *
 * Socket接受处理XServer返回工具类
 * XServerReceiver充当服务器
 *
 * @author vhreal
 *
 */
public class XServerReceiver {
    static private ConcurrentLinkedQueue<String> m_eventQueue = new ConcurrentLinkedQueue<String>();
    static private XServerListenThread m_XServerListenThread = null;// 监听线程
    static private XServerHandlerThread m_XServerHandlerThread = null;// 处理线程

    static public void PushEvent(String event) {// 外部类调用XServerReceiver.PushEvent(event);
        m_eventQueue.add(event);
    }

    static public String PopEvent() {// 外部类调用XServerReceiver.PopEvent(event);
        if (m_eventQueue.size() == 0)
            return null;

        return m_eventQueue.remove();
    }

    static public void Start()// 外部类调用XServerReceiver.Start();
    {
        m_XServerListenThread = new XServerListenThread();
        m_XServerListenThread.start();
        m_XServerListenThread.Wait();

        m_XServerHandlerThread = new XServerHandlerThread();
        m_XServerHandlerThread.start();
    }
}

XServerSender .java

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentLinkedQueue;

class XServerSenderThread extends Thread {
    private Socket m_socket = null;
    private OutputStream m_output = null;
    private InputStream m_input = null;
    private InputStreamReader m_isr = null;
    private BufferedReader m_br = null;
    private ConcurrentLinkedQueue<String> m_Events = new ConcurrentLinkedQueue<String>();

    private String m_ServerHost = "192.168.0.83";
    private int m_ServerPort = 5600;

    public void run() {
        long lastkeepalive = 0;
        long now = 0;

        while (true) {
            try {
                if (m_socket == null || m_socket.isClosed()) {
                    Connect2XServer();
                    continue;
                }

                now = System.currentTimeMillis() / 1000;
                if (now - lastkeepalive > 2)// 2s发一次心跳
                {
                    RequestKeepAlive();
                    lastkeepalive = now;
                }

                String event = PopEvent();
                if (event == null) {
                    Thread.sleep(1000);
                    continue;
                }

                SendEvent2XServer(event);
                event = null;
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    public void PushEvent(String event) {
        m_Events.add(event);
    }

    private String PopEvent() {
        if (m_Events.size() == 0)
            return null;

        return m_Events.remove();
    }

    private boolean SendEvent2XServer(String event) {
        try {
            m_output.write(event.getBytes());

            m_output.flush();
            return true;
        } catch (Exception ex) {
            closeHandles();
            return false;
        }
    }

    private boolean RequestKeepAlive() {
        try {
            String json = String
                    .format("{\"AttributeTimeinSecs\":\"%d\",\"AttributeEventName\":\"EventKeepAlive\"}",
                            System.currentTimeMillis() / 1000);
            m_output.write(json.getBytes());
            m_output.flush();
            return true;
        } catch (Exception ex) {
            closeHandles();
            return false;
        }
    }

    private boolean Connect2XServer() {
        try {
            System.out.println("connecting to XServer");

            SocketAddress endpoint = new InetSocketAddress(m_ServerHost, m_ServerPort);// XServer的Ip和端口号
            m_socket = new Socket();
            m_socket.setSoTimeout(3000);// 3s网络超时
            m_socket.connect(endpoint);

            m_output = m_socket.getOutputStream();
            m_input = m_socket.getInputStream();
            m_isr = new InputStreamReader(m_input, "UTF-8");
            m_br = new BufferedReader(m_isr);

            System.out.println("connected to XServer");
            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            closeHandles();
        }

        return false;
    }

    private void closeHandles() {
        try {
            if (m_isr != null) {
                m_isr.close();
                m_isr = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (m_br != null) {
                m_br.close();
                m_br = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (m_input != null) {
                m_input.close();
                m_input = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (m_output != null) {
                m_output.close();
                m_output = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (m_socket != null) {
                m_socket.close();
                m_socket = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            Thread.sleep(1000);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

/**
 *
 * Socket连接发送XServer服务器工具类 XServerSender充当客户端
 *
 * @author vhreal
 *
 */
public class XServerSender {
    static private XServerSenderThread m_SenderThread = null;

    static public void Start() {// 外部类调用XServerSender.Start();
        try {
            m_SenderThread = new XServerSenderThread();
            m_SenderThread.start();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    static public void SendEvent2XServer(String event) {// 外部类调用XServerSender.SendEvent2XServer(event);
        if (m_SenderThread != null) {
            m_SenderThread.PushEvent(event);
        }
    }
}

ClientTest.java

public class ClientTest {

    public static void main(String[] args) {
        XServerSender.Start();
        XClientThread xc = new XClientThread();
        xc.start();
    }

}

class XClientThread extends Thread {
    public void run() {
        try {
            while (true) {
                String s = String.format("hello,Xsever!\1");
                XServerSender.SendEvent2XServer(s);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

ServerTest.java


public class ServerTest {

    public static void main(String[] args) {
        XServerReceiver.Start();
    }

}

测试结果

时间: 2024-10-11 02:51:33

一个非常完善的基于Socket的多服务器通信框架的相关文章

AF_UNIX域通信(基于socket和pipe的通信,只适于UNIX系统S&C同在一个主机上,用于进程通信)

服务器端: #include<stdio.h>#include<unistd.h>#include<stdlib.h>#include<sys/types.h>#include<sys/stat.h>#include<fcntl.h>#include <sys/socket.h>#include <sys/un.h>#include <stddef.h>char buf[100];void main

C语言 linux环境基于socket的简易即时通信程序

转载请注明出处:http://www.cnblogs.com/kevince/p/3891033.html   By Kevince 最近在看linux网络编程相关,现学现卖,就写了一个简易的C/S即时通信程序,代码如下: head.h 1 /*头文件,client和server编译时都需要使用*/ 2 #include <unistd.h> 3 #include <stdio.h> 4 #include <sys/types.h> 5 #include <sys

基于socket的web服务器检测

# coding=utf-8 import sys import socket import re def check_webserver(address, port, resource): address = socket.gethostbyname(address) if not resource.startswith('/'): resource = '/' + resource request_string = 'GET %s HTTP/1.0\r\n\r\n' % (resource)

一个基于共享内存的ipc通信框架

一个基于共享内存的ipc通信框架 与共享内存相关的操作主要包括共享内存的初始化, 共享内存的释放, 共享内存的锁的相关操作, 在这里操作共享内存的环境是: 1 多个进程没有亲缘关系, 也没有server/client关系, 是多个不相关进程并发操作共享内存 2 共享内存一开始不存在, 由第一个访问他的进程创建 3 当共享内存退出时, 由最后一个离开的进程释放共享内存, 并清除信号量 在这个问题之中, 主要有两个比较大的问题: 1 怎样新建并初始化共享内存 新建共享内存的数据都可以用信号量来控制,

漫谈:一个简单的单线程基于epoll的echo服务器(附简单的性能测试)

为什么使用epoll 这个是老生常谈了,四个字,多路复用,要不单线程只能停等排队.另外select和poll不如epoll强大好用. 程序结构漫谈 代码很简陋,基本属于玩具.但是还是随便谈谈. 在单线程模型下使用epoll,只能使用一个epoll的instance同时监听socket描述符和connection描述符.当socket描述符就位时,就调用accept处理三次握手建立连接,同时将调用epoll_ctl将这个connfd加入epoll的事件监听表中.如果connfd就位,就调用recv

第十三章:基于socket.io实现即时通信

服务器端的搭建参考socket io官网,里面有非常详细的描述,按照步骤下来,最终可以在localhost进行模拟聊天. 下面是客户端的说明. 引入socket.io.js: <script src="js/socket.io.js"></script> 定义Chats tab: <!-- Chats Tab --> <ion-tab title="Chats" icon-off="ion-ios-chatboxe

分享一个分布式消息总线,基于.NET Socket Tcp的发布-订阅框架,附代码下载

一.分布式消息总线 在很多MIS项目之中都有这样的需求,需要一个及时.高效的的通知机制,即比如当使用者A完成了任务X,就需要立即告知使用者B任务X已经完成,在通常的情况下,开发人中都是在使用者B所使用的程序之中写数据库轮循代码,这样就会产品一个很严重的两个问题,第一个问题是延迟,轮循机制要定时执行,必须会引起延迟,第二个问题是数据库压力过大,当进行高频度的轮循会生产大量的数据库查询,并且如果有大量的使用者进行轮循,那数据库的压力就更大了. 那么在这个时间,就需要一套能支持发布-订阅模式的分布式消

如何基于WKWebView开发一个功能完善的资讯内容页

前言 对于资讯类的APP来说 良好的阅读体验是必不可少的, 那么如何去开发一个功能完善的资讯文章页面就是本文要说的重点.相信本文会对很多在做同类功能开发的道友们有很大的帮助 , 如果某只大佬路过也欢迎指点一二. 废话不多说 开讲(chui)~ 分析 数据 对于图文混排的富文本形式 , 最好最通用的数据格式当属 HTML , 再加上 CSS 和 JS 的配合, 可以随心所欲的展示出成百上千在不同的样式.当然 , 除了 HTML 也不排除有使用其他规则的数据格式来表示.但这里我们还是选择使用 HTM

TCP/IP协议学习(五) 基于C# Socket的Web服务器---动态通讯实现

目录 (1).基于Ajax的前端实现 (2).Web服务器后端处理 一个完整的web服务器,不仅需要满足用户端对于图片.文档等资源的需求:还能够对于用户端的动态请求,返回指定程序生成的数据.支持动态请求处理是web服务器的必要组成部分,现有比较成熟的前端动态技术有CGI,ASP/ASP.net, PHP,原生javascript实现的Ajax技术以及基于HTML5的webSocket通讯,它们每一项都涉及很多相关知识,不过归结到核心都是前后端的数据交互,特别是对于后端来说并没有太大区别.作为动态