@snakeshiy
2018-11-10T10:46:38.000000Z
字数 5237
阅读 2058
AMQP
消息队列
中间件
测试
在大型互联网架构中经常会用到消息队列(Message Queue)这种中间件,在服务端测试时,许多测试同学通过工具对API和数据库都能熟练地进行测试,一说到消息队列的测试就有点不知道怎么入手了。那么对于看不见摸不着的消息队列,如何进行有效的测试呢?在介绍测试方法之前,我们先来了解一下消息队列的原理与机制,这里以常见的AMQP协议的消息队列为例。
消息队列,简单来说,就是我们通过网络向对方发送了一封短消息,短消息通过运营商网络发送到接收者,被对方读取。消息队列则是由生产者(消息的发送者)通过消息队列服务器向消费者发送一个带有主题的消息,消息体可以为字符串或者更多的数据结构,由消费者在消费端读取消息。
一般的消息队列可以支持多种订阅模式:
点对点模式类似于我们发短消息,指定了由某一个人来接收。
对于消息队列来说,生产者发送了一条消息,通过路由投递到指定的queue,只有一个消费者能收到。
订阅模式类似于我们刷微博,对于关注的人进行订阅,当被关注着发布一条新微博时,所有关注他的人都能够收到。
对于消息队列来说,生产者发送了一条带有主题的消息,通过路由投递到了绑定有该主题的queue,同时消息可复制成多份,由多个消费者接收。
当前各种应用大量使用异步消息模型,并随之产生众多消息中间件产品及协议,标准的不一致使应用与中间件之间的耦合限制产品的选择,并增加维护成本。AMQP(Advanced Message Queuing Protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
常见的基于AMQP的消息队列有:RabbitMQ、MaxQ(饿了么自研)
routing key
,分发消息到queue中去。exchange主要有四种类型:direct (点对点)、 topic (主题订阅) 、 fanout (广播)和 header(头信息匹配)。routing key
。Binding信息被保存到exchange中的查询表中,用于message的分发依据。消息队列的应用场景主要是围绕着 生产者 和 消费者 展开的,所以测试思路其实非常简单,如果被测应用是消息的生产者,那我们就模拟消费者去接收消息,验证发出的消息内容的正确性。如果被测应用是消息的消费者,那我们就模拟消息的生产者去发送消息,然后验证被测应用收到消息后的处理逻辑。
这里我们以最常见的RabbitMQ为例,介绍一下几种主要的测试方式。
日志法
如果被测应用是生产者,可以让开发将发送消息的内容打印在日志中,通过查看日志的方式进行验证,这也是比较常用的方法。
但是这种方式如果发送消息频次高、数据量大或者日志级别设置的不合理的话可能会对应用的性能造成一定影响。
RabbitMQ管理面板
我们需要模拟一个消费者去接收消息,直接从已有的queue中去取消息会和其他的消费者产生冲突,所以我们要新建一个测试queue,通过绑定相同的exchange
和routing key
,也拿到一份消息的copy。
具体步骤如下:
1. 使用和被测应用相同的vhost账号登陆RabbitMQ管理面板
2. 新建一个测试queue,命名保证唯一性
3. 在测试queue的bindings中,绑定相同的exchange
和routing key
4. 触发被测系统发送消息,在测试queue中Get Messages来获取消息
自动化测试的思路其实也是和手工测试一样,唯一的不同是手工测试时把消息取出来后是肉眼进行验证,而自动化测试则需要将消息落到一个可测的数据载体中,比如数据库。整体的思路如下图,是手工测试的一个延展。
这里我们使用到了python的pika库(官方文档:https://pypi.org/project/pika/)
首先,我们需要安装pika:
pip install pika
然后,我们模拟一个阻塞型的消费者来接收消息,代码如下:
# coding:utf-8
__author__ = "小肥羊"
import pika
import json
import sys
reload(sys)
sys.setdefaultencoding('utf8')
username = '用户名' # 连接RabbitMQ服务器的用户名
password = '密码' # 连接RabbitMQ服务器的密码
host = 'MQ服务器地址' # 连接RabbitMQ服务器的地址
port = '端口号' # 连接RabbitMQ服务器的端口号
vhost = 'vhost_name' # vhost名称
queue_name = 'test_queue' # 新建测试queue的名称
exchange_name = 'exchange_name' # exchange名称
routing_key = 'routing_key_name' # routing key名称
# 第一步,连接RabbitMQ服务器
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, vhost, credentials, socket_timeout=120))
# 在连接上创建一个频道
channel = connection.channel()
# 第二步,为确保队列存在,再次执行queue_declare创建一个队列,我们可以多次运行该命令,但是只有一个队列会创建
channel.queue_declare(queue=queue_name, durable=True)
# 第三步,为创建的队列绑定对应的exchange和routing key
channel.queue_bind(queue_name, exchange_name, routing_key)
print ' [*] Waiting for messages. To exit press CTRL+C'
# 第四步,定义一个回调函数,当获得消息时,Pika库调用这个回调函数来处理消息,该回调函数将消息内容打印到屏幕
def callback(ch, method, properties, body):
# 消息体body转成json格式
dumped = json.dumps(body, ensure_ascii=False)
pure_json = json.loads(body)
# 将接收到的消息打印到屏幕
print " [x] Received queue: %r" % (body,)
# 告诉服务器已经接收到消息
channel.basic_ack(delivery_tag=method.delivery_tag)
# 第五步,告诉RabbitMQ回调函数将从queue队列接收消息
channel.basic_consume(callback,
queue=queue_name,
no_ack=False)
# 第六步,输入一个无限循环来等待消息数据并运行回调函数
channel.start_consuming()
在callback回调函数中,只是将消息内容打印了出来,如果要运用在自动化测试中,我们还需要将消息内容写入数据库中,可以通过sqlalchemy等工具对DB进行写入操作,这里就不做详细介绍了。
另外,由于采用了阻塞型的连接,所以该脚本最好是部署在测试服务器上运行,以保证7*24小时的可用性。
如果数据来源依赖于消息的生产者,那么我们可以模拟生产者来发送消息。
在RabbitMQ的管理面板中,允许我们通过exchange
和绑定的routing key
来广播消息和推送订阅消息(fanout
、topic
以及header
模式),也可以直接往queue里面发送消息(direct
模式),在这里其实更推荐后者,因为通过前两者发出的消息可能有其他的应用系统在消费,可能会对其他应用造成影响,所以建议直接往被测应用监听的queue里发消息。
具体步骤如下:
1. 使用和被测应用相同的vhost账号登陆管理面板
2. 在queue面板中,找到被测应用监听的queue
3. 在publish message中的,填入消息内容并发送
4. 验证被测应用收到消息后的处理逻辑
自动化模拟生产者要比消费者简单得多,只需要将消息发送到指定的队列中去,也不需要阻塞式运行脚本。
实现代码如下:
# coding:utf-8
__author__ = '小肥羊'
import pika
import sys
reload(sys)
sys.setdefaultencoding('utf8')
username = '用户名' # 连接RabbitMQ服务器的用户名
password = '密码' # 连接RabbitMQ服务器的密码
host = 'MQ服务器地址' # 连接RabbitMQ服务器的地址
port = '端口号' # 连接RabbitMQ服务器的端口号
vhost = 'vhost_name' # vhost名称
queue_name = 'queue_name' # 被测系统监听的队列名称
# 第一步,连接RabbitMQ服务器
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, vhost, credentials, socket_timeout=120))
# 在连接上创建一个频道
channel = connection.channel()
# 第二步,声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue=queue_name, durable=True)
# 第三步,发送消息,routing_key填的是queue的名称,直接发消息给这个queue
channel.basic_publish(exchange='', routing_key=queue_name, body='要发送的消息')
# 第四步,关闭连接
connection.close()
本次分享中主要介绍了AMQP消息队列的简单运作机制和原理,以及针对生产者和消费者两种场景的测试方法,包含了手工和自动化的方式。
为什么要单独从消息中间件来进行测试呢?主要原因有:
1. 分层测试。在测试一个完整功能时有时需要采取分层测试策略,先进行服务端测试,再验证前端UI。
2. 测试解耦。消息队列的设计本身就是为了系统之间的解耦,如果每次测试时都要依赖上游或者下游一起验证,那么协同工作的成本将会很高。
作者 小肥羊
2018 年 11月 7日