@zwei
2016-04-21T02:07:12.000000Z
字数 9242
阅读 3692
rabbitmq
- 在openstack 的项目中需要 oslo.messaging 插件包
- oslo.messaging 的依赖包 kombu, pika.
在这里我们只分析 rabbitmq 相关的内容
OpenStack RPC 通信
Openstack 组件内部的 RPC(Remote Producer Call)机制的实现是基于 AMQP(Advanced Message Queuing Protocol)作为通讯模型,从而满足组件内部的松耦合性。AMQP 是用于异步消息通讯的消息中间件协议,AMQP 模型有四个重要的角色:
- Exchange:根据 Routing key 转发消息到对应的 Message Queue 中
- Routing key:用于 Exchange 判断哪些消息需要发送对应的 Message Queue
- Publisher:消息发送者,将消息发送的 Exchange 并指明 Routing Key,以便 Message Queue 可以正确的收到消息
- Consumer:消息接受者,从 Message Queue 获取消息
消息发布者 Publisher 将 Message 发送给 Exchange 并且说明 Routing Key。Exchange 负责根据 Message 的 Routing Key 进行路由,将 Message 正确地转发给相应的 Message Queue。监听在 Message Queue 上的 Consumer 将会从 Queue 中读取消息。
Routing Key 是 Exchange 转发信息的依据,因此每个消息都有一个 Routing Key 表明可以接受消息的目的地址,而每个 Message Queue 都可以通过将自己想要接收的 Routing Key 告诉 Exchange 进行 binding,这样 Exchange 就可以将消息正确地转发给相应的 Message Queue。图 2 就是 AMQP 消息模型。
图 2. AMQP 消息模型

AMQP 定义了三种类型的 Exchange,不同类型 Exchange 实现不同的 routing 算法:
- Direct Exchange:Point-to-Point 消息模式,消息点对点的通信模式,Direct Exchange 根据 Routing Key 进行精确匹配,只有对应的 Message Queue 会接受到消息
- Topic Exchange:Publish-Subscribe(Pub-sub)消息模式,Topic Exchange 根据 Routing Key 进行模式匹配,只要符合模式匹配的 Message Queue 都会收到消息 (模糊匹配)
- Fanout Exchange:广播消息模式,Fanout Exchange 将消息转发到所有绑定的 Message Queue
这个用dhcp-agent 服务为例子
# neutron/cmd/eventlet/agents/dhcp.pyfrom neutron.agent import dhcp_agentdef main():dhcp_agent.main()# neutron/agent/dhcp_agent.pyfrom neutron.common import config as common_configdef main():register_options(cfg.CONF)common_config.init(sys.argv[1:])config.setup_logging()server = neutron_service.Service.create(binary='neutron-dhcp-agent',topic=topics.DHCP_AGENT,report_interval=cfg.CONF.AGENT.report_interval,manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')service.launch(cfg.CONF, server).wait()# 这里分析 common_config.init(sys.argv[1:])def init(args, **kwargs):cfg.CONF(args=args, project='neutron',version='%%(prog)s %s' % version.version_info.release_string(),**kwargs)# FIXME(ihrachys): if import is put in global, circular import# failure occursfrom neutron.common import rpc as n_rpcn_rpc.init(cfg.CONF)# Validate that the base_mac is of the correct formatmsg = attributes._validate_regex(cfg.CONF.base_mac,attributes.MAC_PATTERN)if msg:msg = _("Base MAC: %s") % msgraise Exception(msg)# neutron/common/rpc.py# 这里分析n_rpc.initTRANSPORT_ALIASES = {'neutron.openstack.common.rpc.impl_fake': 'fake','neutron.openstack.common.rpc.impl_qpid': 'qpid','neutron.openstack.common.rpc.impl_kombu': 'rabbit','neutron.openstack.common.rpc.impl_zmq': 'zmq','neutron.rpc.impl_fake': 'fake','neutron.rpc.impl_qpid': 'qpid','neutron.rpc.impl_kombu': 'rabbit','neutron.rpc.impl_zmq': 'zmq',}def init(conf):global TRANSPORT, NOTIFIERexmods = get_allowed_exmods()TRANSPORT = oslo_messaging.get_transport(conf,allowed_remote_exmods=exmods,aliases=TRANSPORT_ALIASES)serializer = RequestContextSerializer()NOTIFIER = oslo_messaging.Notifier(TRANSPORT, serializer=serializer)# 这里重点分析 oslo_messaging.get_transport() 方法# oslo.messaging/transport.py 文件#def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):allowed_remote_exmods = allowed_remote_exmods or []# 这里就涉及到 3 个 配置文件的选项# transport_url = transport://user:pass@host1:port[,hostN:portN]/virtual_host# rpc_backend = rabbitmq# control_exchange = oepnstackconf.register_opts(_transport_opts)if not isinstance(url, TransportURL):url = url or conf.transport_urlparsed = TransportURL.parse(conf, url, aliases)if not parsed.transport:raise InvalidTransportURL(url, 'No scheme specified in "%s"' % url)url = parsedkwargs = dict(default_exchange=conf.control_exchange,allowed_remote_exmods=allowed_remote_exmods)# url.transport.split('+')[0] = 'rabbit'try:mgr = driver.DriverManager('oslo.messaging.drivers',url.transport.split('+')[0],invoke_on_load=True,invoke_args=[conf, url],invoke_kwds=kwargs)# 导入rabbitmq的namespace "oslo.messaging.drivers;" 的setup.cfg 相关的driver 类# invoke_on_load=True 并且初始化# 初始化的参数invoke_args=[conf, url], invoke_kwds=kwargs#oslo.messaging.drivers =# rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriverexcept RuntimeError as ex:raise DriverLoadFailure(url.transport, ex)#初始化 Transport 类return Transport(mgr.driver)# 代码中的driver.DriverManager 为初始化 driver类class RabbitDriver(amqpdriver.AMQPDriverBase):"""RabbitMQ DriverThe ``rabbit`` driver is the default driver used in OpenStack'sintegration tests.The driver is aliased as ``kombu`` to support upgrading existinginstallations with older settings."""def __init__(self, conf, url,default_exchange=None,allowed_remote_exmods=None):opt_group = cfg.OptGroup(name='oslo_messaging_rabbit',title='RabbitMQ driver options')conf.register_group(opt_group)conf.register_opts(rabbit_opts, group=opt_group)conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)conf.register_opts(base.base_opts, group=opt_group)self.missing_destination_retry_timeout = (conf.oslo_messaging_rabbit.kombu_missing_consumer_retry_timeout)self.prefetch_size = (conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)connection_pool = pool.ConnectionPool(conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,url, Connection)super(RabbitDriver, self).__init__(conf, url,connection_pool,default_exchange,allowed_remote_exmods)#这里我们分析 Transport 类的初始化class Transport(object):"""A messaging transport.This is a mostly opaque handle for an underlying messaging transportdriver.It has a single 'conf' property which is the cfg.ConfigOpts instance usedto construct the transport object."""def __init__(self, driver):self.conf = driver.confself._driver = driver#到这里我们的rpc 相关的都初始化完成了
- rpc server 端就是一个 接受 消息端
- rpc server 端需要一个 回调方法 collback 方法
- rpc server 端是创建 consumer (消费者)
# 创建一个 direct 队列# 队列 routing_key = queue_name = topicdef declare_direct_consumer(self, topic, callback):"""Create a 'direct' queue.In nova's use, this is generally a msg_id queue used forresponses for call/multicall"""consumer = Consumer(exchange_name=topic,queue_name=topic,routing_key=topic,type='direct',durable=False,exchange_auto_delete=True,queue_auto_delete=False,callback=callback,rabbit_ha_queues=self.rabbit_ha_queues,rabbit_queue_ttl=self.rabbit_transient_queues_ttl)self.declare_consumer(consumer)def declare_topic_consumer(self, exchange_name, topic, callback=None,queue_name=None):"""Create a 'topic' consumer."""consumer = Consumer(exchange_name=exchange_name,queue_name=queue_name or topic,routing_key=topic,type='topic',durable=self.amqp_durable_queues,exchange_auto_delete=self.amqp_auto_delete,queue_auto_delete=self.amqp_auto_delete,callback=callback,rabbit_ha_queues=self.rabbit_ha_queues)self.declare_consumer(consumer)#创建一个 fanout 的 consumer#第一步 需要 验证 exchange 是否存在self.exchange = kombu.entity.Exchange(name=exchange_name,type=type,durable=self.durable,auto_delete=self.exchange_auto_delete)# 第二步 创建 queue 指定 exchange 并且给定 routing_keydef declare(self, conn):"""Re-declare the queue after a rabbit (re)connect."""self.queue = kombu.entity.Queue(name=self.queue_name,channel=conn.channel,exchange=self.exchange,durable=self.durable,auto_delete=self.queue_auto_delete,routing_key=self.routing_key,queue_arguments=self.queue_arguments)try:LOG.trace('ConsumerBase.declare: ''queue %s', self.queue_name)self.queue.declare()except conn.connection.channel_errors as exc:# NOTE(jrosenboom): This exception may be triggered by a race# condition. Simply retrying will solve the error most of the time# and should work well enough as a workaround until the race# condition itself can be fixed.# See https://bugs.launchpad.net/neutron/+bug/1318721 for details.if exc.code == 404:self.queue.declare()else:raise#在创建 fanout 的消费者时候 不需要指定 routing_keydef declare_fanout_consumer(self, topic, callback):"""Create a 'fanout' consumer."""unique = uuid.uuid4().hexexchange_name = '%s_fanout' % topicqueue_name = '%s_fanout_%s' % (topic, unique)consumer = Consumer(exchange_name=exchange_name,queue_name=queue_name,routing_key=topic,type='fanout',durable=False,exchange_auto_delete=True,queue_auto_delete=False,callback=callback,rabbit_ha_queues=self.rabbit_ha_queues,rabbit_queue_ttl=self.rabbit_transient_queues_ttl)self.declare_consumer(consumer)
Client 端发送 RPC 请求由 publisher 发送消息并声明消息地址,consumer 接收消息并进行消息处理,如果需要消息应答则返回处理请求的结果消息。OpenStack RPC 模块提供了
rpc.call,
rpc.cast,
rpc.fanout_cast
三种 RPC 调用方法,发送和接收 RPC 请求。
rpc.call 发送 RPC 请求并返回请求处理结果,
请求处理流程如图 5 所示,
由 Topic Publisher 发送消息,
Topic Exchange 根据消息地址进行消息转发至对应的 Message Queue 中,
Topic Consumer 监听 Message Queue,发现需要处理的消息则进行消息处理,
并由 Direct Publisher 将请求处理结果消息,请求发送方创建 Direct Consumer 监听消息的返回结果
rpc.cast 发送 RPC 请求无返回,请求处理流程如图 6 所示,与 rpc.call 不同之处在于,不需要请求处理结果的返回,因此没有 Direct Publisher 和 Direct Consumer 处理。

图 7. RPC.fanout 消息处理

# cast 没有返回结果def cast(self, ctxt, method, **kwargs):"""Invoke a method and return immediately. See RPCClient.cast()."""msg = self._make_message(ctxt, method, kwargs)ctxt = self.serializer.serialize_context(ctxt)if self.version_cap:self._check_version_cap(msg.get('version'))try:self.transport._send(self.target, ctxt, msg, retry=self.retry)except driver_base.TransportDriverError as ex:raise ClientSendError(self.target, ex)# call 有返回结果def call(self, ctxt, method, **kwargs):"""Invoke a method and wait for a reply. See RPCClient.call()."""if self.target.fanout:raise exceptions.InvalidTarget('A call cannot be used with fanout',self.target)msg = self._make_message(ctxt, method, kwargs)msg_ctxt = self.serializer.serialize_context(ctxt)timeout = self.timeoutif self.timeout is None:timeout = self.conf.rpc_response_timeoutif self.version_cap:self._check_version_cap(msg.get('version'))try:result = self.transport._send(self.target, msg_ctxt, msg,wait_for_reply=True, timeout=timeout,retry=self.retry)except driver_base.TransportDriverError as ex:raise ClientSendError(self.target, ex)return self.serializer.deserialize_entity(ctxt, result)