Boost.ASIO简要分析-4 多线程

4. 多线程

一般情况下,服务端开启一条线程做io_service::run()工作就足够了。但是,有些情况下可能会变得很糟糕。

从之前的分析,我们知道异步操作的一个关键步骤就是io_service回调我们注册的handler。现在假设客户端与服务端建立了四个socket连接,相应的I/O对象分别为socket1, socket2, socket3, socket4,并且假设io_service现在正在处理socket1注册的handler。如果这个handler处理的过程很长,那么在这期间socket2,socket3,socket4注册的handler会一直得不到执行,造成不良的使用体验。

针对这个问题,解决之道只有采用多线程的方法。多线程的用法很简单,我们只要把线程函数boost::asio::io_service::run和io_service指针绑定好传给boost::thread类就好了。如下所示:

boost::thread t(boost::bind(&boost::asio::io_service::run, &io));

t.join();

但是,引入多线程又会引入多线程同步的问题,如果这个问题没解决好,死机就是家常便饭了。幸好,asio给我们提供了strand这个类(当然,也可以使用mutex,但是使用strand会使代码更加优雅)。下面,简单介绍下strand这个类。

1) 用法

用法很简单,首先定义下变量。

boost::asio::io_service::strand strand_(&io); //注意io_service对象地址作为他的参数。

然后在注册回调函数时,在外面套上一层strand_.wrap()就好了,如下所示:

timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
这样的话,这两个异步操作的回调函数肯定是被顺序执行的。

2) 源码分析

在分析源码之前,我们看下一个完整的调用堆栈:

我们把不采用strand时的调用堆栈图拿来比对下

不知道有没有被吓一跳,采用strand方式竟然会多出这么多层调用,让回调的路途看上去如此漫长。

好了,废话不多说,我们strand的那张调用堆栈图中寻找strand的蛛丝马迹。回调的路上这个函数boost::asio::io_service::strand::dispatch,顿时眼前一亮,让我想起strand类中的dispatch函数。眼尖的朋友可能发现调用堆栈上出现了两次boost::asio::io_service::strand::dispatch,不要奇怪,这两次的handler是不一样的,如下图。

这是先被调用的

这是后被调用的

下面贴出io_service::strand类:

class io_service::strand
{
public:
  explicit strand(boost::asio::io_service& io_service)
    : service_(boost::asio::use_service<
        boost::asio::detail::strand_service>(io_service))
  {
    service_.construct(impl_);
  }

  ~strand()
  {
  }

  boost::asio::io_service& get_io_service()
  {
    return service_.get_io_service();
  }

/* 这就是第一个出现在回调路上的函数。
   这个函数的作用是让strand执行给定的handler。
   还有一点要说的就是,如果当前线程调用了service::run,那么该线程可以直接调用handler。这也是和post的区别之一。我们可以假想下如果回调的路上不是strand::dispatch,而是strand::post,那么我们的线程栈上也不一定会这么长了。*/
  template <typename CompletionHandler>
  BOOST_ASIO_INITFN_RESULT_TYPE(CompletionHandler, void ())
  dispatch(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler)
  {
    // If you get an error on the following line it means that your handler does
    // not meet the documented type requirements for a CompletionHandler.
    BOOST_ASIO_COMPLETION_HANDLER_CHECK(CompletionHandler, handler) type_check;

    detail::async_result_init<
      CompletionHandler, void ()> init(
        BOOST_ASIO_MOVE_CAST(CompletionHandler)(handler));

// 注意此处是service_是boost::asio::detail::strand_service类型的哦;
// strand_service里面才是真正控制多线程安全的地方。
    service_.dispatch(impl_, init.handler);

    return init.result.get();
  }

// 和dispatch都有投递任务的作用。只是post会马上返回,handler会被某个调用service::run的线程执行。
  template <typename CompletionHandler>
  BOOST_ASIO_INITFN_RESULT_TYPE(CompletionHandler, void ())
  post(BOOST_ASIO_MOVE_ARG(CompletionHandler) handler)
  {
    // If you get an error on the following line it means that your handler does
    // not meet the documented type requirements for a CompletionHandler.
    BOOST_ASIO_COMPLETION_HANDLER_CHECK(CompletionHandler, handler) type_check;

    detail::async_result_init<
      CompletionHandler, void ()> init(
        BOOST_ASIO_MOVE_CAST(CompletionHandler)(handler));

    service_.post(impl_, init.handler);

    return init.result.get();
  }

// 这个wrap函数就是上面说起的那个打包函数。有了它,io_service会先调用strand然后再调用handler,相当于在回调的路上设置了一道关卡,通过strand保证线程安全性。
  template <typename Handler>
#if defined(GENERATING_DOCUMENTATION)
  unspecified
#else
  detail::wrapped_handler<strand, Handler, detail::is_continuation_if_running>
#endif
  wrap(Handler handler)
  {
    return detail::wrapped_handler<io_service::strand, Handler,
        detail::is_continuation_if_running>(*this, handler);
  }

  bool running_in_this_thread() const
  {
    return service_.running_in_this_thread(impl_);
  }

private:
  boost::asio::detail::strand_service& service_;
  boost::asio::detail::strand_service::implementation_type impl_;
};

光这个类是看不出具体实现细节的,相要了解更多实现细节需要分析strand_service这个类。

具体的多线程控制方面,我们可以看下strand_service::strand_impl这个嵌套类

// The underlying implementation of a strand.

class strand_impl

    : public operation

  {

public:

strand_impl();

private:

// Only this service will have access to the internal values.

friend class strand_service;

friend struct on_do_complete_exit;

friend struct on_dispatch_exit;

// 这就是那个用来多线程控制的互斥锁

// Mutex to protect access to internal data.

boost::asio::detail::mutex mutex_;

// 用来表示strand是否被某个handler“锁住”的变量

// Indicates whether the strand is currently "locked" by a handler. This

// means that there is a handler upcall in progress, or that the strand

// itself has been scheduled in order to invoke some pending handlers.

bool locked_;

// 哦,等待处理的排队队列

// The handlers that are waiting on the strand but should not be run until

// after the next time the strand is scheduled. This queue must only be

// modified while the mutex is locked.

op_queue<operation> waiting_queue_;

// 已经获取锁并准备运行的handler

// The handlers that are ready to be run. Logically speaking, these are the

// handlers that hold the strand‘s lock. The ready queue is only modified

// from within the strand and so may be accessed without locking the mutex.

op_queue<operation> ready_queue_;

  };

可以看出mutex_、locked_、waiting_queue_、ready_queue_这四个变量保证了线程安全性。具体实现方法,可以自己去调试下,这里就不细细分析了(其实是肚子饿了,要去吃饭了^^)。

PS: 异步调用+多线程肯定会大大增加你的调试复杂度,加上日志记录是势在必行的事情。这里向大家推荐下简单易用的glog:https://github.com/google/glog

时间: 2024-07-29 03:51:11

Boost.ASIO简要分析-4 多线程的相关文章

Boost.ASIO简要分析-1 初窥

Boost.Asio是一个主要用于网络及底层I/O编程的跨平台C++库. 1. 初窥 Boost.Asio支持对I/O对象进行同步及异步操作. 1.1 同步操作 同步操作的事件顺序如下图所示: 1) 调用者调用I/O对象的connect函数开始连接操作,socket.connect(server_endpoint): 2) I/O对象将连接请求传递给io_service: 3) io_service调用操作系统函数: 4) 操作系统返回结果给io_service: 5) io_service将结

Boost.ASIO简要分析-5 多io_service

5. 多io_service 前面那篇讲到了多线程的用法.这篇讲一下多io_service的用法,大家可参考下官方提供的HTTP Server 2(an io_service-per-CPU)这个例子. 官方提供的例子中,使用方法很简单,建立一个io_service_pool,然后对每一个io_service开一个线程去让它跑起来(毕竟,io_service::run这个函数在有任务的时候会一直工作的,只有开多个线程才做到不影响别的io_service).当然,我们也可以为每个io_servic

&lt;转&gt;浅谈 Boost.Asio 的多线程模型

本文转自:http://senlinzhan.github.io/2017/09/17/boost-asio/ Boost.Asio 有两种支持多线程的方式,第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_service,并且每个线程都调用各自的io_service的run()方法. 另一种支持多线程的方式:全局只分配一个io_service,并且让这个io_service在多个线程之间共享,每个线程都调用全局的io_service的run()方法. 每个线程一个 I/O Serv

Boost::asio io_service 实现分析

io_service的作用 io_servie 实现了一个任务队列,这里的任务就是void(void)的函数.Io_servie最常用的两个接口是post和run,post向任务队列中投递任务,run是执行队列中的任务,直到全部执行完毕,并且run可以被N个线程调用.Io_service是完全线程安全的队列. Io_servie的接口 提供的接口有run.run_one.poll.poll_one.stop.reset.dispatch.post,最常用的是run.post.stop Io_se

网络库crash以及boost asio strand dispath分析

最近在做服务器的稳定性的相关测试,服务器的网络底层使用的是boost asio,然后自己做的二次封装以更好的满足需求. 服务器昨天晚上发现crash了一次,之前测试了将近半个多月,有一次是莫名的退出了,不过由于是新的测试服,忘记将ulimit -c进行修改了,所以没有coredump,这次又发生了. coredump如下: #0 0x0000000000000091 in ?? () #1 0x0000000000459729 in ClientHandler::HandleConnect(cp

boost.asio源码剖析(三) ---- 流程分析

* 常见流程分析之一(Tcp异步连接) 我们用一个简单的demo分析Tcp异步连接的流程: 1 #include <iostream> 2 #include <boost/asio.hpp> 3 4 // 异步连接回调函数 5 void on_connect(boost::system::error_code ec) 6 { 7 if (ec) // 连接失败, 输出错误码 8 std::cout << "async connect error:"

如何在多线程leader-follower模式下正确的使用boost::asio。

#include <assert.h> #include <signal.h> #include <unistd.h> #include <iostream> #include <string> #include <deque> #include <set> #include "boost/asio.hpp" #include "boost/thread.hpp" #include

Boost Asio初探

一.简介 Boost Asio ( asynchronous input and output)关注数据的异步输入输出.Boost Asio 库提供了平台无关性的异步数据处理能力(当然它也支持同步数据处理).一般的数据传输过程需要通过函数的返回值来判断数据传输是否成功,而Boost Asio将数据传输分为两个独立的步骤: 采用异步任务的方式开始数据传输. 将传输结果通知调用端 与传统方式相比,它的优势在于程序在数据传输期间不会被阻塞. 二.I/O Services 与 I/O Objects 应

boost::asio::detail::epoll_reactor::start_op的崩溃问题

在对程序进行压力测试时发现,程序有概率会在boost::asio::detail::epoll_reactor::start_op上面奔溃 尤其是在并发数较高的情况下. 查看boost中epoll_reactor.ipp中的源码,对奔溃处的逻辑进行分析后发现其基本逻辑如下: 对于每个socket链接,在程序调用停止函数对socket对象进行关闭或者销毁时 会在reactive_socket_service_base::destroy/close调用epoll_reactor::deregiste