异步服务端中的多线程
我在第4章 客户端和服务端展示的异步服务端是单线程的,所有的事情都发生在main()中:
int main() { talk_to_client::ptr client = talk_to_client::new_(); acc.async_accept(client->sock(), boost::bind(handle_
accept,client,_1)); service.run();
}
异步的美妙之处就在于把单线程变为多线程的简单。你可以一直保持多线程知道你的并发客户端超过200。然后,你可以使用如下的代码片段把单线程变成100个线程:
boost::thread_group threads; void listen_thread() {
service.run(); }
void start_listen(int thread_count) { for ( int i = 0; i < thread_count; ++i)
threads.create_thread( listen_thread);
} int main(int argc, char* argv[]) {
talk_to_client::ptr client = talk_to_client::new_();
acc.async_accept(client->sock(), boost::bind(handle_ accept,client,_1));
start_listen(100);
threads.join_all(); }
当然,一旦你选择了多线程,你需要考虑线程安全。尽管你在线程A中调用了async_*,它的完成流程可以在线程B中被调用(因为线程B也调用了service.run())。对于它本身而言这不是问题。只要你遵循逻辑流程,也就是从async_read()到on_read(),从on_read()到process_request,从process_request到async_write(),从async_write()到on_write(),从on_write()到async_read(),然后在你的talk_to_client类中没有被调用的公有方法,尽管不同的方法可以在不同的线程中被调用,他们还是会被有序地调用。从而不需要互斥量。
这也意味着对于一个客户端,只会有一个异步操作在等待。假如在有的情况,一个客户端我们有两个异步方法在等待,你就需要互斥量了。这是因为两个等待的操作可能正好在同一个时间完成,然后我们就会在两个不同的线程中间同时调用他们的完成处理函数。所以,这里需要线程安全,也就是需要使用互斥量。
在我们的异步服务端中,我们确实同时有两个等待的操作:
void do_read() { async_read(sock_, buffer(read_buffer_),MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
post_check_ping();
} void post_check_ping() {
timer_.expires_from_now(boost::posix_time::millisec(5000));
timer_.async_wait( MEM_FN(on_check_ping)); }
当在做一个read操作时,我们会异步等待read操作完成和超时。所以,这里需要线程安全。
我的建议是,如果你准备使用多线程,从开始就保证你的类是线程安全的。通常这不会影响它的性能(当然你也可以在配置中设置开关)。同时,如果你准备使用多线程,从一个开始就使用。这样的话你能尽早地发现可能存在的问题。一旦你发现一个问题,你首先需要检查的事情就是:单线程运行的时候是否会发生?如果是,它很简单;只要调试它就可以了。否则,你可能忘了对一些方法加锁(互斥量)。
因为我们的例子需要是线程安全的,我已经把talk_to_client修改成使用互斥量的了。同时,我们也有一个客户端连接的列表,它也需要自己的互斥量因为我们有时需要访问它。
避免死锁和内存冲突不是很简单。下面是我需要对update_client_changed()方法进行修改的地方:
void update_clients_changed() { array copy;
{ boost::recursive_mutex::scoped_lock lk(clients_cs); copy = clients; }
for( array::iterator b = copy.begin(), e = copy.end(); b != e; ++b)
(*b)->set_clients_changed();
}
你需要避免的是同时有两个互斥量被锁定(这会导致死锁)。在我们的例子中,我们不想clients_cs和一个客户端的cs_互斥量同时被锁住
异步操作
Boost.Asio同样允许你异步地运行你任何一个方法。仅仅需要使用下面的代码片段:
void my_func() { ...
} service.post(my_func);
这样就可以保证my_func在调用了service.run()的一个线程中间被调用。你同样可以异步地调用一个有完成处理handler的方法,方法的handler会在方法结束的时候通知你。伪代码如下:
void on_complete() { ...
} void my_func() {
...
service.post(on_complete); }
async_call(my_func);
这里没有async_call方法,因此,你需要自己创建。幸运的是,它不是很复杂,参考下面的代码片段:
struct async_op : boost::enable_shared_from_this<async_op>, ... { typedef boost::function<void(boost::system::error_code)>
completion_func; typedef boost::function<boost::system::error_code ()> op_func; struct operation { ... }; void start() {
{ boost::recursive_mutex::scoped_lock lk(cs_); if ( started_) return; started_ = true; }
boost::thread t( boost::bind(&async_op::run,this)); }
void add(op_func op, completion_func completion, io_service &service) {
self_ = shared_from_this(); boost::recursive_mutex::scoped_lock lk(cs_); ops_.push_back( operation(service, op, completion)); if ( !started_) start();
}
void stop() {
boost::recursive_mutex::scoped_lock lk(cs_); started_ = false; ops_.clear();
} private:
boost::recursive_mutex cs_;
std::vector<operation> ops_; bool started_; ptr self_; };
async_op方法创建了一个后台线程,这个线程会运行(run())你添加(add())到它里面的所有的异步操作。为了让事情简单一些,每个操作都包含下面的内容:
- 一个异步调用的方法
- 当第一个方法结束时被调用的一个完成处理handler
- 会运行完成处理handler的io_service实例。这也是完成时通知你的地方。参考下面的代码:
-
struct async_op : boost::enable_shared_from_this<async_op> , private boost::noncopyable {
struct operation { operation(io_service & service, op_func op, completion_
func completion) : service(&service), op(op), completion(completion) , work(new io_service::work(service))
{} operation() : service(0) {} io_service * service; op_func op; completion_func completion; typedef boost::shared_ptr<io_service::work> work_ptr; work_ptr work;
};
... };
它们被operation结构体包含在内部。注意当有一个操作在等待时,我们在操作的构造方法中构造一个io_service::work实例,从而保证直到我们完成我们的异步调用之前service.run()都不会结束(当io_service::work实例保持活动时,service.run()就会认为它有工作需要做)。参考下面的代码片段:
struct async_op : ... { typedef boost::shared_ptr<async_op> ptr; static ptr new_() { return ptr(new async_op); } ... void run() {
while ( true) { { boost::recursive_mutex::scoped_lock lk(cs_);
if ( !started_) break; } boost::this_thread::sleep( boost::posix_
time::millisec(10)); operation cur;
));
}
{ boost::recursive_mutex::scoped_lock lk(cs_); if ( !ops_.empty()) {
cur = ops_[0]; ops_.erase( ops_.begin()); }}
if ( cur.service) cur.service->post(boost::bind(cur.completion, cur.op()
self_.reset(); }
};
run()方法就是后台线程;它仅仅观察是否有工作需要做,如果有,就一个一个地运行这些异步方法。在每个调用结束的时候,它会调用相关的完成处理方法。
为了测试,我们创建一个会被异步执行的compute_file-checksum方法
size_t checksum = 0;
boost::system::error_code compute_file_checksum(std::string file_name) {
HANDLE file = ::CreateFile(file_name.c_str(), GENERIC_READ, 0, 0,
OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
windows::random_access_handle h(service, file); long buff[1024]; checksum = 0; size_t bytes = 0, at = 0; boost::system::error_code ec;
while ( (bytes = read_at(h, at, buffer(buff), ec)) > 0) { at += bytes; bytes /= sizeof(long); for ( size_t i = 0; i < bytes; ++i)
checksum += buff[i];
}
return boost::system::error_code(0, boost::system::generic_ category());
} void on_checksum(std::string file_name, boost::system::error_code) {
std::cout << "checksum for " << file_name << "=" << checksum << std::endl;
} int main(int argc, char* argv[]) {
std::string fn = "readme.txt";
async_op::new_()->add( service, boost::bind(compute_file_ checksum,fn),
boost::bind(on_checksum,fn,_1));
service.run(); }
注意我展示给你的只是实现异步调用一个方法的一种可能。除了像我这样实现一个后台线程,你可以可是使用一个内部io_service实例,然后推送异步方法给这个实例调用。这个作为一个练习留个读者。
你也可以扩展这个类让其可以展示一个异步操作的进度(比如,使用百分比)。在这种情况下,你可以在主线程通过一个进度条来显示进度。