一 nodeos工作线程
nodeso节点的工作线程包括:一个主线程,一个信号处理线程和两个线程池。
- 主线程:main函数启动线程,该线程执行完程序初始化工作后,会调用app().io_service.run(), 启动boost::asio::io_service的异步io服务,通过异步io方式完成节点块生产,交易处理等主要业务工作。
- 信号处理线程:子线程,通过异步io服务,接收系统信号并处理。
- 线程池,线程池启动的工作线程数可通过启动参数配置。
- controller线程池: 异步执行块block_state创建,块中交易验证时的交易解签名计算。
- 生产者插件线程池:负责异步执行交易解签名计算。
? 采用默认配置(每个线程池2个工作线程),nodeos节点总线程数是6个,通过pstree命令可查看:
? nodeos有一个主线程pid=32385,该主线程有5个子线程,32386~32390。
二 主线程
? main函数执行线程:main函数最后调用app().exec(),启动io_service服务。
app()是application实例,application中定义了io_service对象io_serv:
class application
{...
std::shared_ptr<boost::asio::io_service> io_serv;
}
application::application():my(new application_impl()){
io_serv = std::make_shared<boost::asio::io_service>();
}
boost::asio::io_service& get_io_service() { return *io_serv; }
//启动io服务
void application::exec() {
io_serv->run();
shutdown();
}
void application::quit() {
my->_is_quiting = true;
io_serv->stop();
}
- 其它插件通过get_io_service()函数,获取io_serv,进行异步io投递。
- 程序退出时,会调用quit()函数,结束io服务。
? nodeos节点交易处理,出块,块验证等主要业务操作都是在该线程执行的,因为eos中交易不支持并行处理,所以application中的io_serv是不允许在除主线程之外的其它线程中重复执行io_serv.run()操作的。
? 看到application::exec()中的代码,有些人可能会有疑问:asio::io_service监听的io端口都完成(没有待监听的io端口)时,io_serv->run()就会退出。这里io_serv->run()只调用了一次,没有循环调用,不会退出么?
? nodeos投递到application::io_serv的io处理handle函数,会重复投递该io端口。所以在handle处理函数完成时,io_serv中总会有待完成io存在,io_serv->run()就不会退出,下面以producer_plugin中的_timer为例,看一下这个投递过程:
producer_plugin::producer_plugin()
: my(new producer_plugin_impl(app().get_io_service())){
my->_self = this;
};
class producer_plugin_impl {
producer_plugin_impl(boost::asio::io_service& io):_timer(io),
_transaction_ack_channel(app().get_channel<compat::channels::transaction_ack>()){}
boost::asio::deadline_timer _timer;
};
? producer_plugin()在构造时传入app().get_io_service()构造了producer_plugin_impl:: _timer; 在节点的生产循环函数中可以看到, _timer的handle函数中会调用schedule_production_loop()函数,而在该函数中,又会调用 _timer.asyncwait()重复投递 _timer到ioservice中:
三 信号处理线程
- 创建io_service对象sig_io_serv;
- 将SIGINT信号投递到io_service对象,并绑定信号处理函数,当系统发送SIGINT信号时,会触发该处理函数。
- 创建sig_thread信号处理线程,在该线程中调用sig_io_serv->run(),等待信号触发,调用相应处理函数。
- 信号处理线程只处理SIGINT,SIGTERM,SIGPIPE这三个系统信号,一旦收到信号,会调用退出操作,使nodeos退出。
四 controller线程池
4.1 定义及创建
struct controller_impl {
...
boost::asio::thread_pool thread_pool;
}
controller_impl( const controller::config& cfg, controller& s ):self(s),
chain_id( cfg.genesis.compute_chain_id() ),
read_mode( cfg.read_mode ),
...
thread_pool( cfg.thread_pool_size )
{...}
- controller线程池定义在controller_impl中;
- 根据配置参数中设定的thread_pool_size创建相应数量的工作线程。
4.2 异步任务
? 执行块相关操作时,较耗时且与排序无关的动作都会投递到controller线程池执行。eos目前投递到该线程池执行的操作有两个:
? 1 块交易验证时的解签名操作;
? 2 块状态block_state数据创建操作(两轮共识计算都在这里完成)。
4.2.1 解签名操作
? 节点收到块,会调用apply_block()函数执行块中交易,进行块验证。期间会投递交易解签名计算到controller线程池:
? 解签名投递函数create_signing_keys_futrue():
- 在线程池thread_pool中异步执行解签名函数,异步解签名的执行结果,会放入交易的signing_keys_future中。
- 真正解签名算法是trn.get_signature_keys(),解出的公钥放到recovered_pub_keys中。
4.2.2 block_state创建
五 生产者线程池
5.1 定义及创建
? 生产者线程池定义在producer_plugin_impl中,执行插件初始化函数plugin_initialize()时会根据配置参数创建工作线程。
class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin_impl> {
public:
void schedule_production_loop();//生产处理循环
//线程池定义
fc::optional<boost::asio::thread_pool> _thread_pool;//异步线程池
...
}
void producer_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
auto thread_pool_size = options.at( "producer-threads" ).as<uint16_t>();
//线程池创建
my->_thread_pool.emplace( thread_pool_size );
}
5.2 异步任务
? 生产者线程池负责的工作任务有两个:
1. 为接收到的投递交易进行异步解签名计算;
2. 等待解签名计算完成,将交易投递到主线程的异步io服务中处理。
? 这两个工作任务都在同一个函数中投递执行:
该函数在节点收到其它节点/客户端投递的交易时被调用:
- 投递异步解签名计算任务到 _threadpool线程池( 生产者线程池),计算结果放到
trx->signing_keys_future中。
- 投递异步任务到 _thread_pool线程池,该任务等待将异步解签名计算结束的交易投递到主线程的io_service中执行。
5.3 线程池关闭
? 当调用插件shutdown函数时,会执行线程池关闭动作,前程池中的工作线程退出。
六 链接
原文地址:https://blog.51cto.com/14267585/2433560