异步服务端
这个图表是相当复杂的;从Boost.Asio出来你可以看到4个箭头指向on_accept,on_read,on_write和on_check_ping。着也就意味着你永远不知道哪个异步调用是下一个完成的调用,但是你可以确定的是它是这4个操作中的一个。
现在,我们是异步的了;我们可以继续保持单线程。接受客户端连接是最简单的部分,如下所示:
ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001)); void handle_accept(talk_to_client::ptr client, const error_code & err) {
client->start(); talk_to_client::ptr new_client = talk_to_client::new_(); acceptor.async_accept(new_client->sock(),
boost::bind(handle_accept,new_client,_1));
} int main(int argc, char* argv[]) {
talk_to_client::ptr client = talk_to_client::new_(); acceptor.async_accept(client->sock(),boost::bind(handle_accept,client,_1));
service.run(); }
上述代码会一直异步地等待一个新的客户端连接(每个新的客户端连接会触发另外一个异步等待操作)。
我们需要监控client list changed事件(一个新客户端连接或者一个客户端断开连接),然后当事件发生时通知所有的客户端。因此,我们需要保存一个客户端连接的数组,否则,除非你不需要在某一时刻知道所有连接的客户端,你才不需要这样一个数组。
class talk_to_client; typedef boost::shared_ptr<talk_to_client> client_ptr; typedef std::vector<client_ptr> array; array clients;
connection类的框架如下:
class talk_to_client : public boost::enable_shared_from_this<talk_to_
client>
, boost::noncopyable { talk_to_client() { ... }
public: typedef boost::system::error_code error_code; typedef boost::shared_ptr<talk_to_client> ptr; void start() {
started_ = true; clients.push_back( shared_from_this()); last_ping = boost::posix_time::microsec_clock::local_time(); do_read(); //首先,我们等待客户端连接
} static ptr new_() { ptr new_(new talk_to_client); return new_; } void stop() {
if ( !started_) return; started_ = false; sock_.close(); ptr self = shared_from_this(); array::iterator it = std::find(clients.begin(), clients.end(),
self); clients.erase(it);
update_clients_changed(); }
bool started() const { return started_; } ip::tcp::socket & sock() { return sock_;} std::string username() const { return username_; } void set_clients_changed() { clients_changed_ = true; } …
private:
ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; std::string username_; deadline_timer timer_; boost::posix_time::ptime last_ping; bool clients_changed_;
};
我会用talk_to_client或者talk_to_server来调用connection类,从而使你更明白我所说的内容。
现在你需要用到之前的代码了;它和我们在客户端应用中所用到的是一样的。但是我们还有另外一个stop()方法,这个方法用来从客户端数组中移除一个客户端连接。
服务端持续不断地等待异步的read操作:
void on_read(const error_code & err, size_t bytes) { if ( err) stop(); if ( !started() ) return; std::string msg(read_buffer_, bytes);
if ( msg.find("login ") == 0) on_login(msg); else if ( msg.find("ping") == 0) on_ping(); else if ( msg.find("ask_clients") == 0) on_clients();
} void on_login(const std::string & msg) {
std::istringstream in(msg); in >> username_ >> username_; do_write("login ok\n"); update_clients_changed();
} void on_ping() {
do_write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");
clients_changed_ = false; }
void on_clients() {
std::string msg;
for(array::const_iterator b =clients.begin(),e =clients.end(); b != e; ++b)
msg += (*b)->username() + " "; do_write("clients " + msg + "\n");
}
这段代码是简单易懂的;需要注意的一点是:当一个新客户端登录,我们调用update_clients_changed(),这个方法为所有客户端将clients_changed_标志为true。
服务端每收到一个请求就用正确的方式进行回复,如下所示:
void do_ping() { do_write("ping\n"); } void do_ask_clients() { do_write("ask_clients\n"); } void on_write(const error_code & err, size_t bytes) { do_read(); } void do_read() {
async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
post_check_ping(); }
void do_write(const std::string & msg) { if ( !started() ) return; std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some( buffer(write_buffer_, msg.size()),
MEM_FN2(on_write,_1,_2));
} size_t read_complete(const boost::system::error_code & err, size_t bytes) {
// ... as before }
在每个write操作的末尾,on_write()方法被调用,这个方法会触发另外一个异步读操作,这样的话“等待请求-回复请求”这个循环酒会一直执行,直到客户端断开连接或者超时。
在每次读操作开始之前,我们异步等待5秒钟来观察客户端是否超时。如果超时,我们关闭它的连接:
void on_check_ping() { ptime now = microsec_clock::local_time(); if ( (now - last_ping).total_milliseconds() > 5000)
stop(); last_ping = boost::posix_time::microsec_clock::local_time();
} void post_check_ping() {
timer_.expires_from_now(boost::posix_time::millisec(5000));
timer_.async_wait( MEM_FN(on_check_ping)); }
这就是整个服务端的实现。你可以运行并让它工作起来!
在代码中,我向你们展示了这一章我们学到的东西,为了更容易理解,我把代码稍微精简了下;比如,大部分的控制台输出我都没有展示,尽管在这本书附赠的代码中它们是存在的。我建议你自己运行这些例子,因为从头到尾读一次代码能加强你对本章展示应用的理解。
总结
我们已经学到了怎么写一些基础的客户端/服务端应用。我们已经避免了一些诸如内存泄漏和死锁的低级错误。所有的编码都是框架式的,这样你就可以根据你自己的需求对它们进行扩展。
在接下来的章节中,我们会更加深入地了解使用Boost.Asio进行同步编程和异步编程的不同点,同时你会也学会如何嵌入你自己的异步操作。