前段时间撰文分析了“云主机的启动过程”源码,读者应该注意到了nova-scheduler
,nova-compute
等组件是通过发起rpc.cast
, rpc.call
调用完成交互的。那今天我打算介绍下nova-compute
服务的启动过程,并重点分析下其与AMQP(rabbitmq)链接的建立过程。
在CentOS 7中启动nova-compute
服务,执行路径是这样的:
systemctl start openstack-nova-compute.service
-> /usr/bin/nova-compute
-> nova/cmd/compute.py/main
下面从入口main
开始分析,函数如下:
def main():
"""加载和设置配置参数,有两点需要注意:
1. 调用rpc.set_defaults设置默认的exchange为nova,如果不设置则为
openstack
2. 调用rpc.init设置Transport和Notifier,Transport是
oslo_messaging/transport.py/Transport实例,我采用的是默认的
rpc_backend=rabbit,所以Transport采用的driver=oslo_messaging/
_drivers/impl_rabbit.py/RabbitDriver;Notifier是一个通知消息发
送器,它借助Transport完成通知消息的发送
"""
config.parse_args(sys.argv)
#省略其他配置代码
.......
"""调用类方法nova/service.py/Service.create创建Service服务对象
输入参数topic = compute, db_allowd = false;`create`方法是一个
类方法(@classmethod),它首先基于输入参数和(/etc/nova.conf中的选
项)设置配置,然后创建一个Service对象并返回给调用者
"""
server = service.Service.create(binary=‘nova-compute‘,
topic=CONF.compute_topic,
db_allowed=CONF.conductor.use_local)
"""调用server方法启动服务并调用wait方法等待服务启动完成,serve方法创
建Launcher服务启动实例对象(这里是ServiceLauncher)来启动服务,
但最终会调用server.start方法启动服务。
"""
service.serve(server)
service.wait()
后文的分析将分两步进行:
Service
对象的初始化Service
的启动
Service
对象的初始化
上文说到create
方法会创建一个Service
对象,下面一起来看看其构造函数:
def __init__(self, host, binary, topic, manager,
report_interval=None,
periodic_enable=None, periodic_fuzzy_delay=None,
periodic_interval_max=None, db_allowed=True,
*args, **kwargs):
"""nova/service.py/Service.__init__
输入参数如下:
host = ‘devstack‘
binary = ‘nova-compute‘
topic = ‘compute‘
manager = ‘nova.compute.manager.ComputeManager‘
report_interval = 10
periodic_enable = True
periodic_fuzzy_delay = 60
periodic_interval_max = None
db_allowed = False
args = ()
kwargs = {}
构造函数中主要实现了如下功能:
1. 成员变量赋值
2. 初始化ComputeManager对象
3. 初始化conductor API对象
后文重点分析2和3
"""
super(Service, self).__init__()
self.host = host
self.binary = binary
self.topic = topic
self.manager_class_name = manager
#实例化servicegroup API(nova/servicegroup/api.py/API)
#CONF.servicegroup_driver指定所使用的存储驱动(可以是db,
#zookeeper,memcache,默认是db)
self.servicegroup_api = servicegroup.API()
#实例化ComputeManager(nova/compute/manager.py/ComputeManager)
manager_class =
importutils.import_class(self.manager_class_name)
self.manager = manager_class(host=self.host, *args, **kwargs)
self.rpcserver = None
self.report_interval = report_interval
self.periodic_enable = periodic_enable
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.periodic_interval_max = periodic_interval_max
self.saved_args, self.saved_kwargs = args, kwargs
self.backdoor_port = None
#实例化conductor API(nova/conductor/api.py/API)
self.conductor_api = conductor.API(use_local=db_allowed)
#发送ping消息,等待nova-conductor服务准备就绪
self.conductor_api.wait_until_ready(
context.get_admin_context())
实例化ComputeManager
ComputeManager实例化,就是创建ComputeManager对象,然后调用其__init__
方法,创建各种api接口及client rpc api,如:network、volume、image、conductor等,代码如下:
def __init__(self, compute_driver=None, *args, **kwargs):
"""Load configuration options and connect to the
hypervisor.
"""
"""nova/compute/manager.py/ComputeManager.__init__
api的创建都是根据配置(/etc/nova.conf)中指定的类名称,然后创建
对应的API类实例,具体的类请看注释; 而client rpc api则是
#创建一个RPCClient实例并与特定的Target(指定了消息的发送目的
地)及Transport(消息传输层)关联,后文以
`nova/compute/rpcapi.py/ComputeAPI`为例分析具体的实现
"""
#nova/compute/manager.py/ComputeVirtAPI
self.virtapi = ComputeVirtAPI(self)
#nova/network/neutronv2/api.py/API
self.network_api = network.API()
#nova/volume/cinder.py/API
self.volume_api = volume.API()
#nova/image/api.py/API
self.image_api = image.API()
self._last_host_check = 0
self._last_bw_usage_poll = 0
self._bw_usage_supported = True
self._last_bw_usage_cell_update = 0
#nova/compute/api.py/API
self.compute_api = compute.API()
#nova/compute/rpcapi.py/ComputeAPI
#消息Target = {topic=‘compute‘, version=‘4.0‘}
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
"""nova/conductor/api.py/API,内部实现如下:
创建`nova/conductor/rpcapi.py/ConductorAPI`实例,
内部会创建一个rpc client,其Target如下:
Target = {topic = ‘conductor‘, version = ‘3.0‘}
创建`nova/baserpc.py/BaseAPI`实例,内部会创建一个rpc
client其Target如下:
Target = {topic = ‘conductor‘,
namespace = ‘baseapi‘, version = ‘1.0‘}
"""
self.conductor_api = conductor.API()
"""nova/conductor/rpc.py/ComputeTaskAPI
1. nova/conductro/rpcapi.py/ComputeTaskAPI
Target = {topic = ‘conductor‘,
namespace = ‘compute_task‘, version = ‘1.0‘}
"""
self.compute_task_api = conductor.ComputeTaskAPI()
#如果security_group_api配置为neutron或者quantum,则为True
self.is_neutron_security_groups = (
openstack_driver.is_neutron_security_groups())
#nova/consoleauth/rpcapi.py/ConsoleAuthAPI
# Target = {topic = ‘consoleauth‘, version = ‘2.1‘}
self.consoleauth_rpcapi = consoleauth.rpcapi.ConsoleAuthAPI()
#nova/cells/rpcapi.py/CellsAPI
# Target = {topic = ‘cells‘, version = ‘1.0‘}
self.cells_rpcapi = cells_rpcapi.CellsAPI()
"""nova/scheduler/client/__init__.py/SchedulerClient
内部延迟创建(在使用时再创建):
1. nova.scheduler.client.query.SchedulerQueryClient
初始化时创建nova/scheduler/rpcapi.py/SchedulerAPI,
Target = {topic = ‘scheduler‘, version = ‘4.0‘}
2. nova.scheduler.client.report.SchedulerReportClient
#两个客户端实例分别用来创建云主机时选主及更新主机信息
"""
self.scheduler_client = scheduler_client.SchedulerClient()
self._resource_tracker_dict = {}
self.instance_events = InstanceEvents()
self._sync_power_pool = eventlet.GreenPool()
self._syncs_in_progress = {}
self.send_instance_updates =
CONF.scheduler_tracks_instance_changes
if CONF.max_concurrent_builds != 0:
self._build_semaphore =
eventlet.semaphore.Semaphore(
CONF.max_concurrent_builds)
else:
self._build_semaphore =
compute_utils.UnlimitedSemaphore()
if max(CONF.max_concurrent_live_migrations, 0) != 0:
self._live_migration_semaphore =
eventlet.semaphore.Semaphore(
CONF.max_concurrent_live_migrations)
else:
self._live_migration_semaphore =
compute_utils.UnlimitedSemaphore()
super(ComputeManager,
self).__init__(service_name="compute",
*args, **kwargs)
# NOTE(russellb) Load the driver last. It may call
#back into the
# compute manager via the virtapi, so we want it to be
#fully
# initialized before that happens.
#使用的hypervisor是livirt,所以这里是LibvirtDriver
#(nova/virt/libvirt/driver/LibvirtDriver)
self.driver = driver.load_compute_driver(self.virtapi,
compute_driver)
#False
self.use_legacy_block_device_info = self.driver.need_legacy_block_device_info
下面以nova/compute/rpcapi.py/ComputeAPI
为例,分析下rpc 客户的的初始化:
def __init__(self):
"""nova/compute/rpcapi.py/ComputeAPI.__init__
"""
super(ComputeAPI, self).__init__()
#创建oslo_messaging/target.py/Target对象,该对象决定了
#消息的发送目的地;topic指定消息以topic模式发送,version
#指定了要求的消息版本,在这里CONF.compute_topic = ‘compute‘
target = messaging.Target(topic=CONF.compute_topic,
version=‘4.0‘)
#设置兼容的消息版本及创建序列化器
version_cap =
self.VERSION_ALIASES.get(CONF.upgrade_levels.compute,
CONF.upgrade_levels.compute)
serializer = objects_base.NovaObjectSerializer()
#创建rpc client api
#(oslo_messaging/rpc/client.py/RPCClient)
#正如之前分析rpc.init函数时所说,消息是按照Target指定的模式通过
#Transport传输到消息队列的
self.client = self.get_client(target, version_cap, serializer)
小结:ComputeManager实例化过程主要是创建与云主机操作相关的各种API及客户端RPC , 包括:network,volume,image,conductor,scheduler等,为服务启动后执行相关操作时提供服务
实例化conductor API并等待nova-conductor就绪
实例化conductor API
细心的读者,会发现ComputeManager实例化过程中也创建了一个
nova/conductor/api.py/API
实例,下面一起来看看代码:
def __init__(self):
"""1. 创建`nova/conductor/rpcapi.py/ConductorAPI`实例,内部会
创建一个rpc client,其Target如下:
Target = {topic = ‘conductor‘, version = ‘3.0‘}
2. 创建`nova/baserpc.py/BaseAPI`实例,内部会创建一个rpc
client其Target如下:
Target = {topic = ‘conductor‘,
namespace = ‘baseapi‘, version = ‘1.0‘}
"""
self._manager = rpcapi.ConductorAPI()
self.base_rpcapi = baserpc.BaseAPI(topic=CONF.conductor.topic)
等待nova-conductor就绪
nova-compute
服务是通过前述__init__
方法中创建的base_rpcapi
发送远程ping请求来确认nova-conductor
服务已经启动的,如下:
def wait_until_ready(self, context, early_timeout=10,
early_attempts=10):
"""nova/conductor/api.py/API.wait_until_ready
该方法通过调用`self.base_rpcapi`发送ping请求来确定
`nova-conductor`的状态,请求超时为early_timeout=10,重试次数为
early_attempts=10;下面的代码中省略了异常及重试部分
"""
"""内部首先调用rpc客户端api的prepare方法准备一个调用上下文
`_CallContext`,该上下文件与rpc api的Transport及Target关联,
接着调用_CallContext.call方法发送同步ping请求(消息内容为:‘1.21
GigaWatts‘)到消息队列,nova-conductor服务收到消息会给一个应答,否
则会报超时异常;具体的处理过程请看下面的分析
"""
self.base_rpcapi.ping(context, ‘1.21 GigaWatts‘,
timeout=timeout)
def ping(self, context, arg, timeout=None):
arg_p = jsonutils.to_primitive(arg)
#调用RPCClient.prepare方法准备一个请求上下文
#该方法实际上直接调用_CallContext.prepare类方法创建一个
#_CallContext对象,并绑定Target、Transport和serializer
cctxt = self.client.prepare(timeout=timeout)
#调用_CallContext.call发送同步ping请求,具体分析如下
return cctxt.call(context, ‘ping‘, arg=arg_p)
def call(self, ctxt, method, **kwargs):
"""Invoke a method and wait for a reply. See
RPCClient.call().
"""
if self.target.fanout:
raise exceptions.InvalidTarget(‘A call cannot be used‘
‘with fanout‘, self.target)
"""封装消息,结果如下:
{
‘args‘: {‘arg‘: ‘1.21 GigaWatts‘},
‘namespace‘: ‘baseapi‘,
‘method‘: ‘ping‘,
‘version‘: ‘1.0‘
}
arg 是消息内容,method是远程请求方法,namespace和version取自
Target,用来控制消息的接受
"""
msg = self._make_message(ctxt, method, kwargs)
#self.serializer在创建RPCClient时创建,这里是
#nova/rpc.py/RequestContextSerializer,用来序列化上下文
msg_ctxt = self.serializer.serialize_context(ctxt)
timeout = self.timeout
if self.timeout is None:
timeout = self.conf.rpc_response_timeout
if self.version_cap:
self._check_version_cap(msg.get(‘version‘))
try:
"""调用oslo_messaging/transport.py/Transport._send发送消
息,wait_for_reply=True表示需要应答,内部在发送消息前会先创建
一个consumer用来接受应答,Transport._send直接调用
AMQPDriverBase._send发送请求,下面具体分析
"""
result = self.transport._send(self.target, msg_ctxt,
msg,
wait_for_reply=True,
timeout=timeout,
retry=self.retry)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
#序列化应答消息,并返回给调用者
return self.serializer.deserialize_entity(ctxt, result)
#amqpdriver.py/AMQPDriverBase._send
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None,
envelope=True, notify=False, retry=None):
"""省略某些非核心代码"""
"""同步ping消息,需要等待应答,在这里设置消息id及应答队列,结果如下:
{
‘_msg_id‘: ‘221b9eafe51c475bb15fecafbd72ea17‘,
‘version‘: ‘1.0‘,
‘_reply_q‘: ‘reply_83fbf8446b564899b1e3c89753a8689a‘,
‘args‘: {‘arg‘: ‘1.21 GigaWatts‘}, ‘namespace‘: ‘baseapi‘,
‘method‘: ‘ping‘
}
_get_reply_q方法用来创建应答队列,内部会创建一个用于监听的rpc链接
(connection),链接通道(channel),消费者(direct模式),
Exchange,绑定队列与Exchange,启动监听线程(poll方法轮询消息)
创建rpc链接及通道的调用如下:
amqpdriver.py/AMQPDriverBase._get_connection ->
amqp.py/ConnectionContext -> amqp.py/ConnectionPool.create
-> impl_rabbit.py/Connection ->
komku/connection.py/Connection
创建消费者及Exchange的调用如下:
impl_rabbit.py/Connection.declare_direct_consumer ->
impl_rabbit.py/Consumer.declare_consumer
"""
if wait_for_reply:
msg_id = uuid.uuid4().hex
msg.update({‘_msg_id‘: msg_id})
LOG.debug(‘MSG_ID is %s‘, msg_id)
msg.update({‘_reply_q‘: self._get_reply_q()})
#添加uuid及打包上下文到消息体中
rpc_amqp._add_unique_id(msg)
rpc_amqp.pack_context(msg, context)
#序列化消息
if envelope:
msg = rpc_common.serialize_msg(msg)
#绑定应答消息队列
if wait_for_reply:
self._waiter.listen(msg_id)
#发送消息
try:
#创建一个用于发送消息的rpc链接,调用逻辑与上面创建监听rpc链接相
#似,不再赘述不同的是,发送链接是通过链接池创建的
with self._get_connection(rpc_amqp.PURPOSE_SEND) as
conn:
#发送通知消息
if notify:
conn.notify_send(self._get_exchange(target),
target.topic, msg, retry=retry)
#fanout模式
elif target.fanout:
conn.fanout_send(target.topic, msg, retry=retry)
#ping消息采用的是topic模式(回过头看看创建RPCClient时创建
#的Target就会明白了),target内容如下:
#{topic=conductor, namespace=baseapi, version=1.0}
else:
topic = target.topic
if target.server:
topic = ‘%s.%s‘ % (target.topic, target.server)
#发送消息,根据target的内容,我们知道这里使用默认的
#exchange,也就是‘nova‘,具体内容见下面的分析
conn.topic_send(
exchange_name=self._get_exchange(target),
topic=topic, msg=msg,
timeout=timeout,
retry=retry)
#等待应答
if wait_for_reply:
result = self._waiter.wait(msg_id, timeout)
if isinstance(result, Exception):
raise result
return result
finally:
#删除之前绑定的消息队列
if wait_for_reply:
self._waiter.unlisten(msg_id)
#impl_rabbit.py/Connection.topic_send
def topic_send(self, exchange_name, topic, msg, timeout=None,
retry=None):
"""Send a ‘topic‘ message."""
#创建一个类型为‘topic‘的Exchange,名字为‘nova‘
exchange = kombu.entity.Exchange(
name=exchange_name,
type=‘topic‘,
durable=self.amqp_durable_queues,
auto_delete=self.amqp_auto_delete)
#发送消息,routing_key = ‘conductor‘, 该函数的内部封装比较复杂
#(各种重试代码,异常代码,装饰器等用来尽力保证消息发送成功),
#简单来说:最后都会调用self._publish方法,内部会创建一个生产者并调用
#其publish方法,进而调用channel发布消息;
self._ensure_publishing(self._publish, exchange, msg,
routing_key=topic, retry=retry)
小结:通过上面的分析我们知道nova-compute服务依赖于nova-conductor服务,在启动compute服务前需要保证conductor服务已经启动,否则会失败
总结下rabbitmq消息生成者及消费者的使用步骤:
生产者
- 创建链接
- 创建通道
- 创建Exchange
- 创建生产者
- 发送消息
消费者
- 创建链接
- 创建通道
- 创建Exchange
- 创建队列并与Exchange绑定
- 创建消费者
- 向服务器注册
- 等待接收消息
Service
的启动
create
方法返回就表明一切准备就绪了,下面来看看服务的启动过程,并重点分析与rabbitmq的链接过程:
def main():
.......
"""serve方法创建Launcher服务启动实例对象(这里是ServiceLauncher)
来启动服务,但最终会调用server.start方法启动服务。下面来看start方法
的实现
"""
service.serve(server)
.......
def start(self):
"""省略了某些非关键代码"""
"""manager是在初始化中创建的ComputeManager对象,init_host完成下
面的工作:
1. 完成LibvirtDriver相关的初始化
2. 更新云主机实例信息
3. 更新实例信息到scheduler
"""
self.manager.init_host()
#更新节点信息
self.manager.pre_start_hook()
#创建一个Target = {topic = ‘compute‘, server = ‘hostname‘}
target = messaging.Target(topic=self.topic, server=self.host)
"""创建endpoints, BaseRPCAPI内部会创建一个Target = {namespace
‘baseapi‘, version = ‘1.1‘}
manager = ComputeManager,如果您寻根究底的话,会发现nova-
compute服务从消息队列接受到消息后,通过dispatcher分发器分发后,最终
是会投递给ComputeManager的某个方法处理的--这其实是显然的,不是么!
"""
endpoints = [
self.manager,
baserpc.BaseRPCAPI(self.manager.service_name,
self.backdoor_port)
]
endpoints.extend(self.manager.additional_endpoints)
serializer = objects_base.NovaObjectSerializer()
#创建rpc 服务器,调用
#oslo_messaging/rpc/server.py/get_rpc_server创建rpc服务器,
#返回oslo_messaging/server.py/MessageHandlingServer对象
#下文具体分析
self.rpcserver = rpc.get_server(target, endpoints, serializer)
#启动rpc 服务器,下文重点分析
self.rpcserver.start()
.......
创建rpc server
#oslo_messaging/rpc/server.py/get_rpc_server
def get_rpc_server(transport, target, endpoints,
executor=‘blocking‘, serializer=None):
"""输入参数如下:
transport = oslo_messaging.transport.Transport
target = {topic=‘compute‘, server=‘devstack‘}
endpoints = [nova.compute.manager.ComputeManager,
nova.baserpc.BaseRPCAPI]
executor = ‘eventlet‘
serializer = nova.rpc.RequestContextSerializer
"""
#创建消息分发器
dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
"""创建服务器对象,内部根据executor加载对应的执行器,这里是
oslo_messaging/_executors/impl_eventlet.py/EventletExecutor
"""
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
启动rpc server
rpc server创建好了,下面来看它的启动过程:
#oslo_messaging/server.py/MessageHandlingServer.start
def start(self):
"""Start handling incoming messages.
This method causes the server to begin polling the
transport for incoming messages and passing them to the
dispatcher. Message processing will continue until the
stop() method is called.
The executor controls how the server integrates with
the applications I/O handling strategy - it may choose
to poll for messages in a new process, thread or co-
operatively scheduled coroutine or simply by
registering a callback with an event loop. Similarly,
the executor may choose to dispatch messages in a new
thread, coroutine or simply the current thread.
"""
self._check_same_thread_id()
#只能启动一次
if self._executor is not None:
return
try:
"""在transport上启动消息监听(注册三个消费者),调用链如下:
dispatcher._listen -> transport._listen ->
driver._listen, 这里的driver = RabbitDriver,最后返回
AMQPListener对象,详细内容请看后文
"""
listener = self.dispatcher._listen(self.transport)
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
self._running = True
#实例化执行器EventletExecutor,
self._executor = self._executor_cls(self.conf, listener,
self.dispatcher)
#启动EventletExecutor,调用listener.poll方法监听消息
#收到消息后通过dispatcher分发给ComputeManager的对应方法处理
self._executor.start()
下面具体来看看nova-compute中三个消费者的注册过程
注册消费者
承接上文,继续来看RabbitDriver._listen方法的实现:
#oslo_messaging/_drivers/amqpdriver.py/AMQPDriverBase._listen
def listen(self, target):
"""target是之前在start方法中创建的Target对象,内容如下:
{topic=‘compute‘, server=‘devstack‘}
"""
"""创建一个监听用途的rpc,具体的过程在上文分析过,函数调用链如下:
amqpdriver.py/AMQPDriverBase._get_connection ->
amqp.py/ConnectionContext -> amqp.py/ConnectionPool.create
-> impl_rabbit.py/Connection ->
komku/connection.py/Connection
"""
conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
#创建一个监听器amqpdriver.py/AMQPListener,并与链接绑定
listener = AMQPListener(self, conn)
"""下面创建了三个消费者,用于接收其他组件/模块发送过来的消息
1. 第一个是topic类型的消费者,exchange_name = ‘nova‘,topic =
‘compute‘
2. 第二个也是topci类型的消费者,exchange_name = ‘nova‘,topic =
‘compute.devstack‘
3. 第三个是fanout类型的消费者,topic = ‘compute‘
读者可以通过rabbitmqctl工具查看相关的exchange,队列,生产者,消费
者信息
下文以第一个消费者的创建为例具体分析代码实现
"""
conn.declare_topic_consumer(
exchange_name=self._get_exchange(target),
topic=target.topic,
callback=listener)
conn.declare_topic_consumer(
exchange_name=self._get_exchange(target),
topic=‘%s.%s‘ % (target.topic, target.server),
callback=listener)
conn.declare_fanout_consumer(target.topic, listener)
return listener
-------------------------------------------------------------
#impl_rabbit.py/Connection.declare_topic_consumer
def declare_topic_consumer(self, exchange_name, topic,
callback=None,
queue_name=None):
"""输入参数如下:
exchange_name = ‘nova‘
topic = ‘compute‘
callback = AMQPListener
queue_name = None
"""
#内部会创建一个名为exchange_name,类型为type的Exchange,
#amqp_durable_queues = False,默认值,重启后消息丢失
#amqp_auto_delete = False,默认值,不自动删除消息
#rabbit_ha_queues = False,默认值,没有HA
consumer = Consumer(exchange_name=exchange_name,
queue_name=queue_name or topic,
routing_key=topic,
type=‘topic‘,
durable=self.amqp_durable_queues,
auto_delete=self.amqp_auto_delete,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues)
#为确保consumer向服务器注册成功,declare函数经过了层层的跳转,封
#装,装饰,重试等非常复杂的封装,下面的分析中,只给出正常的处理逻辑并忽
#略大部分的异常处理,具体看下文的分析
self.declare_consumer(consumer)
-------------------------------------------------------
接着来看上文的declare_consumer方法:
def declare_consumer(self, consumer):
"""Create a Consumer using the class that was passed in
and add it to our list of consumers
该函数中进行了第一次封装:定义了_declare_consumer, 封装具体
consumer的declare函数,目的是注册成功后将添加consumer到列表中
"""
#连接异常处理回调函数
def _connect_error(exc):
log_info = {‘topic‘: consumer.routing_key, ‘err_str‘: exc}
LOG.error(_LE("Failed to declare consumer for topic ‘%‘
‘(topic)s‘: %(err_str)s"), log_info)
#消费者的回调处理函数,执行真正的消费者注册,declare代码请看下面的分析
def _declare_consumer():
consumer.declare(self)
self._consumers.append(consumer)
self._new_consumers.append(consumer)
return consumer
"""ensure内部包含大量的异常重试代码定义对_declare_consumer对象装饰
封装,目的是确保消费者注册成功;具体的封装过程请看下文的分析
"""
with self._connection_lock:
return self.ensure(_declare_consumer,
error_callback=_connect_error)
---------------------------------------------------------------
继续第二层封装前,先来看看consumer的declare的代码:
#impl_rabbit.py/Consumer.declare
def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect.
函数逻辑很简单:首先创建一个队列,然后向rabbitmq注册该队列及
exchange并将队列与exchange绑定
"""
#创建一个名为‘compute‘的队列,self.*参数在上文创建Consumer是指定
self.queue = kombu.entity.Queue(
name=self.queue_name,
channel=conn.channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.auto_delete,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
try:
LOG.trace(‘ConsumerBase.declare: ‘
‘queue %s‘, self.queue_name)
#向rabbitmq注册队列及exchange并绑定队列与exchange
self.queue.declare()
except conn.connection.channel_errors as exc:
"""NOTE(jrosenboom): This exception may be triggered
by a race condition. Simply retrying will solve the
error most of the time and should work well enough as
a workaround until the race condition itself can be
fixed.See
https://bugs.launchpad.net/neutron/+bug/1318721
for details.
"""
if exc.code == 404:
self.queue.declare()
else:
raise
--------------------------------------------------------------
继续来看看后续的装饰及封装过程,装饰及封装的目的只有一个:确保消费者能够向rabbitmq服务器注册成功:
#impl_rabbit.py/Connection.enusre
def ensure(self, method, retry=None,
recoverable_error_callback=None,
error_callback=None,
timeout_is_error=True):
"""这是第二层封装,定义execute_method方法封装第一层中输入的处理函数
_declare_consumer
该函数定义了多个回调处理函数,限于篇幅不给出具体代码,简单说明如下:
1. on_error - 处理连接异常的回调
2. on_recconnect - 处理重连成功后的回调
3. execute_method - 正常的回调,是_declare_consumer的封装
"""
def execute_method(channel):
self._set_current_channel(channel)
#method = _declare_consumer
method()
"""省略了try {} except 异常处理代码"""
#调用komku/connection.py/Connection.autoretry方法调用ensure方
#法再次封装函数调用,详情请继续看下面的分析
autoretry_method = self.connection.autoretry(
execute_method, channel=self.channel,
max_retries=retry,
errback=on_error,
interval_start=self.interval_start or 1,
interval_step=self.interval_stepping,
on_revive=on_reconnection,
)
"""根据对上述autoretry的代码分析,我们知道autoretry_method =
execute_method(ensured),实际的调用链如下:_ensured ->
revive.__call__ -> execute_method -> _declare_consumer
"""
ret, channel = autoretry_method()
self._set_current_channel(channel)
return ret
------------------------------------------------------------
下面的解读包含第三层及第四层封装,第三层中定义了一个callable对象Revival来封装上层输入的处理函数execute_method;而第四层中定义的_ensured方法包含‘死循环’重连来保证消费者注册成功
#komku/connection.py/Connection.autoretry
def autoretry(self, fun, channel=None, **ensure_options):
"""可以把该方法看成一个装饰器,创建Revival类封装输入方法fun =
execute_method,Revival类定义了__call__方法,说明它是一个
callable对象
"""
channels = [channel]
create_channel = self.channel
class Revival(object):
__name__ = getattr(fun, ‘__name__‘, None)
__module__ = getattr(fun, ‘__module__‘, None)
__doc__ = getattr(fun, ‘__doc__‘, None)
def revive(self, channel):
channels[0] = channel
def __call__(self, *args, **kwargs):
if channels[0] is None:
self.revive(create_channel())
#fun = execute_method
return fun(*args, channel=channels[0], **kwargs),
channels[0]
"""创建Revival对象,作为ensure的输入参数并继续封装以确保操作的成功
ensure方法内部定义_ensured方法,它是revive.__call__方法的封装,包
含异常及重试代码,_ensured方法的__name__,__module__,__doc__属性
都被按如下方式替换为Revival对象的__name__,__module__,__doc__:
_ensured.__name__ = "%s(ensured)" % fun.__name__
_ensured.__doc__ = fun.__doc__
_ensured.__module__ = fun.__module__
所以最后返回的是名为execute_method(ensured)的方法,时间的函数包装
是这样的:_ensured -> revive.__call__ -> execute_method ->
_declare_consumer
"""
revive = Revival()
return self.ensure(revive, revive, **ensure_options)
到这里nova-compute
服务内的rpc server就创建好了,通过上文的分析我们得到下面几个结论:
nova-compute
服务依赖于nova-conductor
服务nova-compute
服务依赖于消息队列(默认rabbitmq)nova-compute
服务启动过程中会创建三个消费者(两个topic类型,一个fanout类型)- 最后您会发现每个openstack服务进程(API除外)都创建了三个消费者,可以通过rabbitmqctl命令行工具查看相关信息!
希望对大家有帮助,本文完!