Boost.Asio c++ 网络编程翻译(24)

异步服务端中的多线程

我在第4章 客户端和服务端展示的异步服务端是单线程的,所有的事情都发生在main()中:

int main() {
       talk_to_client::ptr client = talk_to_client::new_();
       acc.async_accept(client->sock(), boost::bind(handle_
accept,client,_1));
    service.run();

}

异步的美妙之处就在于把单线程变为多线程的简单。你可以一直保持多线程知道你的并发客户端超过200。然后,你可以使用如下的代码片段把单线程变成100个线程:

boost::thread_group threads;
   void listen_thread() {
       service.run();
   }
   void start_listen(int thread_count) {
       for ( int i = 0; i < thread_count; ++i)
           threads.create_thread( listen_thread);
   }
   int main(int argc, char* argv[]) {
       talk_to_client::ptr client = talk_to_client::new_();
       acc.async_accept(client->sock(), boost::bind(handle_
   accept,client,_1));
       start_listen(100);
       threads.join_all();
   }

当然,一旦你选择了多线程,你需要考虑线程安全。尽管你在线程A中调用了async_*,它的完成流程可以在线程B中被调用(因为线程B也调用了service.run())。对于它本身而言这不是问题。只要你遵循逻辑流程,也就是从async_read()到on_read(),从on_read()到process_request,从process_request到async_write(),从async_write()到on_write(),从on_write()到async_read(),然后在你的talk_to_client类中没有被调用的公有方法,尽管不同的方法可以在不同的线程中被调用,他们还是会被有序地调用。从而不需要互斥量。

这也意味着对于一个客户端,只会有一个异步操作在等待。假如在有的情况,一个客户端我们有两个异步方法在等待,你就需要互斥量了。这是因为两个等待的操作可能正好在同一个时间完成,然后我们就会在两个不同的线程中间同时调用他们的完成处理函数。所以,这里需要线程安全,也就是需要使用互斥量。

在我们的异步服务端中,我们确实同时有两个等待的操作:

void do_read() {
       async_read(sock_, buffer(read_buffer_),MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
       post_check_ping();
   }
   void post_check_ping() {
       timer_.expires_from_now(boost::posix_time::millisec(5000));
       timer_.async_wait( MEM_FN(on_check_ping));
   }

当在做一个read操作时,我们会异步等待read操作完成和超时。所以,这里需要线程安全。

我的建议是,如果你准备使用多线程,从开始就保证你的类是线程安全的。通常这不会影响它的性能(当然你也可以在配置中设置开关)。同时,如果你准备使用多线程,从一个开始就使用。这样的话你能尽早地发现可能存在的问题。一旦你发现一个问题,你首先需要检查的事情就是:单线程运行的时候是否会发生?如果是,它很简单;只要调试它就可以了。否则,你可能忘了对一些方法加锁(互斥量)。

因为我们的例子需要是线程安全的,我已经把talk_to_client修改成使用互斥量的了。同时,我们也有一个客户端连接的列表,它也需要自己的互斥量因为我们有时需要访问它。

避免死锁和内存冲突不是很简单。下面是我需要对update_client_changed()方法进行修改的地方:

void update_clients_changed() {
       array copy;
       { boost::recursive_mutex::scoped_lock lk(clients_cs);
         copy = clients; }
       for( array::iterator b = copy.begin(), e = copy.end(); b != e;
   ++b)
           (*b)->set_clients_changed();

}

你需要避免的是同时有两个互斥量被锁定(这会导致死锁)。在我们的例子中,我们不想clients_cs和一个客户端的cs_互斥量同时被锁住

异步操作

Boost.Asio同样允许你异步地运行你任何一个方法。仅仅需要使用下面的代码片段:

void my_func() {
      ...
   }
   service.post(my_func);

这样就可以保证my_func在调用了service.run()的一个线程中间被调用。你同样可以异步地调用一个有完成处理handler的方法,方法的handler会在方法结束的时候通知你。伪代码如下:

void on_complete() {
      ...
   }
   void my_func() {

...

       service.post(on_complete);
   }

async_call(my_func);

这里没有async_call方法,因此,你需要自己创建。幸运的是,它不是很复杂,参考下面的代码片段:

struct async_op : boost::enable_shared_from_this<async_op>, ... {
       typedef boost::function<void(boost::system::error_code)>
   completion_func;
       typedef boost::function<boost::system::error_code ()> op_func;
       struct operation { ... };
       void start() {
           { boost::recursive_mutex::scoped_lock lk(cs_);
             if ( started_) return; started_ = true; }
           boost::thread t( boost::bind(&async_op::run,this));
       }
       void add(op_func op, completion_func completion, io_service
   &service) {
           self_ = shared_from_this();
           boost::recursive_mutex::scoped_lock lk(cs_);
           ops_.push_back( operation(service, op, completion));
           if ( !started_) start();

}

void stop() {

        boost::recursive_mutex::scoped_lock lk(cs_);
        started_ = false; ops_.clear();

} private:

    boost::recursive_mutex cs_;
    std::vector<operation> ops_; bool started_; ptr self_;
};

async_op方法创建了一个后台线程,这个线程会运行(run())你添加(add())到它里面的所有的异步操作。为了让事情简单一些,每个操作都包含下面的内容:

  • 一个异步调用的方法
  • 当第一个方法结束时被调用的一个完成处理handler
  • 会运行完成处理handler的io_service实例。这也是完成时通知你的地方。参考下面的代码:
  1. struct async_op : boost::enable_shared_from_this<async_op>
                           , private boost::noncopyable {
    
               struct operation {
                   operation(io_service & service, op_func op, completion_
    
           func completion)
                       : service(&service), op(op), completion(completion)
                       , work(new io_service::work(service))
    
                   {}
                   operation() : service(0) {}
                   io_service * service;
                   op_func op;
                   completion_func completion;
                   typedef boost::shared_ptr<io_service::work> work_ptr;
                   work_ptr work;
    

};

... };

它们被operation结构体包含在内部。注意当有一个操作在等待时,我们在操作的构造方法中构造一个io_service::work实例,从而保证直到我们完成我们的异步调用之前service.run()都不会结束(当io_service::work实例保持活动时,service.run()就会认为它有工作需要做)。参考下面的代码片段:

struct async_op : ... {
       typedef boost::shared_ptr<async_op> ptr;
       static ptr new_() { return ptr(new async_op); }
       ...
       void run() {
           while ( true) {
               { boost::recursive_mutex::scoped_lock lk(cs_);
                 if ( !started_) break; }
               boost::this_thread::sleep( boost::posix_
   time::millisec(10));
               operation cur;

));

}

{ boost::recursive_mutex::scoped_lock lk(cs_);
  if ( !ops_.empty()) {
      cur = ops_[0]; ops_.erase( ops_.begin());
  }}
if ( cur.service)
    cur.service->post(boost::bind(cur.completion, cur.op()
           self_.reset();
       }

};

run()方法就是后台线程;它仅仅观察是否有工作需要做,如果有,就一个一个地运行这些异步方法。在每个调用结束的时候,它会调用相关的完成处理方法。

为了测试,我们创建一个会被异步执行的compute_file-checksum方法

size_t checksum = 0;
   boost::system::error_code compute_file_checksum(std::string file_name)
   {
HANDLE file = ::CreateFile(file_name.c_str(), GENERIC_READ, 0, 0,
           OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
       windows::random_access_handle h(service, file);
       long buff[1024];
       checksum = 0;
       size_t bytes = 0, at = 0;
       boost::system::error_code ec;
       while ( (bytes = read_at(h, at, buffer(buff), ec)) > 0) {
           at += bytes; bytes /= sizeof(long);
           for ( size_t i = 0; i < bytes; ++i)
               checksum += buff[i];

}

       return boost::system::error_code(0, boost::system::generic_
   category());
   }
   void on_checksum(std::string file_name, boost::system::error_code) {
       std::cout << "checksum for " << file_name << "=" << checksum <<
   std::endl;
   }
   int main(int argc, char* argv[]) {
       std::string fn = "readme.txt";
       async_op::new_()->add( service, boost::bind(compute_file_
   checksum,fn),
                                       boost::bind(on_checksum,fn,_1));
       service.run();
   }

注意我展示给你的只是实现异步调用一个方法的一种可能。除了像我这样实现一个后台线程,你可以可是使用一个内部io_service实例,然后推送异步方法给这个实例调用。这个作为一个练习留个读者。

你也可以扩展这个类让其可以展示一个异步操作的进度(比如,使用百分比)。在这种情况下,你可以在主线程通过一个进度条来显示进度。

时间: 2024-11-18 07:46:43

Boost.Asio c++ 网络编程翻译(24)的相关文章

Boost.Asio c++ 网络编程翻译(1)

第一次翻译,希望大家多多指正 实战出精华 Boost.Asio C++ 网络编程 用具体的C++网络编程例子来提升你的技能 John Torjan 用具体的C++网络编程例子来提升你的技能 Copyright ? 2013 Packt Publishing 版权所有,除了在鉴定文章或者评论中进行简单引用,如果没有经过出版者事先的书面授权,该书的任何部分都不能被转载.存储在检索系统中.或者以任何形式和方式传阅. 在这本书准备发行之前,我们已经尽我们最大的努力去保证书中信息的准确性.但是,这本书中包

Boost.Asio c++ 网络编程翻译(20)

异步服务端 这个图表是相当复杂的:从Boost.Asio出来你可以看到4个箭头指向on_accept,on_read,on_write和on_check_ping.着也就意味着你永远不知道哪个异步调用是下一个完成的调用,但是你可以确定的是它是这4个操作中的一个. 现在,我们是异步的了:我们可以继续保持单线程.接受客户端连接是最简单的部分,如下所示: ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001)

Boost.Asio c++ 网络编程翻译(30)[完结]

PS:至此终于完成了Boost.Asio C++ network programming一书的翻译,这是我人生第一本完整翻译的书,从开始的磕磕绊绊,到最后小有心得,我收获很多.我将把这个系列的博客进行整理和校对,希望有兴趣的人可以帮我一起,来给大家提供更好更专业的阅读体验. 句柄追踪信息到文件 默认情况下,句柄的追踪信息被输出到标准错误流(相当于std::cerr).你想把输出重定向到其他地方的可能性是非常高的.对于控制台应用,输出和错误输出都被默认输出到相同的地方,也就是控制台.但是对于一个w

Boost.Asio c++ 网络编程翻译(14)

保持活动 假如,你需要做下面的操作: io_service service; ip::tcp::socket sock(service); char buff[512]; ... read(sock, buffer(buff)); 在这个例子中,sock和buff的存在时间都必须比read()调用的时间要长.也就是说,在调用read()返回之前,它们都必须有效.这就是你期望的:你传给一个方法的所有参数在参数内部都必须有效.当我们采用异步方式时,事情会变得越复杂. io_service servi

Boost.Asio c++ 网络编程翻译(3)

Boost.Asio入门 什么是Boost.Asio 简单来说,Boost.Asio是一个跨平台的.主要用于网络和其他一些底层输入/输出编程的C++库. 计算机网络的设计方式有很多种,但是Boost.Asio的的方式远远优于它们.它在2005年就被包含进Boost,然后被广大Bosot的用户测试并在很多项目中使用,比如Remobo(http://www.remobo.com),可以让你创建你自己的即时私有网络(IPN),libtorrent(http://www.rasterbar.com/pr

Boost.Asio c++ 网络编程翻译(11)

*_at方法 这些方法在一个流上面做随机存取操作.你来指定read和write操作从什么地方開始(offset): async_read_at(stream, offset, buffer [, completion], handler):这种方法在一个指定的流上从offset处開始运行一个异步的read操作,当操作结束时,他会调用handler. handler的格式为:void handler(const boost::system::error_code&  err, size_t byt

Boost.Asio c++ 网络编程翻译(26)

Boost.Asio-其他特性 这章我们讲了解一些Boost.Asio不那么为人所知的特性.标准的stream和streambuf对象有时候会更难用一些,但正如你所见.它们也有它们的益处.最后,你会看到姗姗来迟的Boost.Asio协程的入口,它能够让你的异步代码变的很易读.这是很惊人的一个特性. 标准stream和标准I/O buffer 读这一章节之前你须要对STL stream和STL streambuf对象有所了解. Boost.Asio在处理I/O操作时支持两种类型的buffer: b

Boost.Asio c++ 网络编程翻译(6)

io_service类 你应该已经发现大部分使用Boost.Asio编写的代码都会使用几个ios_service的实例.ios_service是这个库里面最重要的类:它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用完成处理程序. 如果你选择用同步的方式来创建你的应用,你不需要考虑我将在这一节向你展示的东西. 你可以用几种不同的方式来使用io_service.在下面的例子中,我们有3个异步操作,2个socket连接和一个计时器等待: 有一个io_service和一个处理线程

Boost.Asio c++ 网络编程翻译(7)

Boost.Asio基本原理 这一章涵盖了你使用Boost.Asio时必须知道的一些事情.我们也将深入研究比同步编程更机警.更有乐趣的异步编程. 网络API 这一部分包含了当使用Boost.Asio编写网络应用程序时你必须知道的事情. Boost.Asio命名空间 Boost.Asio的一切都需要包含在boost::asio的命名空间或者其子命名空间内. boost::asio:这是核心类和函数所在的地方.重要的类有io_service和streambuf.类似read, read_at, re