boost库asio详解——io_service作为work pool

无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。
使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.dispatch的接口也和io_service.post一样,但不同的是它是直接调用而不是经过push到队列然后在io_services.run中执行,而在这个示例当中,显然我们需要把工作交到另一个线程去完成,这样才不会影响网络接收线程池的工作以达到高效率的接收数据,这种设计与前面的netsever其实相同,这就是典型的Half Sync/Half Async。二者的区别就是netsever自己实现了工作队列,而不是直接使用io_service,这种设计实际上在win下是使用了iocp作为工作队列。
不过我更倾向于前一种设计,因为那样做,代码一切都在自己的掌握中,而io_service则是经过许多封装代码,并且本身设计只是用于处理网络完成事件的。
无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。

  1 #include <stdio.h>
  2 #include <cstdlib>
  3 #include <iostream>
  4 #include <boost/thread.hpp>
  5 #include <boost/aligned_storage.hpp>
  6 #include <boost/array.hpp>
  7 #include <boost/bind.hpp>
  8 #include <boost/enable_shared_from_this.hpp>
  9 #include <boost/noncopyable.hpp>
 10 #include <boost/shared_ptr.hpp>
 11 #include <boost/asio.hpp>
 12
 13 using boost::asio::ip::tcp;
 14
 15 class handler_allocator
 16     : private boost::noncopyable
 17 {
 18 public:
 19     handler_allocator()
 20         : in_use_(false)
 21     {
 22     }
 23
 24     void* allocate(std::size_t size)
 25     {
 26         if (!in_use_ && size < storage_.size)
 27         {
 28             in_use_ = true;
 29             return storage_.address();
 30         }
 31         else
 32         {
 33             return ::operator new(size);
 34         }
 35     }
 36
 37     void deallocate(void* pointer)
 38     {
 39         if (pointer == storage_.address())
 40         {
 41             in_use_ = false;
 42         }
 43         else
 44         {
 45             ::operator delete(pointer);
 46         }
 47     }
 48
 49 private:
 50     // Storage space used for handler-based custom memory allocation.
 51     boost::aligned_storage<1024> storage_;
 52
 53     // Whether the handler-based custom allocation storage has been used.
 54     bool in_use_;
 55 };
 56
 57 template <typename Handler>
 58 class custom_alloc_handler
 59 {
 60 public:
 61     custom_alloc_handler(handler_allocator& a, Handler h)
 62         : allocator_(a),
 63         handler_(h)
 64     {
 65     }
 66
 67     template <typename Arg1>
 68     void operator()(Arg1 arg1)
 69     {
 70         handler_(arg1);
 71     }
 72
 73     template <typename Arg1, typename Arg2>
 74     void operator()(Arg1 arg1, Arg2 arg2)
 75     {
 76         handler_(arg1, arg2);
 77     }
 78
 79     friend void* asio_handler_allocate(std::size_t size,
 80         custom_alloc_handler<Handler>* this_handler)
 81     {
 82         return this_handler->allocator_.allocate(size);
 83     }
 84
 85     friend void asio_handler_deallocate(void* pointer, std::size_t /*size*/,
 86         custom_alloc_handler<Handler>* this_handler)
 87     {
 88         this_handler->allocator_.deallocate(pointer);
 89     }
 90
 91 private:
 92     handler_allocator& allocator_;
 93     Handler handler_;
 94 };
 95
 96 // Helper function to wrap a handler object to add custom allocation.
 97 template <typename Handler>
 98 inline custom_alloc_handler<Handler> make_custom_alloc_handler(
 99     handler_allocator& a, Handler h)
100 {
101     return custom_alloc_handler<Handler>(a, h);
102 }
103
104 /// A pool of io_service objects.
105 class io_service_pool
106     : private boost::noncopyable
107 {
108 public:
109     /// Construct the io_service pool.
110     explicit io_service_pool(std::size_t pool_size) : next_io_service_(0)
111     {
112         if (pool_size == 0)
113             throw std::runtime_error("io_service_pool size is 0");
114
115         // Give all the io_services work to do so that their run() functions will not
116         // exit until they are explicitly stopped.
117         for (std::size_t i = 0; i < pool_size; ++i)
118         {
119             io_service_ptr io_service(new boost::asio::io_service);
120             work_ptr work(new boost::asio::io_service::work(*io_service));
121             io_services_.push_back(io_service);
122             work_.push_back(work);
123         }
124     }
125
126     // Run all io_service objects in the pool.
127     void run()
128     {
129         // Create a pool of threads to run all of the io_services.
130         std::vector<boost::shared_ptr<boost::thread> > threads;
131         for (std::size_t i = 0; i < io_services_.size(); ++i)
132         {
133             boost::shared_ptr<boost::thread> thread(new boost::thread(
134                 boost::bind(&boost::asio::io_service::run, io_services_[i])));
135             threads.push_back(thread);
136         }
137
138         // Wait for all threads in the pool to exit.
139         for (std::size_t i = 0; i < threads.size(); ++i)
140             threads[i]->join();
141     }
142
143     // Stop all io_service objects in the pool.
144     void stop()
145     {
146         // Explicitly stop all io_services.
147         for (std::size_t i = 0; i < io_services_.size(); ++i)
148             io_services_[i]->stop();
149     }
150
151     // Get an io_service to use.
152     boost::asio::io_service& get_io_service()
153     {
154         // Use a round-robin scheme to choose the next io_service to use.
155         boost::asio::io_service& io_service = *io_services_[next_io_service_];
156         ++next_io_service_;
157         if (next_io_service_ == io_services_.size())
158             next_io_service_ = 0;
159         return io_service;
160     }
161
162 private:
163     typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr;
164     typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;
165
166     /// The pool of io_services.
167     std::vector<io_service_ptr> io_services_;
168
169     /// The work that keeps the io_services running.
170     std::vector<work_ptr> work_;
171
172     /// The next io_service to use for a connection.
173     std::size_t next_io_service_;
174 };
175
176 class session
177     : public boost::enable_shared_from_this<session>
178 {
179 public:
180     session(boost::asio::io_service& work_service
181         , boost::asio::io_service& io_service)
182         : socket_(io_service)
183         , io_work_service(work_service)
184     {
185     }
186
187     tcp::socket& socket()
188     {
189         return socket_;
190     }
191
192     void start()
193     {
194         socket_.async_read_some(boost::asio::buffer(data_),
195             make_custom_alloc_handler(allocator_,
196             boost::bind(&session::handle_read,
197             shared_from_this(),
198             boost::asio::placeholders::error,
199             boost::asio::placeholders::bytes_transferred)));
200     }
201
202     void handle_read(const boost::system::error_code& error,
203         size_t bytes_transferred)
204     {
205         if (!error)
206         {
207             boost::shared_ptr<std::vector<char> > buf(new std::vector<char>);
208
209             buf->resize(bytes_transferred);
210             std::copy(data_.begin(), data_.begin() + bytes_transferred, buf->begin());
211             io_work_service.post(boost::bind(&session::on_receive
212                 , shared_from_this(), buf, bytes_transferred));
213
214             socket_.async_read_some(boost::asio::buffer(data_),
215                 make_custom_alloc_handler(allocator_,
216                 boost::bind(&session::handle_read,
217                 shared_from_this(),
218                 boost::asio::placeholders::error,
219                 boost::asio::placeholders::bytes_transferred)));
220         }
221     }
222
223     void handle_write(const boost::system::error_code& error)
224     {
225         if (!error)
226         {
227         }
228     }
229
230     void on_receive(boost::shared_ptr<std::vector<char> > buffers
231         , size_t bytes_transferred)
232     {
233         char* data_stream = &(*buffers->begin());
234         // in here finish the work.
235         std::cout << "receive :" << bytes_transferred << " bytes." <<
236             "message :" << data_stream << std::endl;
237     }
238
239 private:
240     // The io_service used to finish the work.
241     boost::asio::io_service& io_work_service;
242
243     // The socket used to communicate with the client.
244     tcp::socket socket_;
245
246     // Buffer used to store data received from the client.
247     boost::array<char, 1024> data_;
248
249     // The allocator to use for handler-based custom memory allocation.
250     handler_allocator allocator_;
251 };
252
253 typedef boost::shared_ptr<session> session_ptr;
254
255 class server
256 {
257 public:
258     server(short port, std::size_t io_service_pool_size)
259         : io_service_pool_(io_service_pool_size)
260         , io_service_work_pool_(io_service_pool_size)
261         , acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port))
262     {
263         session_ptr new_session(new session(io_service_work_pool_.get_io_service()
264             , io_service_pool_.get_io_service()));
265         acceptor_.async_accept(new_session->socket(),
266             boost::bind(&server::handle_accept, this, new_session,
267             boost::asio::placeholders::error));
268     }
269
270     void handle_accept(session_ptr new_session,
271         const boost::system::error_code& error)
272     {
273         if (!error)
274         {
275             new_session->start();
276             new_session.reset(new session(io_service_work_pool_.get_io_service()
277                 , io_service_pool_.get_io_service()));
278             acceptor_.async_accept(new_session->socket(),
279                 boost::bind(&server::handle_accept, this, new_session,
280                 boost::asio::placeholders::error));
281         }
282     }
283
284     void run()
285     {
286         io_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run
287             , &io_service_pool_)));
288         work_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run
289             , &io_service_work_pool_)));
290     }
291
292     void stop()
293     {
294         io_service_pool_.stop();
295         io_service_work_pool_.stop();
296
297         io_thread_->join();
298         work_thread_->join();
299     }
300
301 private:
302     boost::shared_ptr<boost::thread> io_thread_;
303     boost::shared_ptr<boost::thread> work_thread_;
304     io_service_pool io_service_pool_;
305     io_service_pool io_service_work_pool_;
306     tcp::acceptor acceptor_;
307 };
308
309 int main(int argc, char* argv[])
310 {
311     try
312     {
313         if (argc != 2)
314         {
315             std::cerr << "Usage: server <port>/n";
316             return 1;
317         }
318
319         using namespace std; // For atoi.
320         server s(atoi(argv[1]), 10);
321
322         s.run();
323
324         getchar();
325
326         s.stop();
327     }
328     catch (std::exception& e)
329     {
330         std::cerr << "Exception: " << e.what() << "/n";
331     }
332
333     return 0;
334 } 
时间: 2024-10-11 10:40:13

boost库asio详解——io_service作为work pool的相关文章

boost库asio详解1——strand与io_service区别

namespace { // strand提供串行执行, 能够保证线程安全, 同时被post或dispatch的方法, 不会被并发的执行. // io_service不能保证线程安全 boost::asio::io_service m_service; boost::asio::strand m_strand(m_service); boost::mutex m_mutex; void print(int id) { // boost::mutex::scoped_lock lock(m_mut

【Boost】boost库asio详解5——resolver与endpoint使用说明

tcp::resolver一般和tcp::resolver::query结合用,通过query这个词顾名思义就知道它是用来查询socket的相应信息,一般而言我们关心socket的东东有address,port而已,通过tcp::resolver很容易实现设置和查询,它通过query把字符串格式的ip如192.168.0.200或主机名http://localhost,端口"8080"等转化成socket内部表示格式,这样我们应用的时候可以直接使用字符串的形式,而且不用再担心socke

Python学习教程(Python学习路线):Pandas库基础分析-详解时间序列的处理

Python学习教程(Python学习路线):Pandas库基础分析-详解时间序列的处理 在使用Python进行数据分析时,经常会遇到时间日期格式处理和转换,特别是分析和挖掘与时间相关的数据,比如量化交易就是从历史数据中寻找股价的变化规律.Python中自带的处理时间的模块有datetime,NumPy库也提供了相应的方法,Pandas作为Python环境下的数据分析库,更是提供了强大的日期数据处理的功能,是处理时间序列的利器. 1.生成日期序列 主要提供pd.data_range()和pd.p

Boost::bind使用详解

1.Boost::bind 在STL中,我们经常需要使用bind1st,bind2st函数绑定器和fun_ptr,mem_fun等函数适配器,这些函数绑定器和函数适配器使用起来比较麻烦,需要根据是全局函数还是类的成员函数,是一个参数还是多个参数等做出不同的选择,而且有些情况使用STL提供的不能满足要求,所以如果可以我们最好使用boost提供的bind,它提供了统一的接口,提供了更多的支持,比如说它增加了shared_ptr,虚函数,类成员的绑定. 2.bind的工作原理 bind并不是一个单独的

Boost::split用法详解

工程中使用boost库:(设定vs2010环境)在Library files加上 D:\boost\boost_1_46_0\bin\vc10\lib在Include files加上 D:\boost\boost_1_46_0 1 // boostTest.cpp : 定义控制台应用程序的入口点. 2 #include "stdafx.h" 3 4 #include <iostream> 5 #include <boost/format.hpp> 6 #incl

[转] boost::function用法详解

http://blog.csdn.net/benny5609/article/details/2324474 要开始使用 Boost.Function, 就要包含头文件 "boost/function.hpp", 或者某个带数字的版本,从 "boost/function/function0.hpp" 到 "boost/function/function10.hpp". 如果你知道你想保存在 function 中的函数的参数数量,这样做可以让编译器

boost::function用法详解

要开始使用 Boost.Function, 就要包含头文件 "boost/function.hpp", 或者某个带数字的版本,从 "boost/function/function0.hpp" 到 "boost/function/function10.hpp". 如果你知道你想保存在 function 中的函数的参数数量,这样做可以让编译器仅包含需要的头文件.如果包含 "boost/function.hpp", 那么就会把其它的

标准IO库(详解)

文章转自:https://www.cnblogs.com/kingcat/archive/2012/05/09/2491847.html 自己在学习中,对此原文的基础之上进行补充. 什么是缓冲区 缓冲区又称为缓存,它是内存空间的一部分.也就是说,在内存空间中预留了一定的存储空间,这些存储空间用来缓冲输入或输出的数据,这部分预留的空间就叫做缓冲区. 缓冲区根据其对应的是输入设备还是输出设备,分为输入缓冲区和输出缓冲区. 为什么要引入缓冲区 比如我们从磁盘里取信息,我们先把读出的数据放在缓冲区,计算

Python-第三方库requests详解

Requests 是用Python语言编写,基于 urllib,采用 Apache2 Licensed 开源协议的 HTTP 库.它比 urllib 更加方便,可以节约我们大量的工作,完全满足 HTTP 测试需求.Requests 的哲学是以 PEP 20 的习语为中心开发的,所以它比 urllib 更加 Pythoner.更重要的一点是它支持 Python3 哦! Beautiful is better than ugly.(美丽优于丑陋) Explicit is better than im