使用同步socket创建非阻塞socket server

这个socket server可以:

  1. 非阻塞的处理多个socket连接。
  2. 可以接收来自客户端的ping消息,并把5秒内无活动的客户端移除。
  3. 可以接收客户端的login请求,使用者可以按自己需求加入认证逻辑。
/*‘‘‘

Non-Blocking socket server using blocking API

Created on Dec 25, 2014 (merry christmas)

@author: ScottGu<[email protected], [email protected]>

performance tested:

environment: 64bit win7, i7-4800MQ, 8GB
‘‘‘*/

#pragma once
boost::recursive_mutex cs; // thread-safe access to clients array

struct talk_to_client
    : boost::enable_shared_from_this<talk_to_client>
{
    talk_to_client(boost::asio::io_service& io_service)
        :sock_(io_service)
    {
    }

    std::string username() const
    {
        return username_;
    }

    void answer_to_client() {
        try {
            read_request();
            process_request();
        }
        catch (boost::system::system_error&) {
            stop();
        }
        if (timed_out())
            stop();
    }

    void read_request() {
        if (sock_.available())
        {
            already_read_ += sock_.read_some(
                boost::asio::buffer(buff_ + already_read_, max_msg - already_read_));
        }
    }

    void process_request() {
        bool found_enter = std::find(buff_, buff_ + already_read_, ‘\n‘)< buff_ + already_read_;
        if (!found_enter)
            return; // message is not full
        // process the msg
        last_ping_ = boost::posix_time::microsec_clock::local_time();
        size_t pos = std::find(buff_, buff_ + already_read_, ‘\n‘) -
            buff_;
        std::string msg(buff_, pos);

        std::copy(buff_ + pos, buff_ + already_read_, buff_);
        //std::copy(buff_ + already_read_, buff_ + max_msg, buff_);
        already_read_ -= pos + 1;

        if (msg.find("login ") == 0) on_login(msg);
        else if (msg.find("ping") == 0) on_ping();
        else if (msg.find("ask_clients") == 0) on_clients();
        else std::cerr << "invalid msg " << msg << std::endl;
    }

    void set_clients_changed() { clients_changed_ = true; }
    boost::asio::ip::tcp::socket & sock() { return sock_; }

    bool timed_out() const {
        boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();
        long long ms = (now - last_ping_).total_milliseconds();
        return ms > 5000;
    }

    void stop() {
        boost::system::error_code err; sock_.close(err);
    }
    void on_login(const std::string & msg) {
        std::istringstream in(msg);
        in >> username_ >> username_;
        write("login ok\n");
        //update_clients_changed();
    }

    void on_ping() {
        write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");
        clients_changed_ = false;
    }

    void on_clients();

    void write(const std::string & msg) { sock_.write_some(boost::asio::buffer(msg)); }

private:
    //   in Synchronous Client field are same
    bool clients_changed_;
    boost::posix_time::ptime last_ping_;
    boost::asio::ip::tcp::socket sock_;

    enum { max_msg = 6*1024 };
    int already_read_;
    char buff_[max_msg];
    bool started_;
    std::string username_;

};

typedef boost::shared_ptr<talk_to_client> client_ptr;
typedef std::vector<client_ptr> array;
array clients;

void talk_to_client::on_clients() {
    std::string msg;
    boost::recursive_mutex::scoped_lock lk(cs);
    for (auto b = clients.begin(), e = clients.end(); b != e; ++b){
        msg += (*b)->username() + " ";
    }
    write("clients " + msg + "\n");
}

void accept_thread() {
    boost::asio::io_service io_service;
    boost::asio::ip::tcp::acceptor acceptr(io_service,
        boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 8001));

    while (true) {
        client_ptr new_(new talk_to_client(io_service));

        acceptr.accept(new_->sock());

        boost::recursive_mutex::scoped_lock lk(cs);
        clients.push_back(new_);
    }
}

void handle_clients_thread() {
    while (true) {
        boost::this_thread::sleep(boost::posix_time::millisec(1));
        boost::recursive_mutex::scoped_lock lk(cs);
        for (array::iterator b = clients.begin(), e = clients.end(); b != e; ++b){
            (*b)->answer_to_client();
        }

        // erase clients that timed out
        clients.erase(std::remove_if(clients.begin(), clients.end(),
            boost::bind(&talk_to_client::timed_out, _1)), clients.end());
    }
}

int run_sync_talk_server() {

    boost::thread_group threads;
    threads.create_thread(boost::bind(accept_thread));
    threads.create_thread(handle_clients_thread);
    threads.join_all();
}
时间: 2024-12-28 01:27:53

使用同步socket创建非阻塞socket server的相关文章

网络编程中阻塞和非阻塞socket的区别

阻塞socket和非阻塞socket 建立连接阻塞方式下,connect首先发送SYN请求道服务器,当客户端收到服务器返回的SYN的确认时,则connect返回.否则的话一直阻塞.非阻塞方式,connect将启用TCP协议的三次握手,但是connect函数并不等待连接建立好才返回,而是立即返回.返回的错误码为EINPROGRESS,表示正在进行某种过程. 接收连接对于阻塞方式的倾听socket,accept在连接队列中没有建立好的连接时将阻塞,直到有可用的连接,才返回.非阻塞倾听socket,在

JAVA基础知识之网络编程——-基于NIO的非阻塞Socket通信

阻塞IO与非阻塞IO 通常情况下的Socket都是阻塞式的, 程序的输入输出都会让当前线程进入阻塞状态, 因此服务器需要为每一个客户端都创建一个线程. 从JAVA1.4开始引入了NIO API, NIO可以实现非阻塞IO, 这样就可以使用一个线程处理所有的客户请求. 基于NIO的非阻塞Socket通信 服务器将用来监听客户端请求的channel注册到selector上,启动一个线程,使用selector的select()获取求情的客户端的channel数量, 当监听到有客户端请求时,就通过Sel

非阻塞socket与epoll

阻塞socket. –阻塞调用是指调用结果返回之前,当前线程会被挂起.函数只有在得到结果之后才会返回. –对于文件操作read,fread函数调用会将线程阻塞. –对于socket,accept与recv.recvfrom函数调用会将线程阻塞. –为了避免整个进程被阻塞后挂起,所以在阻塞模式下,往往需要采用多线程技术. –一个进程中可并发的线程总数是有限的,在处理大量客户端sokcet连接(比如上万个client socket),通过线程并发处理socket并不方便,效率也不高. 非阻塞sock

Linux - 非阻塞socket编程处理EAGAIN错误

在linux进行非阻塞的socket接收数据时经常出现Resource temporarily unavailable,errno代码为11(EAGAIN),这表明你在非阻塞模式下调用了阻塞操作,在该操作没有完成就返回这个错误,这个错误不会破坏socket的同步,不用管它,下次循环接着recv就可以. 对非阻塞socket而言,EAGAIN不是一种错误.在VxWorks和Windows上,EAGAIN的名字叫做EWOULDBLOCK. 另外,如果出现EINTR即errno为4,错误描述Inter

linux下异步RPC的阶段性总结-非阻塞SOCKET客户端

尽可能使用非阻塞socket int flags, s;    flags = fcntl (fd, F_GETFL, 0);        if (flags == -1){            close(fd);          return -1;      }          flags |= O_NONBLOCK;        s = fcntl (fd, F_SETFL, flags);        if (s == -1){            close(fd); 

从缓冲上看阻塞与非阻塞socket在发送接收上的区别(转载)

转自:http://blog.chinaunix.net/uid-24517549-id-4044877.html 首先socket在默认情况下是阻塞状态的,这就使得发送以及接收操作处于阻塞的状态,即调用不会立即返回,而是进入睡眠等待操作完成.   一.发送选用send(这里特指TCP)以及sendto(这里特指UDP)来描述 首先需要说明的是,不管阻塞还是非阻塞,在发送时都会将数据从应用缓冲区拷贝到内核缓冲区(SO_RCVBUF选项声明,除非缓冲区大小为0).     在阻塞模式下send操作

客户端非阻塞socket建链流程

TCP协议是面向连接的.可靠的.基于字节流的传输层协议.那使用tcp协议进行通信的两端是如何进行通信的?使用tcp协议进行通信的两端是通过套接字(scoket)来建立连接的.套接字socket主要有两种类型,阻塞和非阻塞.通常为了防止进程阻塞以及避免cpu被长时间占用,客户端和服务端一般都会采用非阻塞socket进行通信,其中Nginx就是一个典型的例子.下面我们就以Nginx的upstream机制所涉及的与后端服务器建链的流程来总结下使用非阻塞socket的客户端建链流程. 先来看下Nginx

非阻塞socket学习,select基本用法

server #include <stdio.h> #include <winsock2.h> #include <iostream> #pragma comment(lib, "WS2_32.lib") #define PORT 9999 #define DATA_BUFSIZE 8192 typedef struct _SOCKET_INFORMATION{ CHAR Buffer[DATA_BUFSIZE]; //发送和接收数据的缓冲区 WSA

异步非阻塞socket的实现

在学习使用scrapy爬虫框架之前,需要了解一些基础原理 我们知道HTTP请求是基于socket模块进行发送和接受的,但是socket套接字的在使用的中存在着阻塞,不利用爬虫的高性能运行,所以我们就需要对框架进行一些高性能设置,使用select模块,检测socket请求的IO操作,实现对socket的高性能运行: 以下是代码示例: import socket import select class Request(object): ''' request类的定义是应对请求的连接不同时,做的低耦合