同步服务端
同步服务端也相当简单。它需要两个线程,一个负责接收新的客户端,另外一个负责处理已经存在的客户端。它不能使用单线程;等带一个新的客户端是一个阻塞操作,所以我们需要另外一个线程来处理已经存在的客户端。
正常来说服务端都比客户端要难实现。一方面,它要管理所有已经连接的客户端。因为我们是同步的,所以我们需要至少两个线程,一个接受新的客户端连接(因为accept()是阻塞的)而另一个负责回复已经存在的客户端。
void accept_thread() { ip::tcp::acceptor acceptor(service,
ip::tcp::endpoint(ip::tcp::v4(),
8001)); while ( true) {
client_ptr new_( new talk_to_client); acceptor.accept(new_->sock()); boost::recursive_mutex::scoped_lock lk(cs); clients.push_back(new_);
} }
void handle_clients_thread() { while ( true) {
boost::this_thread::sleep( millisec(1)); boost::recursive_mutex::scoped_lock lk(cs); for(array::iterator b = clients.begin(),e = clients.end(); b
!= e; ++b)
(*b)->answer_to_client(); // 删除已经超时的客户端 clients.erase(std::remove_if(clients.begin(), clients.end(),
boost::bind(&talk_to_client::timed_out,_1)),
clients.end()); }
} int main(int argc, char* argv[]) {
boost::thread_group threads; threads.create_thread(accept_thread); threads.create_thread(handle_clients_thread); threads.join_all();
}
为了分别处理客户端发送过来的请求我们需要保存一个客户端的列表。
每个talk_to_client实例都拥有一个socket,socket类是不支持拷贝构造的,所以如果你想要把它们保存在一个std::vector方法中,你需要一个指向它的智能指针。这里有两种实现的方式:在talk_to_client内部保存一个指向socket的智能指针然后创建一个talk_to_client实例的数组,或者让talk_to_client实例用变量的方式保存socket,然后创建一个指向talk_to_client智能指针的数组。我选择后者,但是你也可以选前面的方式:
typedef boost::shared_ptr<talk_to_client> client_ptr; typedef std::vector<client_ptr> array; array clients; boost::recursive_mutex cs; // 用线程安全的方式访问客户端数组
talk_to_client的主要代码如下:
struct talk_to_client : boost::enable_shared_from_this<talk_to_client> {
talk_to_client() { ... } std::string username() const { return username_; } void answer_to_client() {
try { read_request();
process_request(); } catch ( boost::system::system_error&) {
stop(); }
if ( timed_out()) stop();
} void set_clients_changed() { clients_changed_ = true; } ip::tcp::socket & sock() { return sock_; } bool timed_out() const {
ptime now = microsec_clock::local_time(); long long ms = (now - last_ping).total_milliseconds(); return ms > 5000 ;
} void stop() {
boost::system::error_code err; sock_.close(err); }
void read_request() {
if ( sock_.available())
read_)); }
... private:
already_read_ += sock_.read_some( buffer(buff_ + already_read_, max_msg - already_
// ... same as in Synchronous Client bool clients_changed_; ptime last_ping;
};
上述代码拥有非常好的自释。最重要的方法是read_request()。它只有在存在有效数据的情况才读取,这样的话,服务端永远不会阻塞:
void process_request() { bool found_enter = std::find(buff_, buff_ + already_read_, ‘\n‘)
< buff_ + already_read_;
if ( !found_enter) return; // message is not full
// process the msg last_ping = microsec_clock::local_time(); size_t pos = std::find(buff_, buff_ + already_read_, ‘\n‘) -
buff_; std::string msg(buff_, pos); std::copy(buff_ + already_read_, buff_ + max_msg, buff_); already_read_ -= pos + 1;
if ( msg.find("login ") == 0) on_login(msg); else if ( msg.find("ping") == 0) on_ping(); else if ( msg.find("ask_clients") == 0) on_clients(); else std::cerr << "invalid msg " << msg << std::endl;
} void on_login(const std::string & msg) {
std::istringstream in(msg); in >> username_ >> username_; write("login ok\n"); update_clients_changed();
}
void on_ping() {
write(clients_changed_ ? "ping client_list_changed\n" : "ping
ok\n"); clients_changed_ = false;
} void on_clients() {
std::string msg; { boost::recursive_mutex::scoped_lock lk(cs);
for( array::const_iterator b = clients.begin(), e = clients. end() ;
b != e; ++b) msg += (*b)->username() + " ";
}
write("clients " + msg + "\n"); }
void write(const std::string & msg) { sock_.write_some(buffer(msg)); }
观察process_request()。当我们读取到足够多有效的数据时,我们需要知道我们是否已经读取到整个消息(如果found_enter为真)。这样做的话,我们可以使我们避免一次读多个消息的可能(’\n’之后的消息被保存到缓冲区中),然后我们解析读取到的整个消息。剩下的代码都是易懂的。