[关闭]
@zwei 2016-04-21T02:07:12.000000Z 字数 9242 阅读 3553

rabbitmq 的工作流程和详细分析

rabbitmq


1,rabbitmq 的 在openstack 项目中的调用过程分析

  • 在openstack 的项目中需要 oslo.messaging 插件包
  • oslo.messaging 的依赖包 kombu, pika.

在这里我们只分析 rabbitmq 相关的内容
OpenStack RPC 通信
Openstack 组件内部的 RPC(Remote Producer Call)机制的实现是基于 AMQP(Advanced Message Queuing Protocol)作为通讯模型,从而满足组件内部的松耦合性。AMQP 是用于异步消息通讯的消息中间件协议,AMQP 模型有四个重要的角色:

  • Exchange:根据 Routing key 转发消息到对应的 Message Queue 中
  • Routing key:用于 Exchange 判断哪些消息需要发送对应的 Message Queue
  • Publisher:消息发送者,将消息发送的 Exchange 并指明 Routing Key,以便 Message Queue 可以正确的收到消息
  • Consumer:消息接受者,从 Message Queue 获取消息

消息发布者 Publisher 将 Message 发送给 Exchange 并且说明 Routing Key。Exchange 负责根据 Message 的 Routing Key 进行路由,将 Message 正确地转发给相应的 Message Queue。监听在 Message Queue 上的 Consumer 将会从 Queue 中读取消息。
Routing Key 是 Exchange 转发信息的依据,因此每个消息都有一个 Routing Key 表明可以接受消息的目的地址,而每个 Message Queue 都可以通过将自己想要接收的 Routing Key 告诉 Exchange 进行 binding,这样 Exchange 就可以将消息正确地转发给相应的 Message Queue。图 2 就是 AMQP 消息模型。
图 2. AMQP 消息模型
AMQP

AMQP 定义了三种类型的 Exchange,不同类型 Exchange 实现不同的 routing 算法:

  • Direct Exchange:Point-to-Point 消息模式,消息点对点的通信模式,Direct Exchange 根据 Routing Key 进行精确匹配,只有对应的 Message Queue 会接受到消息
  • Topic Exchange:Publish-Subscribe(Pub-sub)消息模式,Topic Exchange 根据 Routing Key 进行模式匹配,只要符合模式匹配的 Message Queue 都会收到消息 (模糊匹配)
  • Fanout Exchange:广播消息模式,Fanout Exchange 将消息转发到所有绑定的 Message Queue

2, neutron 中所使用的 oslo.messaging 的服务分析

这个用dhcp-agent 服务为例子

  1. # neutron/cmd/eventlet/agents/dhcp.py
  2. from neutron.agent import dhcp_agent
  3. def main():
  4. dhcp_agent.main()
  5. # neutron/agent/dhcp_agent.py
  6. from neutron.common import config as common_config
  7. def main():
  8. register_options(cfg.CONF)
  9. common_config.init(sys.argv[1:])
  10. config.setup_logging()
  11. server = neutron_service.Service.create(
  12. binary='neutron-dhcp-agent',
  13. topic=topics.DHCP_AGENT,
  14. report_interval=cfg.CONF.AGENT.report_interval,
  15. manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
  16. service.launch(cfg.CONF, server).wait()
  17. # 这里分析 common_config.init(sys.argv[1:])
  18. def init(args, **kwargs):
  19. cfg.CONF(args=args, project='neutron',
  20. version='%%(prog)s %s' % version.version_info.release_string(),
  21. **kwargs)
  22. # FIXME(ihrachys): if import is put in global, circular import
  23. # failure occurs
  24. from neutron.common import rpc as n_rpc
  25. n_rpc.init(cfg.CONF)
  26. # Validate that the base_mac is of the correct format
  27. msg = attributes._validate_regex(cfg.CONF.base_mac,
  28. attributes.MAC_PATTERN)
  29. if msg:
  30. msg = _("Base MAC: %s") % msg
  31. raise Exception(msg)
  32. # neutron/common/rpc.py
  33. # 这里分析n_rpc.init
  34. TRANSPORT_ALIASES = {
  35. 'neutron.openstack.common.rpc.impl_fake': 'fake',
  36. 'neutron.openstack.common.rpc.impl_qpid': 'qpid',
  37. 'neutron.openstack.common.rpc.impl_kombu': 'rabbit',
  38. 'neutron.openstack.common.rpc.impl_zmq': 'zmq',
  39. 'neutron.rpc.impl_fake': 'fake',
  40. 'neutron.rpc.impl_qpid': 'qpid',
  41. 'neutron.rpc.impl_kombu': 'rabbit',
  42. 'neutron.rpc.impl_zmq': 'zmq',
  43. }
  44. def init(conf):
  45. global TRANSPORT, NOTIFIER
  46. exmods = get_allowed_exmods()
  47. TRANSPORT = oslo_messaging.get_transport(conf,
  48. allowed_remote_exmods=exmods,
  49. aliases=TRANSPORT_ALIASES)
  50. serializer = RequestContextSerializer()
  51. NOTIFIER = oslo_messaging.Notifier(TRANSPORT, serializer=serializer)
  52. # 这里重点分析 oslo_messaging.get_transport() 方法
  53. # oslo.messaging/transport.py 文件
  54. #
  55. def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
  56. allowed_remote_exmods = allowed_remote_exmods or []
  57. # 这里就涉及到 3 个 配置文件的选项
  58. # transport_url = transport://user:pass@host1:port[,hostN:portN]/virtual_host
  59. # rpc_backend = rabbitmq
  60. # control_exchange = oepnstack
  61. conf.register_opts(_transport_opts)
  62. if not isinstance(url, TransportURL):
  63. url = url or conf.transport_url
  64. parsed = TransportURL.parse(conf, url, aliases)
  65. if not parsed.transport:
  66. raise InvalidTransportURL(url, 'No scheme specified in "%s"' % url)
  67. url = parsed
  68. kwargs = dict(default_exchange=conf.control_exchange,
  69. allowed_remote_exmods=allowed_remote_exmods)
  70. # url.transport.split('+')[0] = 'rabbit'
  71. try:
  72. mgr = driver.DriverManager('oslo.messaging.drivers',
  73. url.transport.split('+')[0],
  74. invoke_on_load=True,
  75. invoke_args=[conf, url],
  76. invoke_kwds=kwargs)
  77. # 导入rabbitmq的namespace "oslo.messaging.drivers;" 的setup.cfg 相关的driver 类
  78. # invoke_on_load=True 并且初始化
  79. # 初始化的参数invoke_args=[conf, url], invoke_kwds=kwargs
  80. #oslo.messaging.drivers =
  81. # rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver
  82. except RuntimeError as ex:
  83. raise DriverLoadFailure(url.transport, ex)
  84. #初始化 Transport 类
  85. return Transport(mgr.driver)
  86. # 代码中的driver.DriverManager 为初始化 driver类
  87. class RabbitDriver(amqpdriver.AMQPDriverBase):
  88. """RabbitMQ Driver
  89. The ``rabbit`` driver is the default driver used in OpenStack's
  90. integration tests.
  91. The driver is aliased as ``kombu`` to support upgrading existing
  92. installations with older settings.
  93. """
  94. def __init__(self, conf, url,
  95. default_exchange=None,
  96. allowed_remote_exmods=None):
  97. opt_group = cfg.OptGroup(name='oslo_messaging_rabbit',
  98. title='RabbitMQ driver options')
  99. conf.register_group(opt_group)
  100. conf.register_opts(rabbit_opts, group=opt_group)
  101. conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
  102. conf.register_opts(base.base_opts, group=opt_group)
  103. self.missing_destination_retry_timeout = (
  104. conf.oslo_messaging_rabbit.kombu_missing_consumer_retry_timeout)
  105. self.prefetch_size = (
  106. conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)
  107. connection_pool = pool.ConnectionPool(
  108. conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
  109. url, Connection)
  110. super(RabbitDriver, self).__init__(
  111. conf, url,
  112. connection_pool,
  113. default_exchange,
  114. allowed_remote_exmods
  115. )
  116. #这里我们分析 Transport 类的初始化
  117. class Transport(object):
  118. """A messaging transport.
  119. This is a mostly opaque handle for an underlying messaging transport
  120. driver.
  121. It has a single 'conf' property which is the cfg.ConfigOpts instance used
  122. to construct the transport object.
  123. """
  124. def __init__(self, driver):
  125. self.conf = driver.conf
  126. self._driver = driver
  127. #到这里我们的rpc 相关的都初始化完成了

分析rpc 服务的代码流程

1, rpc 服务的 server 端 也就是 receive 端接受消息

  • rpc server 端就是一个 接受 消息端
  • rpc server 端需要一个 回调方法 collback 方法
  • rpc server 端是创建 consumer (消费者)

分析 oslo.messaging 包的分析 oslo_messaging/_drivers/./impl_rabbit.py

  1. # 创建一个 direct 队列
  2. # 队列 routing_key = queue_name = topic
  3. def declare_direct_consumer(self, topic, callback):
  4. """Create a 'direct' queue.
  5. In nova's use, this is generally a msg_id queue used for
  6. responses for call/multicall
  7. """
  8. consumer = Consumer(exchange_name=topic,
  9. queue_name=topic,
  10. routing_key=topic,
  11. type='direct',
  12. durable=False,
  13. exchange_auto_delete=True,
  14. queue_auto_delete=False,
  15. callback=callback,
  16. rabbit_ha_queues=self.rabbit_ha_queues,
  17. rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
  18. self.declare_consumer(consumer)
  19. def declare_topic_consumer(self, exchange_name, topic, callback=None,
  20. queue_name=None):
  21. """Create a 'topic' consumer."""
  22. consumer = Consumer(exchange_name=exchange_name,
  23. queue_name=queue_name or topic,
  24. routing_key=topic,
  25. type='topic',
  26. durable=self.amqp_durable_queues,
  27. exchange_auto_delete=self.amqp_auto_delete,
  28. queue_auto_delete=self.amqp_auto_delete,
  29. callback=callback,
  30. rabbit_ha_queues=self.rabbit_ha_queues)
  31. self.declare_consumer(consumer)
  32. #创建一个 fanout 的 consumer
  33. #第一步 需要 验证 exchange 是否存在
  34. self.exchange = kombu.entity.Exchange(
  35. name=exchange_name,
  36. type=type,
  37. durable=self.durable,
  38. auto_delete=self.exchange_auto_delete)
  39. # 第二步 创建 queue 指定 exchange 并且给定 routing_key
  40. def declare(self, conn):
  41. """Re-declare the queue after a rabbit (re)connect."""
  42. self.queue = kombu.entity.Queue(
  43. name=self.queue_name,
  44. channel=conn.channel,
  45. exchange=self.exchange,
  46. durable=self.durable,
  47. auto_delete=self.queue_auto_delete,
  48. routing_key=self.routing_key,
  49. queue_arguments=self.queue_arguments)
  50. try:
  51. LOG.trace('ConsumerBase.declare: '
  52. 'queue %s', self.queue_name)
  53. self.queue.declare()
  54. except conn.connection.channel_errors as exc:
  55. # NOTE(jrosenboom): This exception may be triggered by a race
  56. # condition. Simply retrying will solve the error most of the time
  57. # and should work well enough as a workaround until the race
  58. # condition itself can be fixed.
  59. # See https://bugs.launchpad.net/neutron/+bug/1318721 for details.
  60. if exc.code == 404:
  61. self.queue.declare()
  62. else:
  63. raise
  64. #在创建 fanout 的消费者时候 不需要指定 routing_key
  65. def declare_fanout_consumer(self, topic, callback):
  66. """Create a 'fanout' consumer."""
  67. unique = uuid.uuid4().hex
  68. exchange_name = '%s_fanout' % topic
  69. queue_name = '%s_fanout_%s' % (topic, unique)
  70. consumer = Consumer(exchange_name=exchange_name,
  71. queue_name=queue_name,
  72. routing_key=topic,
  73. type='fanout',
  74. durable=False,
  75. exchange_auto_delete=True,
  76. queue_auto_delete=False,
  77. callback=callback,
  78. rabbit_ha_queues=self.rabbit_ha_queues,
  79. rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
  80. self.declare_consumer(consumer)

RPC 发送请求

Client 端发送 RPC 请求由 publisher 发送消息并声明消息地址,consumer 接收消息并进行消息处理,如果需要消息应答则返回处理请求的结果消息。OpenStack RPC 模块提供了
rpc.call,
rpc.cast,
rpc.fanout_cast
三种 RPC 调用方法,发送和接收 RPC 请求。

rpc.call 发送 RPC 请求并返回请求处理结果,
请求处理流程如图 5 所示,
call
由 Topic Publisher 发送消息,
Topic Exchange 根据消息地址进行消息转发至对应的 Message Queue 中,
Topic Consumer 监听 Message Queue,发现需要处理的消息则进行消息处理,
并由 Direct Publisher 将请求处理结果消息,请求发送方创建 Direct Consumer 监听消息的返回结果

rpc.cast 发送 RPC 请求无返回,请求处理流程如图 6 所示,与 rpc.call 不同之处在于,不需要请求处理结果的返回,因此没有 Direct Publisher 和 Direct Consumer 处理。
cast

图 7. RPC.fanout 消息处理
fanout_cast

cast, call 代码分析

  1. # cast 没有返回结果
  2. def cast(self, ctxt, method, **kwargs):
  3. """Invoke a method and return immediately. See RPCClient.cast()."""
  4. msg = self._make_message(ctxt, method, kwargs)
  5. ctxt = self.serializer.serialize_context(ctxt)
  6. if self.version_cap:
  7. self._check_version_cap(msg.get('version'))
  8. try:
  9. self.transport._send(self.target, ctxt, msg, retry=self.retry)
  10. except driver_base.TransportDriverError as ex:
  11. raise ClientSendError(self.target, ex)
  12. # call 有返回结果
  13. def call(self, ctxt, method, **kwargs):
  14. """Invoke a method and wait for a reply. See RPCClient.call()."""
  15. if self.target.fanout:
  16. raise exceptions.InvalidTarget('A call cannot be used with fanout',
  17. self.target)
  18. msg = self._make_message(ctxt, method, kwargs)
  19. msg_ctxt = self.serializer.serialize_context(ctxt)
  20. timeout = self.timeout
  21. if self.timeout is None:
  22. timeout = self.conf.rpc_response_timeout
  23. if self.version_cap:
  24. self._check_version_cap(msg.get('version'))
  25. try:
  26. result = self.transport._send(self.target, msg_ctxt, msg,
  27. wait_for_reply=True, timeout=timeout,
  28. retry=self.retry)
  29. except driver_base.TransportDriverError as ex:
  30. raise ClientSendError(self.target, ex)
  31. return self.serializer.deserialize_entity(ctxt, result)
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注