cinder-volume服务上报自己的状态给cinder-scheduler的rpc通信代码分析

以juno版本为基础,主要从消息的生产者-消费者模型及rpc client/server模型来分析cinder-volume是如何跟cinder-scheduler服务进行rpc通信的

1、cinder-scheduler服务的启动入口

cat /usr/bin/cinder-scheduler
from cinder.common import config  # noqa
from cinder.openstack.common import log as logging
from cinder import service
from cinder import utils
from cinder import version

CONF = cfg.CONF
if __name__ == ‘__main__‘:
    CONF(sys.argv[1:], project=‘cinder‘,
         version=version.version_string())
    logging.setup("cinder")
    utils.monkey_patch()
    server = service.Service.create(binary=‘cinder-scheduler‘)
    service.serve(server)
    service.wait()

2、cinder-volume服务的启动入口 

cat /usr/bin/cinder-volume
from cinder.common import config  # noqa
from cinder.openstack.common import log as logging
from cinder import service
from cinder import utils
from cinder import version
host_opt = cfg.StrOpt(‘host‘,
                      help=‘Backend override of host value.‘)
CONF = cfg.CONF

if __name__ == ‘__main__‘:
    CONF(sys.argv[1:], project=‘cinder‘,
         version=version.version_string())
    logging.setup("cinder")
    utils.monkey_patch()
    launcher = service.get_launcher()
    if CONF.enabled_backends:
        for backend in CONF.enabled_backends:
            CONF.register_opts([host_opt], group=backend)
            backend_host = getattr(CONF, backend).host
            host = "%[email protected]%s" % (backend_host or CONF.host, backend)
            server = service.Service.create(host=host,
                                            service_name=backend)
            launcher.launch_service(server)
    else:
        server = service.Service.create(binary=‘cinder-volume‘)
        launcher.launch_service(server)
    launcher.wait()

 3、cinder-volume/cinder-scheduler两者服务的启动

/etc/cinder/cinder.conf文件配置的 scheduler_manager 管理类
# Full class name for the Manager for scheduler (string value)
scheduler_manager=cinder.scheduler.manager.SchedulerManager
# Full class name for the Manager for volume (string value)
volume_manager=cinder.volume.manager.VolumeManager

D:\code-program\cinder-codejuno\service.py
class Service(service.Service):
  def __init__(self, host, binary, topic, manager, report_interval=None,
                 periodic_interval=None, periodic_fuzzy_delay=None,
                 service_name=None, *args, **kwargs):
        super(Service, self).__init__()

        if not rpc.initialized():
            rpc.init(CONF)

        self.host = host
        self.binary = binary
        self.topic = topic
        self.manager_class_name = manager
        manager_class = importutils.import_class(self.manager_class_name)----加载各自的管理类
        manager_class = profiler.trace_cls("rpc")(manager_class)

        self.manager = manager_class(host=self.host,
                                     service_name=service_name,
                                     *args, **kwargs)
        self.report_interval = report_interval
        self.periodic_interval = periodic_interval
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        self.basic_config_check()
        self.saved_args, self.saved_kwargs = args, kwargs
        self.timers = []
        setup_profiler(binary, host)

    def start(self):-------启动服务
        version_string = version.version_string()
        LOG.info(_(‘Starting %(topic)s node (version %(version_string)s)‘),
                 {‘topic‘: self.topic, ‘version_string‘: version_string})
        self.model_disconnected = False
        self.manager.init_host()--------步骤一,调用cinder-scheduler/cinder-volume管理类的init_host方法
        ctxt = context.get_admin_context()
        try:
            service_ref = db.service_get_by_args(ctxt,
                                                 self.host,
                                                 self.binary)
            self.service_id = service_ref[‘id‘]
        except exception.NotFound:
            self._create_service_ref(ctxt)

        LOG.debug("Creating RPC server for service %s" % self.topic)

        target = messaging.Target(topic=self.topic, server=self.host)
        endpoints = [self.manager]
        endpoints.extend(self.manager.additional_endpoints)
        self.rpcserver = rpc.get_server(target, endpoints)--------创建rpc server对象,用于接受rpc client发送的请求
        self.rpcserver.start()-----启动rpc server对象,创建相应的rabbitmq队列,用于监听消息

        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(
                self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

        if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)

4、对于cinder-scheduler服务

步骤一,调用cinder-scheduler管理类的init_host方法,
该方法的一个重要角色是给cinder-volume服务发消息,告知对方需要上报自己的存储状态

D:\code-program\cinder-codejuno\scheduler\manager.py
from cinder.volume import rpcapi as volume_rpcapi
class SchedulerManager(manager.Manager):
    """Chooses a host to create volumes."""
    RPC_API_VERSION = ‘1.7‘
    target = messaging.Target(version=RPC_API_VERSION)		

    def init_host(self):
        ctxt = context.get_admin_context()
        self.request_service_capabilities(ctxt)

    def request_service_capabilities(self, context):
        volume_rpcapi.VolumeAPI().publish_service_capabilities(context)	

D:\code-program\cinder-codejuno\volume\rpcapi.py
class VolumeAPI(object):
    def publish_service_capabilities(self, ctxt):
        cctxt = self.client.prepare(fanout=True, version=‘1.2‘)
        cctxt.cast(ctxt, ‘publish_service_capabilities‘)

在cinder-scheduler服务启动的时候,
主动给cinder-volume服务发送 publish_service_capabilities 一次rpc 请求,来通知cinder-volume上报各自的存储特性
在这个过程中,cinder-scheduler充当的是消息的生产者,rpc client的角色,cinder-volume充当的是消息的消费者,rpc server的角色

5、对应cinder-volume服务

步骤一,调用cinder-volume管理类的init_host方法

from cinder import manager
D:\code-program\cinder-codejuno\volume\manager.py

class VolumeManager(manager.SchedulerDependentManager):
    """Manages attachable block storage devices."""
    RPC_API_VERSION = ‘1.19‘
    target = messaging.Target(version=RPC_API_VERSION)
    def init_host(self):
        """Do any initialization that needs to be run if this is a
           standalone service.
        """
		......
        self.publish_service_capabilities(ctxt)

    def publish_service_capabilities(self, context):
        """Collect driver status and then publish."""
        self._report_driver_status(context)------步骤一 上报驱动的状态
        self._publish_service_capabilities(context)-----步骤二 发布服务的特性
对5 步骤一详解
这是一个定位任务
    @periodic_task.periodic_task
    def _report_driver_status(self, context):
        LOG.info(_("Updating volume status"))
        if not self.driver.initialized:
            if self.driver.configuration.config_group is None:
                config_group = ‘‘
            else:
                config_group = (‘(config name %s)‘ %
                                self.driver.configuration.config_group)

            LOG.warning(_(‘Unable to update stats, %(driver_name)s ‘
                          ‘-%(driver_version)s ‘
                          ‘%(config_group)s driver is uninitialized.‘) %
                        {‘driver_name‘: self.driver.__class__.__name__,
                         ‘driver_version‘: self.driver.get_version(),
                         ‘config_group‘: config_group})
        else:
            volume_stats = self.driver.get_volume_stats(refresh=True)---调用相关的驱动,获取cinder-volume的状态
            if self.extra_capabilities:
                volume_stats.update(self.extra_capabilities)
            if volume_stats:
                # Append volume stats with ‘allocated_capacity_gb‘
                self._append_volume_stats(volume_stats)

                # queue it to be sent to the Schedulers.
                self.update_service_capabilities(volume_stats)---把cinder-volume的状态信息,赋值给cinder-scheduler的相关变量

D:\code-program\cinder-codejuno\manager.py

class SchedulerDependentManager(Manager):

    def __init__(self, host=None, db_driver=None, service_name=‘undefined‘):
        self.last_capabilities = None
        self.service_name = service_name
        self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
        super(SchedulerDependentManager, self).__init__(host, db_driver)

    def update_service_capabilities(self, capabilities):
        """Remember these capabilities to send on next periodic update."""
        self.last_capabilities = capabilities

    @periodic_task.periodic_task
	一个定时任务,默认情况下,间隔一分钟的频率给nova-scheduler服务发送rpc 请求,把自己的存储资源信息更新给nova-scheduler服务
    def _publish_service_capabilities(self, context):
        """Pass data back to the scheduler at a periodic interval."""
        if self.last_capabilities:
            LOG.debug(‘Notifying Schedulers of capabilities ...‘)
            self.scheduler_rpcapi.update_service_capabilities(
                context,
                self.service_name,
                self.host,
                self.last_capabilities)

在这个过程中,cinder-volume服务,充当的是消息的生产者,rpc client的角色,cinder-scheduler服务充当的是消息的消费者,rpc server的角色 .除cinder-scheduler服务初始化或者重启的时候,会主动的给cinder-volume服务,发送一个上报自身存储状态的rpc请求,cinder-volume服务接受该请求,上报自己的存储状态外,剩下的都是cinder-volume服务,默认间隔60秒主动的上报自己的存储状态给cinder-scheduler服务,在状态上报时,rpc请求使用的是类型为fanout,名字为cinder-scheduler_fanout的exchange..

cinder-scheduler 的 HostManager 类维护一个数组service_states,每个cinder-volume 节点占该数组的一项,其值为该节点上 cinder-volume 服务的 capabilities,该值通过消息机制定期更新。在发生特定操作比如删除卷时,会进行立刻更新。

 

  

原文地址:https://www.cnblogs.com/potato-chip/p/12617016.html

时间: 2024-12-17 03:23:04

cinder-volume服务上报自己的状态给cinder-scheduler的rpc通信代码分析的相关文章

Cinder Volume 服务启动流程分析和周期性任务分析

1.cinder-volume服务的程序入口 #!/usr/bin/python2 # PBR Generated from u'console_scripts' import sys from cinder.cmd.volume import main if __name__ == "__main__": sys.exit(main()) 2.cinder/cmd/volume.py的main方法实现 def main(): objects.register_all() # impo

devstack环境中不能创建cinder volume

刚安装好的devstack环境中无法成功创建cinder volume,创建的volume的status为error:在cinder scheduler中看到失败log:2015-10-15 14:12:22.057 ERROR cinder.scheduler.flows.create_volume [req-14f77cef-6ee9-463c-b9ce-2ffd40b4076b ba152f9f637c4bc2810f35cf3d3696dd 0d777ab85d9949c6ab961c6

使用命令cinder-manage service remove删除不用的cinder service服务

使用cinder service-list可以查到cinder的服务状态和信息,有时会有一些不用的状态是down,以前是通过修改数据库删除这些down掉的服务. 社区中增加了命令:cinder-manage service remove <binary> <host>可以删除那些down掉的服务. 示例: # cinder service-list+------------------+-------------------+------+---------+-------+---

手动模拟attach cinder volume的过程

我们首先启动一台机器,启动的时候attach一个volume 创建一个空的cinder volume root:~# cinder create --display-name emptyvolume11g 11+---------------------+--------------------------------------+|       Property      |                Value                 |+--------------------

挂载了Cinder Volume的实例无法动态迁移排错

现象:挂载了Cinder Volume的实例无法动态迁移 [[email protected] nova]# tail -f compute.log 2016-01-13 16:36:12.870 18762 ERROR nova.virt.libvirt.driver [-] [instance: 9d3e4665-801e-44bd-b93a-82951102cc22] Live Migration failure: unable to resolve '/var/lib/nova/mnt/

Mark一下, dp状态转移方程写对,但是写代码都错,poj 1651 poj 1179

dp题: 1.写状态转移方程; 2.考虑初始化边界,有意义的赋定值,还没计算的赋边界值: 3.怎么写代码自底向上计算最优值 今天做了几个基础dp,全部是dp方程写对但是初始化以及计算写错 先是poj 1651 其实就是个赤裸裸的矩阵连乘,dp方程很容易写出 dp[i][j]=min(dp[i][k]+dp[k+1][j]+r[i]*c[k]*c[j],dp[i][j]); 先贴两个个二逼的代码,mark下自己多么的二逼: 二逼一:在计算的时候使用了还没有算出来的值,模拟下就知道第一重循环里算dp

@Java web程序员,在保留现场,服务不重启的情况下,执行我们的调试代码(JSP 方式)

一.前言 类加载器实战系列的第六篇(悄悄跟你说,这篇比较水),前面5篇在这里: 实战分析Tomcat的类加载器结构(使用Eclipse MAT验证) 还是Tomcat,关于类加载器的趣味实验 了不得,我可能发现了Jar 包冲突的秘密 重写类加载器,实现简单的热替换 @Java Web 程序员,我们一起给程序开个后门吧:让你在保留现场,服务不重启的情况下,执行我们的调试代码 最近事不算多,所以有点时间写博客,昨天写着写着,测试的同学反馈说有一个bug.我看了下服务端日志,空指针了: 下面会给出详细

[Icehouse][cinder] volume状态为 &quot;error_deleting&quot;无法删除 的解决方案

1.查看volume状态 [[email protected] ~]# cinder list +--------------------------------------+----------------+--------------+------+-------------+----------+-------------------------------------+ | ID | Status | Display Name | Size | Volume Type | Bootabl

通过服务监控手机呼叫状态并进行录音

两个服务互相守护 <LinearLayout xmlns:android="http://schemas.android.com/apk/res/android" xmlns:tools="http://schemas.android.com/tools" android:layout_width="match_parent" android:layout_height="match_parent" android:or