这个socket server可以:
- 非阻塞的处理多个socket连接。
- 可以接收来自客户端的ping消息,并把5秒内无活动的客户端移除。
- 可以接收客户端的login请求,使用者可以按自己需求加入认证逻辑。
/*‘‘‘ Non-Blocking socket server using blocking API Created on Dec 25, 2014 (merry christmas) @author: ScottGu<[email protected], [email protected]> performance tested: environment: 64bit win7, i7-4800MQ, 8GB ‘‘‘*/ #pragma once boost::recursive_mutex cs; // thread-safe access to clients array struct talk_to_client : boost::enable_shared_from_this<talk_to_client> { talk_to_client(boost::asio::io_service& io_service) :sock_(io_service) { } std::string username() const { return username_; } void answer_to_client() { try { read_request(); process_request(); } catch (boost::system::system_error&) { stop(); } if (timed_out()) stop(); } void read_request() { if (sock_.available()) { already_read_ += sock_.read_some( boost::asio::buffer(buff_ + already_read_, max_msg - already_read_)); } } void process_request() { bool found_enter = std::find(buff_, buff_ + already_read_, ‘\n‘)< buff_ + already_read_; if (!found_enter) return; // message is not full // process the msg last_ping_ = boost::posix_time::microsec_clock::local_time(); size_t pos = std::find(buff_, buff_ + already_read_, ‘\n‘) - buff_; std::string msg(buff_, pos); std::copy(buff_ + pos, buff_ + already_read_, buff_); //std::copy(buff_ + already_read_, buff_ + max_msg, buff_); already_read_ -= pos + 1; if (msg.find("login ") == 0) on_login(msg); else if (msg.find("ping") == 0) on_ping(); else if (msg.find("ask_clients") == 0) on_clients(); else std::cerr << "invalid msg " << msg << std::endl; } void set_clients_changed() { clients_changed_ = true; } boost::asio::ip::tcp::socket & sock() { return sock_; } bool timed_out() const { boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time(); long long ms = (now - last_ping_).total_milliseconds(); return ms > 5000; } void stop() { boost::system::error_code err; sock_.close(err); } void on_login(const std::string & msg) { std::istringstream in(msg); in >> username_ >> username_; write("login ok\n"); //update_clients_changed(); } void on_ping() { write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n"); clients_changed_ = false; } void on_clients(); void write(const std::string & msg) { sock_.write_some(boost::asio::buffer(msg)); } private: // in Synchronous Client field are same bool clients_changed_; boost::posix_time::ptime last_ping_; boost::asio::ip::tcp::socket sock_; enum { max_msg = 6*1024 }; int already_read_; char buff_[max_msg]; bool started_; std::string username_; }; typedef boost::shared_ptr<talk_to_client> client_ptr; typedef std::vector<client_ptr> array; array clients; void talk_to_client::on_clients() { std::string msg; boost::recursive_mutex::scoped_lock lk(cs); for (auto b = clients.begin(), e = clients.end(); b != e; ++b){ msg += (*b)->username() + " "; } write("clients " + msg + "\n"); } void accept_thread() { boost::asio::io_service io_service; boost::asio::ip::tcp::acceptor acceptr(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 8001)); while (true) { client_ptr new_(new talk_to_client(io_service)); acceptr.accept(new_->sock()); boost::recursive_mutex::scoped_lock lk(cs); clients.push_back(new_); } } void handle_clients_thread() { while (true) { boost::this_thread::sleep(boost::posix_time::millisec(1)); boost::recursive_mutex::scoped_lock lk(cs); for (array::iterator b = clients.begin(), e = clients.end(); b != e; ++b){ (*b)->answer_to_client(); } // erase clients that timed out clients.erase(std::remove_if(clients.begin(), clients.end(), boost::bind(&talk_to_client::timed_out, _1)), clients.end()); } } int run_sync_talk_server() { boost::thread_group threads; threads.create_thread(boost::bind(accept_thread)); threads.create_thread(handle_clients_thread); threads.join_all(); }
时间: 2024-10-27 15:43:08