@HarrisonHDU
2016-05-16T03:29:51.000000Z
字数 3131
阅读 1203
AMQP
RabbitMQ
# 下载包
$ wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.5.4/rabbitmq-server-3.5.4-1.noarch.rpm
# 安装erlang
$ yum install erlang
# 安装rabbitmq
$ yum install rabbitmq-server-3.5.4-1.noarch.rpm
# 显示所有队列的待处理消息数量和待确认消息数量,默认显示待处理消息数量
$ rabbitmqctl list_queues name messages_ready messages_unacknowledged
# 显示交换机(amq.*都是默认创建的交换机)
$ rabbitmqctl list_exchanges
# 显示绑定
$ rabbitmqctl list_bindings
# 查看基本信息
$ rabbitmqctl status
# 停止rabbitmq
$ rabbitmqctl stop
# 启动rabbitmq服务
$ rabbitmq-server
# 以守护进程方式启动rabbitmq服务
$ /sbin/service rabbitmq-server start/stop/restart/etc
# 开启监控插件(需要重启rabbitmq-server才能生效)
$ rabbitmq-plugins enable rabbitmq_management
# 队列持久化, 需要注意的是在rabbitmq中如果某个队列已经存在则即使调用不同的参数重新声明也是不会重新创建工作队列的。
channel.queue_declare(queue='task_queue1', durable=True)
# 消息持久化
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
)
)
消息属性
AMQP协议定义了14中消息属性,以下是几种常用的属性:
管理工具
简介
报警日志
# 提示磁盘空间不足
Disk free space insufficient. Free bytes:969465856 Limit:1000000000
# 该节点磁盘空间出现警报
disk resource limit alarm set on node rabbit@AY14072213291500867dZ
# 提示生产者将会被阻塞,除非警报解除
Publishers will be blocked until this alarm clears
# 提示空间恢复到阀值以上
Disk free space sufficient. Free bytes:1011625984 Limit:1000000000
# 提示警报解除
disk resource limit alarm cleared on node rabbit@AY14072213291500867dZ
# 创建
channel.exchange_declare(exchange='logs', type='fanout')
# 发送消息(发往一个叫logs的交换机, 路由键是一个空字符串, 有也会被忽略)
channel.basic_publish(exchange='logs', routing_key='', body=msg)
# 无名/默认交换机, 如果指定了routing_key, 则类似direct类型的路由规则
channel.basic_publish(exchange='',routing_key='xxx',body=msg)
# 创建
channel.exchange_declare(exchange='logs', type='direct')
# 发送信息
channel.basic_publish(exchange='logs', routing_key='xxx', body=msg)
有名队列
临时队列
# 创建一个临时队列, 只要不传queue参数即可。返回值, result.method.queue 表示队列名称
result = channel.queue_declare()
# 当消费者断开连接时,通过设置exclusive参数可以使队列自动删除
result = channel.queue_declare(exclusive=True)
那么谁应该使用创建队列?
生产者和消费者都可以创建队列,对于某个channel,消费者不能声明一个队列却订阅其他的队列。可以创建私有队列,私有队列只有创建者自己才可以使用。被标为auto-delete的队列在最后一个消费者取消订阅后会自动删除。如果创建一个已经存在的队列,是不会产生任何影响,也就是说即使参数不一样也不会生效。究竟由哪一方来创建队列?当队列不存在时,生产者的消息会被丢弃;而消费者不会受到消息。因此从健壮性考虑,生产者和消费者都需要尝试创建队列。
# 绑定交换机和队列
channel.queue_bind(exchange='logs', queue=result.method.queue)
# 创建带binding key的绑定关系
channel.queue_bind(exchange='logs', queue=queue_name, routing_key='black')
虚拟连接。它是建立在下面的TCP连接中。数据流在channel中进行,一个TCP连接中可以建立多了虚拟连接。那么为啥要使用Channel而不是直接使用TCP连接,对操作系统来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。
一个TCP连接,消费者和生产者都是通过TCP连接到RabbitMQ Server