无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。
使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行。有可能你会发现,io_service.dispatch的接口也和io_service.post一样,但不同的是:如果调用dispatch的线程本身正在执行该io_service的run,Handler会被直接调用,而不是先push到队列然后在io_service.run中执行(否则它的行为与post相同)。而在这个示例当中,显然我们需要把工作交到另一个线程去完成,这样才不会影响网络接收线程池的工作,以达到高效率地接收数据。这种设计与前面的netsever其实相同,这就是典型的Half Sync/Half Async。二者的区别就是netsever自己实现了工作队列,而不是直接使用io_service;而直接使用io_service的这种设计,实际上在win下是使用了iocp作为工作队列。
不过我更倾向于前一种设计,因为那样做,代码一切都在自己的掌握中,而io_service则是经过许多封装代码,并且本身设计只是用于处理网络完成事件的。
无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。
1 #include <stdio.h> 2 #include <cstdlib> 3 #include <iostream> 4 #include <boost/thread.hpp> 5 #include <boost/aligned_storage.hpp> 6 #include <boost/array.hpp> 7 #include <boost/bind.hpp> 8 #include <boost/enable_shared_from_this.hpp> 9 #include <boost/noncopyable.hpp> 10 #include <boost/shared_ptr.hpp> 11 #include <boost/asio.hpp> 12 13 using boost::asio::ip::tcp; 14 15 class handler_allocator 16 : private boost::noncopyable 17 { 18 public: 19 handler_allocator() 20 : in_use_(false) 21 { 22 } 23 24 void* allocate(std::size_t size) 25 { 26 if (!in_use_ && size < storage_.size) 27 { 28 in_use_ = true; 29 return storage_.address(); 30 } 31 else 32 { 33 return ::operator new(size); 34 } 35 } 36 37 void deallocate(void* pointer) 38 { 39 if (pointer == storage_.address()) 40 { 41 in_use_ = false; 42 } 43 else 44 { 45 ::operator delete(pointer); 46 } 47 } 48 49 private: 50 // Storage space used for handler-based custom memory allocation. 51 boost::aligned_storage<1024> storage_; 52 53 // Whether the handler-based custom allocation storage has been used. 54 bool in_use_; 55 }; 56 57 template <typename Handler> 58 class custom_alloc_handler 59 { 60 public: 61 custom_alloc_handler(handler_allocator& a, Handler h) 62 : allocator_(a), 63 handler_(h) 64 { 65 } 66 67 template <typename Arg1> 68 void operator()(Arg1 arg1) 69 { 70 handler_(arg1); 71 } 72 73 template <typename Arg1, typename Arg2> 74 void operator()(Arg1 arg1, Arg2 arg2) 75 { 76 handler_(arg1, arg2); 77 } 78 79 friend void* asio_handler_allocate(std::size_t size, 80 custom_alloc_handler<Handler>* this_handler) 81 { 82 return this_handler->allocator_.allocate(size); 83 } 84 85 friend void asio_handler_deallocate(void* pointer, std::size_t /*size*/, 86 custom_alloc_handler<Handler>* this_handler) 87 { 88 this_handler->allocator_.deallocate(pointer); 89 } 90 91 private: 92 handler_allocator& allocator_; 93 Handler handler_; 94 }; 95 96 // Helper function to wrap a handler object to add custom allocation. 
97 template <typename Handler> 98 inline custom_alloc_handler<Handler> make_custom_alloc_handler( 99 handler_allocator& a, Handler h) 100 { 101 return custom_alloc_handler<Handler>(a, h); 102 } 103 104 /// A pool of io_service objects. 105 class io_service_pool 106 : private boost::noncopyable 107 { 108 public: 109 /// Construct the io_service pool. 110 explicit io_service_pool(std::size_t pool_size) : next_io_service_(0) 111 { 112 if (pool_size == 0) 113 throw std::runtime_error("io_service_pool size is 0"); 114 115 // Give all the io_services work to do so that their run() functions will not 116 // exit until they are explicitly stopped. 117 for (std::size_t i = 0; i < pool_size; ++i) 118 { 119 io_service_ptr io_service(new boost::asio::io_service); 120 work_ptr work(new boost::asio::io_service::work(*io_service)); 121 io_services_.push_back(io_service); 122 work_.push_back(work); 123 } 124 } 125 126 // Run all io_service objects in the pool. 127 void run() 128 { 129 // Create a pool of threads to run all of the io_services. 130 std::vector<boost::shared_ptr<boost::thread> > threads; 131 for (std::size_t i = 0; i < io_services_.size(); ++i) 132 { 133 boost::shared_ptr<boost::thread> thread(new boost::thread( 134 boost::bind(&boost::asio::io_service::run, io_services_[i]))); 135 threads.push_back(thread); 136 } 137 138 // Wait for all threads in the pool to exit. 139 for (std::size_t i = 0; i < threads.size(); ++i) 140 threads[i]->join(); 141 } 142 143 // Stop all io_service objects in the pool. 144 void stop() 145 { 146 // Explicitly stop all io_services. 147 for (std::size_t i = 0; i < io_services_.size(); ++i) 148 io_services_[i]->stop(); 149 } 150 151 // Get an io_service to use. 152 boost::asio::io_service& get_io_service() 153 { 154 // Use a round-robin scheme to choose the next io_service to use. 
155 boost::asio::io_service& io_service = *io_services_[next_io_service_]; 156 ++next_io_service_; 157 if (next_io_service_ == io_services_.size()) 158 next_io_service_ = 0; 159 return io_service; 160 } 161 162 private: 163 typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr; 164 typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr; 165 166 /// The pool of io_services. 167 std::vector<io_service_ptr> io_services_; 168 169 /// The work that keeps the io_services running. 170 std::vector<work_ptr> work_; 171 172 /// The next io_service to use for a connection. 173 std::size_t next_io_service_; 174 }; 175 176 class session 177 : public boost::enable_shared_from_this<session> 178 { 179 public: 180 session(boost::asio::io_service& work_service 181 , boost::asio::io_service& io_service) 182 : socket_(io_service) 183 , io_work_service(work_service) 184 { 185 } 186 187 tcp::socket& socket() 188 { 189 return socket_; 190 } 191 192 void start() 193 { 194 socket_.async_read_some(boost::asio::buffer(data_), 195 make_custom_alloc_handler(allocator_, 196 boost::bind(&session::handle_read, 197 shared_from_this(), 198 boost::asio::placeholders::error, 199 boost::asio::placeholders::bytes_transferred))); 200 } 201 202 void handle_read(const boost::system::error_code& error, 203 size_t bytes_transferred) 204 { 205 if (!error) 206 { 207 boost::shared_ptr<std::vector<char> > buf(new std::vector<char>); 208 209 buf->resize(bytes_transferred); 210 std::copy(data_.begin(), data_.begin() + bytes_transferred, buf->begin()); 211 io_work_service.post(boost::bind(&session::on_receive 212 , shared_from_this(), buf, bytes_transferred)); 213 214 socket_.async_read_some(boost::asio::buffer(data_), 215 make_custom_alloc_handler(allocator_, 216 boost::bind(&session::handle_read, 217 shared_from_this(), 218 boost::asio::placeholders::error, 219 boost::asio::placeholders::bytes_transferred))); 220 } 221 } 222 223 void handle_write(const boost::system::error_code& error) 224 
{ 225 if (!error) 226 { 227 } 228 } 229 230 void on_receive(boost::shared_ptr<std::vector<char> > buffers 231 , size_t bytes_transferred) 232 { 233 char* data_stream = &(*buffers->begin()); 234 // in here finish the work. 235 std::cout << "receive :" << bytes_transferred << " bytes." << 236 "message :" << data_stream << std::endl; 237 } 238 239 private: 240 // The io_service used to finish the work. 241 boost::asio::io_service& io_work_service; 242 243 // The socket used to communicate with the client. 244 tcp::socket socket_; 245 246 // Buffer used to store data received from the client. 247 boost::array<char, 1024> data_; 248 249 // The allocator to use for handler-based custom memory allocation. 250 handler_allocator allocator_; 251 }; 252 253 typedef boost::shared_ptr<session> session_ptr; 254 255 class server 256 { 257 public: 258 server(short port, std::size_t io_service_pool_size) 259 : io_service_pool_(io_service_pool_size) 260 , io_service_work_pool_(io_service_pool_size) 261 , acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port)) 262 { 263 session_ptr new_session(new session(io_service_work_pool_.get_io_service() 264 , io_service_pool_.get_io_service())); 265 acceptor_.async_accept(new_session->socket(), 266 boost::bind(&server::handle_accept, this, new_session, 267 boost::asio::placeholders::error)); 268 } 269 270 void handle_accept(session_ptr new_session, 271 const boost::system::error_code& error) 272 { 273 if (!error) 274 { 275 new_session->start(); 276 new_session.reset(new session(io_service_work_pool_.get_io_service() 277 , io_service_pool_.get_io_service())); 278 acceptor_.async_accept(new_session->socket(), 279 boost::bind(&server::handle_accept, this, new_session, 280 boost::asio::placeholders::error)); 281 } 282 } 283 284 void run() 285 { 286 io_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run 287 , &io_service_pool_))); 288 work_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run 289 , 
&io_service_work_pool_))); 290 } 291 292 void stop() 293 { 294 io_service_pool_.stop(); 295 io_service_work_pool_.stop(); 296 297 io_thread_->join(); 298 work_thread_->join(); 299 } 300 301 private: 302 boost::shared_ptr<boost::thread> io_thread_; 303 boost::shared_ptr<boost::thread> work_thread_; 304 io_service_pool io_service_pool_; 305 io_service_pool io_service_work_pool_; 306 tcp::acceptor acceptor_; 307 }; 308 309 int main(int argc, char* argv[]) 310 { 311 try 312 { 313 if (argc != 2) 314 { 315 std::cerr << "Usage: server <port>/n"; 316 return 1; 317 } 318 319 using namespace std; // For atoi. 320 server s(atoi(argv[1]), 10); 321 322 s.run(); 323 324 getchar(); 325 326 s.stop(); 327 } 328 catch (std::exception& e) 329 { 330 std::cerr << "Exception: " << e.what() << "/n"; 331 } 332 333 return 0; 334 }
时间: 2024-10-11 10:40:13