Ocata Neutron代码分析(二)——Neutron RPC启动过程分析

RPC启动跟Neutron API的启动在同一个函数中执行,neutron.server.wsgi_eventlet.py中的eventlet_wsgi_server。

def eventlet_wsgi_server():
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    start_api_and_rpc_workers(neutron_api)

def start_api_and_rpc_workers(neutron_api):
    try:
        worker_launcher = service.start_all_workers()

        pool = eventlet.GreenPool()
        api_thread = pool.spawn(neutron_api.wait)
        plugin_workers_thread = pool.spawn(worker_launcher.wait)

        # api and other workers should die together. When one dies,
        # kill the other.
        api_thread.link(lambda gt: plugin_workers_thread.kill())
        plugin_workers_thread.link(lambda gt: api_thread.kill())

        pool.waitall()
    except NotImplementedError:
        LOG.info(_LI("RPC was already started in parent process by "
                     "plugin."))

        neutron_api.wait()

start_api_and_rpc_workers函数中使用start_all_workers函数来启动RPC相关的workers,返回worker_launcher对象(实际上是oslo_service.service::ProcessLauncher的实例)。接着,主进程使用GreenPool分别spawn出neutron_api和worker_launcher的wait函数,并调用waitall函数来等待这两个GreenThread结束。其中的两个link函数是为了保证其中一个服务挂掉,另外一个服务也要随之结束。

下面重点分析neutron.service的start_all_workers函数:

def start_all_workers():
    workers = _get_rpc_workers() + _get_plugins_workers()
    return _start_workers(workers)

start_all_workers主要完成获取workers并启动的工作,这里的workers分为两类:

  • _get_rpc_workers返回每个plugin(包括core plugin和service plugin)所必须的rpc worker;
  • _get_plugins_workers返回每一种(跟第一点中的每个plugin不同)plugin中用于实现自己特殊需求(如果有)的rpc workers。

先分析_get_rpc_workers:

def _get_rpc_workers():
    plugin = directory.get_plugin()                         # 获取core plugin
    service_plugins = directory.get_plugins().values()      # 获取plugin,包括core plugin和service plugin

    if cfg.CONF.rpc_workers < 1:                            # 检查配置文件中的rpc_workers的值,必须大于或等于1
        cfg.CONF.set_override(‘rpc_workers‘, 1)

    # If 0 < rpc_workers then start_rpc_listeners would be called in a
    # subprocess and we cannot simply catch the NotImplementedError.  It is
    # simpler to check this up front by testing whether the plugin supports
    # multiple RPC workers.
    if not plugin.rpc_workers_supported():
        LOG.debug("Active plugin doesn‘t implement start_rpc_listeners")
        if 0 < cfg.CONF.rpc_workers:
            LOG.error(_LE("‘rpc_workers = %d‘ ignored because "
                          "start_rpc_listeners is not implemented."),
                      cfg.CONF.rpc_workers)
        raise NotImplementedError()

    # passing service plugins only, because core plugin is among them
    rpc_workers = [RpcWorker(service_plugins,
                             worker_process_count=cfg.CONF.rpc_workers)]

    if (cfg.CONF.rpc_state_report_workers > 0 and
            plugin.rpc_state_report_workers_supported()):
        rpc_workers.append(
            RpcReportsWorker(
                [plugin],
                worker_process_count=cfg.CONF.rpc_state_report_workers
            )
        )
    return rpc_workers

plugin.rpc_workers_supported实际上是检查core plugin是否实现了start_rpc_listeners方法。

接着,实例化了RpcWorker类,这是这个函数中最关键的步骤。RpcWorker类和Neutron API部分的WorkerService类一样,是继承于neutron_worker.NeutronWorker类的子类,同样实现了start函数。

最后,判断配置文件中的rpc_state_report_workers和core plugin是否支持对rpc state的report。如果支持,则再启动rpc_state_report_workers这么多个子进程来运行 RpcReportsWorker。这个Worker是RpcWorker的子类,只是其start_listeners_method跟RpcWorker不同。

class RpcReportsWorker(RpcWorker):
    start_listeners_method = ‘start_rpc_state_reports_listener‘

下面分析RpcWorker的构造函数和其start函数:

class RpcWorker(neutron_worker.NeutronWorker):
    """Wraps a worker to be handled by ProcessLauncher"""
    start_listeners_method = ‘start_rpc_listeners‘

    def __init__(self, plugins, worker_process_count=1):                         # 构造函数只是进行了变量的简单赋值
        super(RpcWorker, self).__init__(
            worker_process_count=worker_process_count
        )

        self._plugins = plugins
        self._servers = []

    def start(self):
        super(RpcWorker, self).start()
        for plugin in self._plugins:
            if hasattr(plugin, self.start_listeners_method):
                try:
                    servers = getattr(plugin, self.start_listeners_method)()
                except NotImplementedError:
                    continue
                self._servers.extend(servers)

start函数会遍历所有的plugins(包括core plugin和service plugins),查看各个plugin是否实现了start_listeners_method(即start_rpc_listeners)方法,如果实现了则调用之,如果没有就跳过。

这就是RpcWorker的作用。各个plugin的start_rpc_listeners方法中就完成了rpc的功能,主要是通过消费特定名称的mq队列消息来提供服务。

下面分析_get_plugins_workers函数:

def _get_plugins_workers():
    # NOTE(twilson) get_plugins also returns the core plugin
    plugins = directory.get_unique_plugins()

    # TODO(twilson) Instead of defaulting here, come up with a good way to
    # share a common get_workers default between NeutronPluginBaseV2 and
    # ServicePluginBase
    return [
        plugin_worker
        for plugin in plugins if hasattr(plugin, ‘get_workers‘)
        for plugin_worker in plugin.get_workers()
    ]

_get_plugins_workers函数检查了每种plugin(不是每一个)中是否实现了get_workers方法,并将该方法返回的所有workers收集后返回。这里的get_workesr方法返回plugin用于实现自己的特殊需求或提供个性化服务的workers。

两个收集workers的函数执行完后,下面就是启动各个workers的函数:

def start_all_workers():
    workers = _get_rpc_workers() + _get_plugins_workers()
    return _start_workers(workers)
def _start_workers(workers):
    process_workers = [
        plugin_worker for plugin_worker in workers
        if plugin_worker.worker_process_count > 0
    ]                                                                 # 筛选出worker_process_count大于0的workers

    try:
        if process_workers:                                           # 如果存在worker_process_count大于0的workers
            worker_launcher = common_service.ProcessLauncher(
                cfg.CONF, wait_interval=1.0
            )

            # add extra process worker and spawn there all workers with
            # worker_process_count == 0
            thread_workers = [
                plugin_worker for plugin_worker in workers
                if plugin_worker.worker_process_count < 1
            ]
            if thread_workers:
                process_workers.append(
                    AllServicesNeutronWorker(thread_workers)
                )

            # dispose the whole pool before os.fork, otherwise there will
            # be shared DB connections in child processes which may cause
            # DB errors.
            session.context_manager.dispose_pool()

            for worker in process_workers:
                worker_launcher.launch_service(worker,
                                               worker.worker_process_count)
        else:                                                          # 如果workers中的所有worker的worker_process_count都为0
            worker_launcher = common_service.ServiceLauncher(cfg.CONF)
            for worker in workers:
                worker_launcher.launch_service(worker)
        return worker_launcher
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE(‘Unrecoverable error: please check log for ‘
                              ‘details.‘))

_start_workers首先判断workers中是否存在worker_process_count大于0的workers。

  • 如果不存在(else分支):实例化ServiceLauncher,并调用其launch_service方法依次在当前的进程中启动各个worker;
  • 如果存在(if分支):
    • 首先实例化ProcessLauncher;
    • 接着对workers中可能存在的worker_process_count为0的worker进行处理,将这样的worker形成thread_workers列表,并将这些worker作为参数实例化AllServicesNeutronWorker(AllServicesNeutronWorker也是NeutronWorker的子类并重写了start方法,直接调用Launcher.launch_service启动服务)。最后将AllServicesNeutronWorker实例append到process_workers列表中;
    • 最后,调用ProcessLauncher.launch_service方法依次启动各个worker(与Neutron API的启动一样)。

ServiceLauncher和ProcessLauncher均实现了launch_service方法。但ServiceLauncher是Launcher的子类,而ProcessLauncher的父类是Object。其launch_service存在的区别是:

  • ServiceLauncher是将任务放到greenthread中运行;
  • ProcessLauncher是将任务放到os fork出来的子进程中运行。
时间: 2024-11-07 18:38:44

Ocata Neutron代码分析(二)——Neutron RPC启动过程分析的相关文章

Android4.0图库Gallery2代码分析(二) 数据管理和数据加载

Android4.0图库Gallery2代码分析(二) 数据管理和数据加载 2012-09-07 11:19 8152人阅读 评论(12) 收藏 举报 代码分析android相册优化工作 Android4.0图库Gallery2代码分析(二) 数据管理和数据加载 一 图库数据管理 Gallery2的数据管理 DataManager(职责:管理数据源)- MediaSource(职责:管理数据集) - MediaSet(职责:管理数据项).DataManager中初始化所有的数据源(LocalSo

Ocata Neutron代码分析(一)——Neutron API启动过程分析

首先,Neutron Server作为一种服务(neutron-server.service),可以到Neutron项目目录中的setup.cfg配置文件中找到对应的代码入口. [entry_points] console_scripts = neutron-db-manage = neutron.db.migration.cli:main neutron-debug = neutron.debug.shell:main neutron-dhcp-agent = neutron.cmd.even

Ocata Neutron代码分析(三)——oslo_service中的ServiceLauncher和ProcessLauncher(转载)

1.概述 Openstack中有一个叫Launcher的概念,即专门用来启动服务的,这个类被放在了oslo_service这个包里面.Launcher分为两种,一种是ServiceLauncher,另一种为ProcessLauncher.ServiceLauncher用来启动单进程的服务,而ProcessLauncher用来启动有多个worker子进程的服务. 2.ServiceLauncher ServiceLauncher继承自Launcher,启动服务的一个重要成员就是launcher_s

guava eventbus代码分析(二)

---恢复内容开始--- 我们分析下EventBus的核心方法 post方法,直接贴代码 1 public void post(Object event) { 2 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); 3 if (eventSubscribers.hasNext()) { 4 dispatcher.dispatch(event, eventSubscribers); 5 } else

Openvswitch原理与代码分析(2): ovs-vswitchd的启动

ovs-vswitchd.c的main函数最终会进入一个while循环,在这个无限循环中,里面最重要的两个函数是bridge_run()和netdev_run(). ? ? Openvswitch主要管理两种类型的设备,一个是创建的虚拟网桥,一个是连接到虚拟网桥上的设备. ? 其中bridge_run就是初始化数据库中已经创建的虚拟网桥. ? 一.虚拟网桥的初始化bridge_run ? bridge_run会调用bridge_run__,bridge_run__中最重要的是对于所有的网桥,都调

android4.0 的图库Gallery2代码分析(二)

最近迫于生存压力,不得不给人兼职打工.故在博文中加了个求点击的链接.麻烦有时间的博友们帮我点击一下.没时间的不用勉强啊.不过请放心,我是做技术的,肯定链接没病毒,就是我打工的淘宝店铺.嘻嘻.http://shop108130013.taobao.com.谢谢捧场.干脆第一个回报大家的就是这个星期开始继续这篇博文吧,实在是迫于生计,无所不用其极.请谅解.今天是2013-12-31. 接下来开始摸索相册显示的流程吧. 一边摸索一边写,我想其间不乏错误的理解吧.摸索先. 相册显示相关有两个大的类别:

信息管理代码分析&lt;二&gt;读取二进制文件数据

first和end做为全局变量,分别指向链表的头和尾.建立链表的方式也比较简易,从二进制文件数据块中,依次从头到尾读取,每读取一个就建立一个结点. /*基本模型*/ EMP *emp1; while(!feof(fp))/*读取二进制文件到尾*/ { emp1=(EMP *)malloc(sizeof(EMP)); fread(emp1,sizeof(EMP),1,fp);/*读取数据*/ if(emp_first==NULL)/*链表串接*/ { emp_first=emp1; emp_end

C语言 03-第一个C程序代码分析

本文目录 一.代码分析 二.开发和运行C程序的步骤 三.总结 说明:这个C语言专题,是学习iOS开发的前奏.也为了让有面向对象语言开发经验的程序员,能够快速上手C语言.如果你还没有编程经验,或者对C语言.iOS开发不感兴趣,请忽略. 在上一篇中我们已经创建了一个C程序,接下来分析一下里面的代码. 项目结构如下: 一.代码分析 打开项目中的main.c文件(C程序的源文件拓展名为.c),可以发现它是第一个C程序中的唯一一个源文件,代码如下: 1 #include <stdio.h> 2 3 in

Neutron分析(2)——neutron-server启动过程分析

neutron-server启动过程分析 1. /etc/init.d/neutron-server DAEMON=/usr/bin/neutron-server DAEMON_ARGS="--log-file=$LOGFILE" DAEMON_DIR=/var/run ... case $1 in start) test "$ENABLED" = "true" || exit 0 log_daemon_msg "Starting ne