服务端应用中的同步I/O
类似客户端,服务端也被分为两种情况用来匹配之前章节中的情况1和情况2。同样,两种情况都采用“发送请求-读取结果”的策略。
第一种情况是我们在之前章节实现过的同步服务端。当你是同步时读取一个完整的请求不是很简单,因为你需要避免阻塞(通常来说是能读多少就读多少):
void read_request() { if ( sock_.available())
}
already_read_ += sock_.read_some( buffer(buff_ + already_read_, max_msg - already_read_));
只要一个消息被完整读到,就对它进行处理然后回复给客户端:
void process_request() { bool found_enter = std::find(buff_, buff_ + already_read_, ‘\n‘)
< buff_ + already_read_;
if ( !found_enter) return; // message is not full
size_t pos = std::find(buff_, buff_ + already_read_, ‘\n‘) - buff_;
std::string msg(buff_, pos); ...
if ( msg.find("login ") == 0) on_login(msg);
else if ( msg.find("ping") == 0) on_ping(); else ...
}
如果我们想让我们的服务端变成一个推送服务端,我们通过如下的方式修改:
typedef std::vector<client_ptr> array; array clients; array notify; std::string notify_msg;
void on_new_client() { // on a new client, we notify all clients of this event notify = clients; std::ostringstream msg; msg << "client count " << clients.size(); notify_msg = msg.str(); notify_clients();
} void notify_clients() {
for ( array::const_iterator b = notify.begin(), e = notify.end(); b != e; ++b) {
(*b)->sock_.write_some(notify_msg); }
}
on_new_client()方法是事件之一,这个事件我们需要通知素有的客户端。notify_clients是通知所有对一个事件感兴趣客户端的方法。它发送消息但是不等待每个客户端返回的结果,因为那样的话就会导致阻塞。当客户端返回一个结果时,客户端会告诉我们它为什么回复(然后我们就可以正确地处理它)。
同步服务端中的线程
这是一个非常重要的关注点:我们开辟多少线程去处理服务端请求?
对于一个同步服务端,我们至少需要一个处理新连接的线程:
void accept_thread() { ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp:
:v4(),8001)); while ( true) {
client_ptr new_( new talk_to_client); acceptor.accept(new_->sock()); boost::recursive_mutex::scoped_lock lk(cs); clients.push_back(new_);
} }
对于已经存在的客户端:
- 我们可以是单线程。这是最简单的,同时也是我在第四章 同步服务端中采用的实现方式。它可以很轻松地处理100-200并发的客户端而且有时候会更多,对于大多数情况来说这已经足够用了。
- 我们可以对每个客户端开一个线程。这不是一个很好的选择;他会浪费很多线程而且有时候会导致调试困难,而且当它需要处理200以上并发的客户端的时候,它可能马上会到达它的瓶颈。
- 我们可以用一些固定数量的线程去处理已经存在的客户端
第三种选择是同步服务端中最难实现的;整个talk_to_client类需要是线程安全的。然后,你需要一个机制来确定哪个线程处理哪个客户端。对于这个问题,你有两个选择:
- 将特定的客户端分配给特定的线程;比如,线程1处理前面20个客户端,线程2处理21到40个线程,等等。当一个线程在使用时(我们在等待在客户端阻塞的一些东西),我们从以存在客户端列表中将其取出来。等我们处理完之后,再把它放回到列表中。每个线程都会循环遍历已经存在的客户端列表,然后把拥有完整请求的第一个客户端提出来(我们已经从客户端读取了一条完整的消息),然后回复它。
- 服务端可能会变得无响应
- 第一种情况,被同一个线程处理的几个客户端同时发送请求,因为一个线程在同一时刻只能处理一个请求。所以这种情况我们什么也不能做。
- 第二种情况,如果我们发现并发请求大于当前线程个数的时候。我们可以简单地创建新线程来处理当前的压力。
下面的代码片段有点类似之前的answer_to_client方法,它向我们展示了第二种方法的实现方式:
-
struct talk_to_client : boost::enable_shared_from_this<talk_to_client> {
... void answer_to_client() {
try { read_request();
process_request(); } catch ( boost::system::system_error&) {
stop(); }
} };
我们需要对它进行修改使它变成下面代码片段的样子:
-
struct talk_to_client : boost::enable_shared_from_this<talk_to_client> {
boost::recursive_mutex cs; boost::recursive_mutex cs_ask; bool in_process; void answer_to_client() {
{ boost::recursive_mutex::scoped_lock lk(cs_ask); if ( in_process)
return; in_process = true;
} { boost::recursive_mutex::scoped_lock lk(cs); try {
read_request(); process_request();}
catch ( boost::system::system_error&) {
stop();
} } { boost::recursive_mutex::scoped_lock lk(cs_ask);
in_process = false; }
} };
当我们处理一个客户端的时候,它的in_process变量被设置成true,其他的线程就会忽略这个客户端。额外的福利就是handle_clients_thread()方法不需要做任何修改;你可以随心所欲地创建你想要数量的handle_clients_thread()方法。
时间: 2024-10-07 10:25:30