异步客户端/服务端
现在,是比较有趣(也比较难)的异步!
当查看图表时,你需要知道Boost.Asio代表由Boost.Asio执行的一个异步调用。例如do_read(),Boost.Asio和on_read()代表了从do_read()到on_read()的逻辑流程,但是你永远不知道什么时候轮到on_read(),你只是知道你最终会调用它。
异步客户端
到这里事情会变的有点复杂但肯定是可控的。当然你也会拥有一个不会阻塞的应用。
对下面的代码你应该已经很熟悉:
#define MEM_FN(x) boost::bind(&self_type::x, shared_from_this()) #define MEM_FN1(x,y) boost::bind(&self_type::x, shared_from_ this(),y) #define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_ this(),y,z)
class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr> , boost::noncopyable {
typedef talk_to_svr self_type; talk_to_svr(const std::string & username)
: sock_(service), started_(true), username_(username), timer_ (service) {}
void start(ip::tcp::endpoint ep) { sock_.async_connect(ep, MEM_FN1(on_connect,_1));
} public:
typedef boost::system::error_code error_code; typedef boost::shared_ptr<talk_to_svr> ptr;
static ptr start(ip::tcp::endpoint ep, const std::string & username) {
ptr new_(new talk_to_svr(username)); new_->start(ep); return new_;
} void stop() {
if ( !started_) return; started_ = false; sock_.close();
} bool started() { return started_; } ...
private: size_t read_complete(const boost::system::error_code & err, size_t
bytes) { if ( err) return 0;
bool found = std::find(read_buffer_, read_buffer_ + bytes, ‘\n‘)
< read_buffer_ + bytes; return found ? 0 : 1;
}
private: ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; std::string username_; deadline_timer timer_;
};
你会看到有一个额外的方法deadline_timer timer_用来ping服务端;而且ping操作同样是随机的。
下面是类的逻辑:
void on_connect(const error_code & err) { if ( !err) do_write("login " + username_ + "\n"); else stop();
} void on_read(const error_code & err, size_t bytes) {
if ( err) stop(); if ( !started() ) return; // process the msg std::string msg(read_buffer_, bytes); if ( msg.find("login ") == 0) on_login(); else if ( msg.find("ping") == 0) on_ping(msg); else if ( msg.find("clients ") == 0) on_clients(msg);
} void on_login() {
do_ask_clients(); }
void on_ping(const std::string & msg) { std::istringstream in(msg); std::string answer; in >> answer >> answer;
if ( answer == "client_list_changed") do_ask_clients();
else postpone_ping(); }
void on_clients(const std::string & msg) { std::string clients = msg.substr(8); std::cout << username_ << ", new client list:" << clients ; postpone_ping();
}
在on_read()中,首先的两行代码可以说是加分之举。在第一行,如果有错误,我们就停止。在第二行,如果我们已经停止了(之前就停止了或者刚好停止),我们就返回。否则,如果所有都OK,我们处理进来的消息。
最后do_*方法如下:
void do_ping() { do_write("ping\n"); } void postpone_ping() {
timer_.expires_from_now(boost::posix_time::millisec(rand() % 7000));
timer_.async_wait( MEM_FN(do_ping)); }
void do_ask_clients() { do_write("ask_clients\n"); } void on_write(const error_code & err, size_t bytes) { do_read(); } void do_read() {
async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
} void do_write(const std::string & msg) {
if ( !started() ) return; std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some( buffer(write_buffer_, msg.size()),
MEM_FN2(on_write,_1,_2));
注意每一个read操作都会触发一个ping操作
- 当read操作结束时,on_read()被调用
- on_read()调用on_login(),on_ping()或者on_clients()
- 每一个方法要么发出一个ping,要么请求客户端列表
- 如果我们请求客户端列表,当read操作接受到它们时,他会发出一个ping操作。
时间: 2024-09-28 05:41:36