boost asio one client one thread

  总结了一个简单的boost asio的tcp服务器端与客户端通信流程.模型是一个client对应一个线程。先做一个记录,后续再对此进行优化。

  环境:VS2017  + Boost 1.67

  server:

  1 #include <stdio.h>
  2 #include <cstdlib>
  3 #include <iostream>
  4 #include <boost/thread.hpp>
  5 #include <boost/aligned_storage.hpp>
  6 #include <boost/array.hpp>
  7 #include <boost/bind.hpp>
  8 #include <boost/enable_shared_from_this.hpp>
  9 #include <boost/noncopyable.hpp>
 10 #include <boost/shared_ptr.hpp>
 11 #include <boost/asio.hpp>
 12
 13 using boost::asio::ip::tcp;
 14
 15 class handler_allocator
 16     : private boost::noncopyable
 17 {
 18 public:
 19     handler_allocator()
 20         : in_use_(false)
 21     {
 22     }
 23
 24     void* allocate(std::size_t size)
 25     {
 26         if (!in_use_ && size < storage_.size)
 27         {
 28             in_use_ = true;
 29             return storage_.address();
 30         }
 31         else
 32         {
 33             return ::operator new(size);
 34         }
 35     }
 36
 37     void deallocate(void* pointer)
 38     {
 39         if (pointer == storage_.address())
 40         {
 41             in_use_ = false;
 42         }
 43         else
 44         {
 45             ::operator delete(pointer);
 46         }
 47     }
 48
 49 private:
 50     // Storage space used for handler-based custom memory allocation.
 51     boost::aligned_storage<1024> storage_;
 52
 53     // Whether the handler-based custom allocation storage has been used.
 54     bool in_use_;
 55 };
 56
 57 template <typename Handler>
 58 class custom_alloc_handler
 59 {
 60 public:
 61     custom_alloc_handler(handler_allocator& a, Handler h)
 62         : allocator_(a),
 63         handler_(h)
 64     {
 65     }
 66
 67     template <typename Arg1>
 68     void operator()(Arg1 arg1)
 69     {
 70         handler_(arg1);
 71     }
 72
 73     template <typename Arg1, typename Arg2>
 74     void operator()(Arg1 arg1, Arg2 arg2)
 75     {
 76         handler_(arg1, arg2);
 77     }
 78
 79     friend void* asio_handler_allocate(std::size_t size,
 80         custom_alloc_handler<Handler>* this_handler)
 81     {
 82         return this_handler->allocator_.allocate(size);
 83     }
 84
 85     friend void asio_handler_deallocate(void* pointer, std::size_t /*size*/,
 86         custom_alloc_handler<Handler>* this_handler)
 87     {
 88         this_handler->allocator_.deallocate(pointer);
 89     }
 90
 91 private:
 92     handler_allocator & allocator_;
 93     Handler handler_;
 94 };
 95
 96 // Helper function to wrap a handler object to add custom allocation.
 97 template <typename Handler>
 98 inline custom_alloc_handler<Handler> make_custom_alloc_handler(
 99     handler_allocator& a, Handler h)
100 {
101     return custom_alloc_handler<Handler>(a, h);
102 }
103
104 /// A pool of io_service objects.
105 class io_service_pool
106     : private boost::noncopyable
107 {
108 public:
109     /// Construct the io_service pool.
110     explicit io_service_pool(std::size_t pool_size) : next_io_service_(0)
111     {
112         if (pool_size == 0)
113             throw std::runtime_error("io_service_pool size is 0");
114
115         // Give all the io_services work to do so that their run() functions will not
116         // exit until they are explicitly stopped.
117         for (std::size_t i = 0; i < pool_size; ++i)
118         {
119             io_service_ptr io_service(new boost::asio::io_service);
120             work_ptr work(new boost::asio::io_service::work(*io_service));
121             io_services_.push_back(io_service);
122             work_.push_back(work);
123         }
124     }
125
126     // Run all io_service objects in the pool.
127     void run()
128     {
129         // Create a pool of threads to run all of the io_services.
130         std::vector<boost::shared_ptr<boost::thread> > threads;
131         for (std::size_t i = 0; i < io_services_.size(); ++i)
132         {
133             boost::shared_ptr<boost::thread> thread(new boost::thread(
134                 boost::bind(&boost::asio::io_service::run, io_services_[i])));
135             threads.push_back(thread);
136         }
137
138         // Wait for all threads in the pool to exit.
139         for (std::size_t i = 0; i < threads.size(); ++i)
140             threads[i]->join();
141     }
142
143     // Stop all io_service objects in the pool.
144     void stop()
145     {
146         // Explicitly stop all io_services.
147         for (std::size_t i = 0; i < io_services_.size(); ++i)
148             io_services_[i]->stop();
149     }
150
151     // Get an io_service to use.
152     boost::asio::io_service& get_io_service()
153     {
154         // Use a round-robin scheme to choose the next io_service to use.
155         boost::asio::io_service& io_service = *io_services_[next_io_service_];
156         ++next_io_service_;
157         if (next_io_service_ == io_services_.size())
158             next_io_service_ = 0;
159         return io_service;
160     }
161
162 private:
163     typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr;
164     typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;
165
166     /// The pool of io_services.
167     std::vector<io_service_ptr> io_services_;
168
169     /// The work that keeps the io_services running.
170     std::vector<work_ptr> work_;
171
172     /// The next io_service to use for a connection.
173     std::size_t next_io_service_;
174 };
175
176 class session
177     : public boost::enable_shared_from_this<session>
178 {
179 public:
180     session(boost::asio::io_service& work_service
181         , boost::asio::io_service& io_service)
182         : socket_(io_service)
183         , io_work_service(work_service)
184     {
185     }
186
187     tcp::socket& socket()
188     {
189         return socket_;
190     }
191
192     void start()
193     {
194
195         boost::system::error_code error;
196         handle_write(error);
197
198         socket_.async_read_some(boost::asio::buffer(data_),
199             make_custom_alloc_handler(allocator_,
200                 boost::bind(&session::handle_read,
201                     shared_from_this(),
202                     boost::asio::placeholders::error,
203                     boost::asio::placeholders::bytes_transferred)));
204     }
205
206     void handle_read(const boost::system::error_code& error,
207         size_t bytes_transferred)
208     {
209         if (!error)
210         {
211             boost::shared_ptr<std::vector<char> > buf(new std::vector<char>);
212
213             buf->resize(bytes_transferred);
214             std::copy(data_.begin(), data_.begin() + bytes_transferred, buf->begin());
215             io_work_service.post(boost::bind(&session::on_receive
216                 , shared_from_this(), buf, bytes_transferred));
217
218             socket_.async_read_some(boost::asio::buffer(data_),
219                 make_custom_alloc_handler(allocator_,
220                     boost::bind(&session::handle_read,
221                         shared_from_this(),
222                         boost::asio::placeholders::error,
223                         boost::asio::placeholders::bytes_transferred)));
224         }
225     }
226
227     void handle_write(const boost::system::error_code& error)
228     {
229         if (!error)
230         {
231             char notice[] = "Welcome to Connect to Hiper Service";
232             size_t num;
233             try
234             {
235                 num = socket_.send(boost::asio::buffer(notice));
236             }
237             catch (std::exception &e)
238             {
239                 std::cout << "exception: " << e.what() << std::endl;
240             }
241             if (num>0)
242             {
243                 std::cout << "send : " << notice << std::endl;
244             }
245         }
246     }
247
248     void on_receive(boost::shared_ptr<std::vector<char> > buffers
249         , size_t bytes_transferred)
250     {
251         char* data_stream = &(*buffers->begin());
252         // in here finish the work.
253         std::cout << "receive :" << bytes_transferred << " bytes." <<
254             "message :" << data_stream << std::endl;
255
256         boost::system::error_code error;
257         handle_write(error);
258     }
259
260 private:
261     // The io_service used to finish the work.
262     boost::asio::io_service& io_work_service;
263
264     // The socket used to communicate with the client.
265     tcp::socket socket_;
266
267     // Buffer used to store data received from the client.
268     boost::array<char, 1024> data_;
269
270     // The allocator to use for handler-based custom memory allocation.
271     handler_allocator allocator_;
272 };
273
274 typedef boost::shared_ptr<session> session_ptr;
275
276 class server
277 {
278 public:
279     server(short port, std::size_t io_service_pool_size)
280         : io_service_pool_(io_service_pool_size)
281         , io_service_work_pool_(io_service_pool_size)
282         , acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port))
283     {
284         session_ptr new_session(new session(io_service_work_pool_.get_io_service()
285             , io_service_pool_.get_io_service()));
286         acceptor_.async_accept(new_session->socket(),
287             boost::bind(&server::handle_accept, this, new_session,
288                 boost::asio::placeholders::error));
289     }
290
291     void handle_accept(session_ptr new_session,
292         const boost::system::error_code& error)
293     {
294         if (!error)
295         {
296             new_session->start();
297             new_session.reset(new session(io_service_work_pool_.get_io_service()
298                 , io_service_pool_.get_io_service()));
299             acceptor_.async_accept(new_session->socket(),
300                 boost::bind(&server::handle_accept, this, new_session,
301                     boost::asio::placeholders::error));
302         }
303     }
304
305     void run()
306     {
307         io_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run
308             , &io_service_pool_)));
309         work_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run
310             , &io_service_work_pool_)));
311     }
312
313     void stop()
314     {
315         io_service_pool_.stop();
316         io_service_work_pool_.stop();
317
318         io_thread_->join();
319         work_thread_->join();
320     }
321
322 private:
323     boost::shared_ptr<boost::thread> io_thread_;
324     boost::shared_ptr<boost::thread> work_thread_;
325     io_service_pool io_service_pool_;
326     io_service_pool io_service_work_pool_;
327     tcp::acceptor acceptor_;
328 };
329
330
331 int main() {
332     server svr(8000, 10);
333     svr.run();
334
335     while (true)
336     {
337         boost::this_thread::sleep(boost::posix_time::seconds(5));
338     }
339
340
341     return 0;
342 }

client:

  1 #include <iostream>
  2 #include <boost/shared_ptr.hpp>
  3 #include <boost/asio.hpp>
  4 #include <boost/asio/placeholders.hpp>
  5 #include <boost/system/error_code.hpp>
  6 #include <boost/bind/bind.hpp>
  7 #include<boost/thread.hpp>
  8
  9 using namespace boost::asio;
 10 using namespace std;
 11
 12 class client
 13 {
 14     typedef client this_type;
 15     typedef ip::tcp::acceptor acceptor_type;
 16     typedef ip::tcp::endpoint endpoint_type;
 17     typedef ip::tcp::socket socket_type;
 18     typedef ip::address address_type;
 19     typedef boost::shared_ptr<socket_type> sock_ptr;
 20     typedef vector<char> buffer_type;
 21
 22 private:
 23     io_service m_io;
 24     buffer_type m_buf;
 25     endpoint_type m_ep;
 26 public:
 27     client() : m_buf(1024, 0), m_ep(address_type::from_string("127.0.0.1"), 8000)
 28     {
 29         start();
 30     }
 31
 32     void run()
 33     {
 34         m_io.run();
 35     }
 36
 37     void start()
 38     {
 39         sock_ptr sock(new socket_type(m_io));
 40         sock->async_connect(m_ep, boost::bind(&this_type::conn_handler, this, boost::asio::placeholders::error, sock));
 41     }
 42
 43     void conn_handler(const boost::system::error_code&ec, sock_ptr sock)
 44     {
 45         if (ec)
 46         {
 47             std::cout << ec.message() << std::endl;
 48             return;
 49         }
 50         cout << "Receive from " << sock->remote_endpoint().address() << ": ";
 51         cout << "Port:" << sock->remote_endpoint().port() << endl;
 52         sock->async_read_some(buffer(m_buf), boost::bind(&client::read_handler, this, boost::asio::placeholders::error, sock));
 53     }
 54
 55     void read_handler(const boost::system::error_code&ec, sock_ptr sock)
 56     {
 57         if (ec)
 58         {
 59             return;
 60         }
 61         sock->async_read_some(buffer(m_buf), boost::bind(&client::read_handler, this, boost::asio::placeholders::error, sock));
 62         cout << &m_buf[0] << endl;
 63
 64
 65         boost::this_thread::sleep(boost::posix_time::milliseconds(200));
 66
 67         char pmsg[] = "client 6666";
 68
 69         sock->async_write_some(buffer(pmsg), boost::bind(&this_type::write_handler, this, boost::asio::placeholders::error));
 70     }
 71
 72     void write_handler(const boost::system::error_code& ec)
 73     {
 74         if (ec)
 75         {
 76             std::cout << ec.message() << std::endl;
 77             return;
 78         }
 79
 80         std::cout << "send success!" << std::endl;
 81
 82     }
 83 };
 84
 85 int main()
 86 {
 87     try
 88     {
 89         cout << "Client start." << endl;
 90         client cl;
 91         cl.run();
 92     }
 93     catch (std::exception &e)
 94     {
 95         cout << e.what() << endl;
 96     }
 97
 98     while (true)
 99     {
100         boost::this_thread::sleep(boost::posix_time::seconds(5));
101     }
102
103     return 0;
104 }

原文地址:https://www.cnblogs.com/hbright/p/9432254.html

时间: 2024-11-08 23:59:07

boost asio one client one thread的相关文章

Boost.Asio基础(五) 异步编程初探

异步编程 本节深入讨论异步编程将遇到的若干问题.建议多次阅读,以便吃透这一节的内容,这一节是对整个boost.asio来说是非常重要的. 为什么需要异步 如前所述,通常同步编程要比异步编程更简单.同步编程下,我们很容易线性地对问题进行考量,函数A调用完,继续执行B,B执行完,继续执行C,以此类推,相对比较直观.而对于异步编程,假设有5个事件,我们很难知道它们具体的执行顺序,你甚至不知道,它到底会不会被执行. 虽然编写异步的程序,很难,但是依然需要使用这种方法.因为服务器程序需要同时并行的处理大量

Boost.Asio c++ 网络编程翻译(11)

*_at方法 这些方法在一个流上面做随机存取操作.你来指定read和write操作从什么地方開始(offset): async_read_at(stream, offset, buffer [, completion], handler):这种方法在一个指定的流上从offset处開始运行一个异步的read操作,当操作结束时,他会调用handler. handler的格式为:void handler(const boost::system::error_code&  err, size_t byt

Boost.Asio基本原理(CSDN也有Markdown了,好开森)

Boost.Asio基本原理 这一章涵盖了在使用Boost.Asio时必须知道的一些事情.我们也将深入研究比同步编程更复杂.更有乐趣的异步编程. 网络API 这一部分包含了当使用Boost.Asio编写网络应用程序时必须知道的事情. Boost.Asio命名空间 Boost.Asio的所有内容都包含在boost::asio命名空间或者其子命名空间内. * boost::asio:这是核心类和函数所在的地方.重要的类有io_service和streambuf.类似read, read_at, re

boost::asio的http client应用笔记

1 踩过的坑 1.1 io_service boost::asio::io_service::run()会一直运行到没有任务为止,如果中途调用stop(),则所有等待中的任务会立刻执行.解决方案是用run_one(),即 while (keep_running) io_service_.run_one(); keep_running是个bool值,要stop io_service的时候直接置false即可. 1.2 deadline_timer 在调用async_wait()后,无论调用dead

10 C++ Boost ASIO网路通信库 TCP/UDP,HTTP

  tcp 同步服务器,显示服务器端时间 tcp 同步服务器,提供多种选择 多线程的tcp 同步服务器 tcp 同步客户端 boost 域名地址解析 tcp异步服务器 tcp 异步客户端 UDP同步服务器 UDP同步客户端 UDP异步服务器 UDP异步客户端 HTTP同步客户端 HTTP异步客户端 同步实验: 异步实验 多线程异步实验 tcp 同步服务器,显示服务器端时间 [email protected]:~/boost$ cat main.cpp  #include <ctime> #in

boost.asio包装类st_asio_wrapper开发教程(一)

一:什么是st_asio_wrapper它是一个c/s网络编程框架,基于对boost.asio的包装(最低在boost-1.49.0上调试过),目的是快速的构建一个c/s系统: 二:st_asio_wrapper的特点效率高.跨平台.完全异步,当然这是从boost.asio继承而来:自动重连,数据透明传输,自动解决分包粘包问题(必须使用默认的打包解包器,这一特性表现得与udp一样):只支持tcp和udp协议: 三:st_asio_wrapper的大体结构st_asio_wrapper.h:编译器

boost::asio 连接管理11 如何关闭连接

在实际产品运行中,对连接管理有了更新的认识,这里分享一下. shared_ptr管理连接对象的生命周期 shared_ptr的引用计数器决定了连接对象的生命周期.这里我说的连接对象就是在我的前文:http://blog.csdn.net/csfreebird/article/details/8522620 中的Client对象: [cpp] view plaincopyprint? #include "core/connection.h" #include <vector>

boost.asio包装类st_asio_wrapper开发教程(2014.5.23更新)(一)-----转

一:什么是st_asio_wrapper它是一个c/s网络编程框架,基于对boost.asio的包装(最低在boost-1.49.0上调试过),目的是快速的构建一个c/s系统:二:st_asio_wrapper的特点效率高.跨平台.完全异步,当然这是从boost.asio继承而来:自动重连,数据透明传输,自动解决分包粘包问题(必须使用默认的打包解包器,这一特性表现得与udp一样):只支持tcp和udp协议:三:st_asio_wrapper的大体结构st_asio_wrapper.h:编译器版本

stream opencv mat (webcam)frame throught tcp by boost asio

客户端: ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84