1. 单个 worker 的 queue 创建情况:
q-plugin:  # neutron-server topic of q-plugin

  1. exchange    => neutron
     queue       => q-plugin
     routing_key => q-plugin
  2. exchange    => neutron
     queue       => q-plugin.newton-controller
     routing_key => q-plugin.newton-controller
  3. exchange    => q-plugin_fanout
     queue       => q-plugin_fanout_707fee67cf9746a2aa121593c73e84b1
     routing_key => q-plugin
2. Consumer 创建监听(executor_thread_pool_size = 64):
start_rpc_listeners(plugin.py) -> create_consumer(neutron/common/rpc.py) -> get_rpc_server(oslo_messaging/rpc/server.py) -> MessageHandlingServer(oslo_messaging/server.py)
  |  # Create listener
consume_in_threads(neutron/common/rpc.py) -> start(oslo_messaging/server.py:MessageHandlingServer)
  -> _runner(oslo_messaging/server.py:MessageHandlingServer) -> _listen(oslo_messaging/transport.py:Transport)
  |  # Dispatch message to Endpoint
__call__(oslo_messaging/rpc/dispatcher.py) -> __init__(oslo_messaging/rpc/dispatcher.py:DispatcherExecutorContext)
  -> run(oslo_messaging/rpc/dispatcher.py:DispatcherExecutorContext) -> _dispatch_and_reply(oslo_messaging/rpc/dispatcher.py)
  -> _dispatch(oslo_messaging/rpc/dispatcher.py:RPCDispatcher)[_is_compatible] -> _do_dispatch(oslo_messaging/rpc/dispatcher.py)
  -> reply(oslo_messaging/_drivers/amqpdriver.py:AMQPIncomingMessage) -> direct_send(oslo_messaging/_drivers/impl_rabbit.py:Connection)
  -> listen(oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase) -> poll(oslo_messaging/_drivers/amqpdriver.py:AMQPListener)
  -> consume(oslo_messaging/_drivers/impl_rabbit.py:Connection) -> autoretry(kombu.connection.Connection)
  -> _consume(oslo_messaging/_drivers/impl_rabbit.py:Connection) -> drain_events(kombu.connection.Connection)
  -> consume(oslo_messaging/_drivers/impl_rabbit.py:Consumer)
  |  # Dispatch message to MessageHandlingServer -> _callback(oslo_messaging/_drivers/impl_rabbit.py:Consumer)
  -> __call__(oslo_messaging/_drivers/amqpdriver.py:AMQPListener)
3. Publisher 发送情况(rpc_response_timeout = 60):
report_state(neutron/agent/rpc.py:PluginReportStateAPI) -> get_client(neutron/common/rpc.py) -> __init__(neutron/common/rpc.py:BackingOffClient) -> __init__(oslo_messaging/rpc/client.py:RPCClient)
  |
prepare(neutron/common/rpc.py:BackingOffClient) -> prepare(oslo_messaging/rpc/client.py:RPCClient)
  -> prepare(oslo_messaging/rpc/client.py:_CallContext)
  |
call(oslo_messaging/rpc/client.py:_CallContext) -> _send(oslo_messaging/transport.py:Transport)
  -> send(oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase) -> _send
  -> _get_reply_q(oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase)
  -> wait(oslo_messaging/_drivers/amqpdriver.py:ReplyWaiter) -> _process_reply
  -> topic_send(oslo_messaging/_drivers/impl_rabbit.py:Connection) -> _ensure_publishing -> _publish
时间: 2024-11-01 01:10:03