[关闭]
@dume2007 2017-02-24T05:43:33.000000Z 字数 8960 阅读 3096

RabbitMQ使用笔记

rabbitmq 消息队列 php


Rabbitmq特性

AMQP高级消息协议

AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,作为线路层协议,AMQP 客户端能够无视消息的来源任意发送和接受信息。

AMQP四个重要组成部分:

  1. virtual host,虚拟主机
  2. exchange,交换机
  3. queue,队列
  4. binding,绑定

一个虚拟主机持有一组交换机、队列和绑定。每台rabbitmq服务器可以有多个虚拟主机,默认为/。虚拟主机主要用于用户权限控制。因为RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。

队列(Queues)是你的消息(messages)的终点,可以理解成装消息的容器。队列是由消费者(Consumer)通过程序建立的,如果一个消费者试图创建一个已经存在的队列,RabbitMQ会直接忽略这个请求。

交换机(Exchange)可以理解成具有路由表的路由程序。每个消息都有一个称为路由键(routingkey)的属性,就是一个简单的字符串。每个交换机都是一个独立的进程,合理利用服务器多核CPU使得rabbitmq性能得到最佳。交换机有多种类型,不同的交换机类型CPU开销是不一样的,一般来说CPU开销顺序是:

TOPIC > DIRECT > FANOUT > NAMELESS

交换机当中有一系列的绑定(binding),即路由规则(routes)。交换机如何判断要把消息送到哪个队列?你需要路由规则,即绑定(binding)。

一个绑定就是一个类似这样的规则:将交换机“desert(沙漠)”当中具有路由键“饿狼”的消息送到队列“hideout(山洞)”里面去。换句话说,一个绑定就是一个基于路由键将交换机和队列连接起来的路由规则。例如,具有路由键“log-route”的消息需要被送到两个队列,“log-file”和“log-output”。要做到这个,就需要创建两个绑定,每个都连接一个交换机和一个队列,两者都是由“log-route”路由键触发。在这种情况下,交换机会复制一份消息并且把它们分别发送到两个队列当中。交换机不过就是一个由绑定构成的路由表。

交换机类型

消息不直接发送到queue中,中间有一个exchange做消息分发,producer甚至不知道消息发送到那个队列中去。因此,当exchange收到message时,必须准确知道该如何分发。是推送到一定规则的queue,还是推送到多个queue中,还是被丢弃。这些规则都是通过exchange去定义的。

1、匿名交换机,工作队列模式
2、扇形交换机(fanout),发布订阅/广播模式
3、直连交换机(direct),路由绑定/广播精确匹配
4、主题交换机(topic),路由规则/广播模糊匹配
5、头交换机(header),定义AMQP头部属性

1、匿名交换机

应用场景:并行处理任务队列
此处输入图片的描述

工作队列(又称:任务队列—Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。

使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。

默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。

代码示例

1)生产者

  1. //声明队列,$queue,$passive,$durable,$exclusive,$auto_delete
  2. $channel->queue_declare('task_queue', false, true, false, false);
  3. //发布消息到队列,$message,$exchange,$queue
  4. $channel->basic_publish($msg, '', 'task_queue');

2)消费者

  1. //声明队列
  2. $channel->queue_declare('task_queue', false, true, false, false);
  3. //消费数据异步回调,$queue,$consumer_tag,$no_local,$no_ack,$exclusive,$nowait,$callback
  4. $channel->basic_consume('task_queue', '', false, false, false, false, $callback);

相关特性

1)持久化。如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。已经定义过非持久化的队列不能再定义为持久化队列,我们得重新命名一个新的队列。必须把“队列”和“消息”都设为持久化。

  1. //交换机持久化
  2. $channel->exchange_declare('exchange', 'fanout', false, true, false);
  3. //对列持久化
  4. $channel->queue_declare('task_queue', false, true, false, false);
  5. //消息持久化
  6. $msg = new AMQPMessage($body, ['delivery_mode' => 2]);

2)Ack消息确认。当消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中移除。这种情况,你只要把一个工作者(worker)停止,正在处理的消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。

  1. $channel->basic_consume('task_queue', '', false, false, false, false, $callback);
  2. //在回调函数中发送ack消息
  3. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

3)Qos公平调度。如果多个worker进程中,某个worker处理比较慢,另一个worker比较快,默认RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应,这样会使得比较慢的worker消息堆积过多,导致任务分配不均。Qos公平调度设置prefetch_count=1,即在同一时刻,不会发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

  1. //$prefetch_size,$prefetch_count,$global
  2. $channel->basic_qos(null, 1, null);

4)消息事务。将消息设为持久化并不能完全保证不会丢失。持久化只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用同步IO—它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,我们需要改写代码来支持事务(transaction)。

  1. $channel->tx_select();
  2. $channel->basic_publish($msg, '', 'task_queue');
  3. $channel->tx_commit();

5)confirm消息确认。 AMQP消息协议提供了事务支持,不过事务机制会导致性能急剧下降,所以rabbitmq特别引入了confirm机制。

Confirm有三种编程方式:

  1. 普通confirm模式。每发送一条消息后,调用wait_for_pending_acks()方法,等待服务器端confirm。实际上是一种串行confirm。

  2. 批量confirm模式。每次发送一批消息后,调用wait_for_pending_acks()方法,等待服务器端confirm。

  3. 异步confirm模式。提供一个回调方法,服务器端confirm了一条(或多条)消息后客户端会回调这个方法。

代码示例:

  1. //一旦消息被设为confirm模式,就不能设置事务模式,反之亦然
  2. $channel->confirm_select();
  3. //阻塞等待消息确认
  4. $channel->wait_for_pending_acks();
  5. //异步回调消息确认
  6. $channel->set_ack_handle();
  7. $channel->set_nack_handler();

2、扇形交换机

应用场景:简单的发布订阅模式,广播消息
此处输入图片的描述

这种广播式交换机会忽略路由关键字,不使用任何绑定参数将队列和交换机绑定在一起。
producer每向交换机发送一条消息,消息都会被无条件的传递到所有和这个交换机绑定的消息队列中。

代码示例

  1. //声明交换机类型,匿名交换机无需声明,$exchange,$type,$passive,$durable,$auto_delete
  2. $channel->exchange_declare('logs', 'fanout', false, false, false);
  3. //发布消息到fanout交换机,忽略routing_key
  4. $channel->basic_publish($msg, 'logs');
  5. //创建随机的临时队列
  6. list($queue_name, ,) = $channel->queue_declare("");
  7. //绑定队列到交换机
  8. $channel->queue_bind($queue_name, 'logs');

3、直连交换机

应用场景:每个消费者只订阅广播消息的子集,通过绑定键进行筛选。

此处输入图片的描述
此处输入图片的描述

代码示例

  1. //声明直连交换机类型
  2. $channel->exchange_declare('direct_logs', 'direct', false, false, false);
  3. //发布的时候额外增加绑定键
  4. $channel->basic_publish($msg, 'direct_logs', $binding_key);
  5. //消费时声明一个临时随机队列
  6. list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
  7. //绑定某个路由键
  8. $channel->queue_bind($queue_name, 'direct_logs', $binding_key);
  9. //绑定到多个路由键
  10. $channel->queue_bind($queue_name, 'direct_logs', $binding_other_key);
  11. //获取数据
  12. $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

4、主题交换机

应用场景:通过模糊匹配来基于多个标准执行路由操作。fanout和direct交换机是topic类型的一种特殊情况,前两种类型都能通过topic exchange实现。
此处输入图片的描述

代码示例

  1. //声明主题交换机类型
  2. $channel->exchange_declare(‘topic_logs', ‘topic', false, false, false);
  3. //通配符
  4. * (星号) 用来表示一个单词.
  5. # (井号) 用来表示任意数量(零个或多个)单词。
  6. //实现类似扇形交换机类型,通配符使用#
  7. $channel->queue_bind($queue_name, 'topic_logs', ‘#’);
  8. //实现类似直连交换机类型,不使用通配符*#,直接用字符串
  9. $channel->queue_bind($queue_name, topic_logs’, binding_key’);

5、header交换机

RabbitMQ使用的是AMQP协议,这种协议提供了header attribute参数,头交换机就是利用AMQP协议通过传送额外的路由参数来达到数据过滤的作用。

  1. //声明头交换机类型
  2. $channel->exchange_declare(‘header_logs', ‘header', false, false, false);
  3. //传递header消息属性
  4. $headers = new Wire\AMQPTable([‘colour’=>’ orange’]);
  5. $message = new AMQPMessage($body);
  6. $message->set('application_headers', $headers);
  7. $channel->basic_publish($message, 'topic_logs');
  8. //消费者,x-match对应all和any,all表示同时满足,any只要满足其中一个
  9. $arguments = [‘x-match’=>’any’, colour’=>’orange’, speed’=> quick’, species’=>’ rabbit’];
  10. $channel->queue_bind($queue_name, topic_logs’, ‘’, false, $arguments);
  11. $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

Header交换机是topic类型的有效补充,x-match=>any如果使用topic来做通配符匹配的话,需要实现三种worker才行。形如:<celerity>.<colour>.<species>

通配符实现:[quick.*.*], [*.orange.*], [*.*.rabbit]

RPC远程过程调用

RPC(Remote Procedure Call Protocol)——远程过程调用协议。即指两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。

此处输入图片的描述

基于AMQP协议的提供的消息属性:

实现流程:

客户端调用示例:

  1. $fibonacci_rpc = new FibonacciRpcClient();
  2. $response = $fibonacci_rpc->call(30);
  3. echo " [.] Got ", $response, "\n";

FibonacciRpcClient类主要实现了client端从发送消息到rabbitmq以及等待回调函数取得结果的流程。

  1. //构造函数中定义临时队列,等待回调消息返回结果
  2. list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false);
  3. $this->channel->basic_consume($this->callback_queue, '', false, false, false, false, array($this, 'on_response'));
  4. //回调函数检查关联标志是否一致
  5. public function on_response($rep) {
  6. if($rep->get('correlation_id') == $this->corr_id) {
  7. $this->response = $rep->body;
  8. }
  9. }
  10. //客户端请求函数,关联标志correlation_id由uniqid()随机生成, 回调队列reply_to由构造函数中定义的随机队列名提供。$this->channel->wait()阻塞等待远程计算结果。
  11. public function call($n) {
  12. $this->response = null;
  13. $this->corr_id = uniqid();
  14. $msg = new AMQPMessage(
  15. (string) $n,
  16. array('correlation_id' => $this->corr_id,
  17. 'reply_to' => $this->callback_queue)
  18. );
  19. $this->channel->basic_publish($msg, '', 'rpc_queue');
  20. while(!$this->response) {
  21. $this->channel->wait();
  22. }
  23. return intval($this->response);
  24. }

服务端通过rabbmq得到请求数据在本地调用fib函数并发送结果到回调队列reply_to中

  1. $callback = function($req) {
  2. $n = intval($req->body);
  3. echo " [.] fib(", $n, ")\n";
  4. $msg = new AMQPMessage(
  5. (string) fib($n),
  6. array('correlation_id' => $req->get('correlation_id'))
  7. );
  8. $req->delivery_info['channel']->basic_publish(
  9. $msg, '', $req->get('reply_to'));
  10. $req->delivery_info['channel']->basic_ack(
  11. $req->delivery_info['delivery_tag']);
  12. };
  13. $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

rabbitmqctl 常用命令

  1. //交换机列表
  2. rabbitmqctl list_exchanges
  3. //绑定列表
  4. rabbitmqctl list_bindings
  5. //列出所有队列
  6. rabbitmqctl list_queues
  7. //列出未被ack确认的消息数
  8. rabbitmqctl list_queues name messages_ready messages_unacknowledged

参考资料

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注