[关闭]
@snakeshiy 2018-11-10T10:46:38.000000Z 字数 5237 阅读 2058

AMQP消息队列的测试方法

AMQP 消息队列 中间件 测试


前言

在大型互联网架构中经常会用到消息队列(Message Queue)这种中间件,在服务端测试时,许多测试同学通过工具对API和数据库都能熟练地进行测试,一说到消息队列的测试就有点不知道怎么入手了。那么对于看不见摸不着的消息队列,如何进行有效的测试呢?在介绍测试方法之前,我们先来了解一下消息队列的原理与机制,这里以常见的AMQP协议的消息队列为例。

1. AMQP消息队列简介

1.1 什么是消息队列

消息队列,简单来说,就是我们通过网络向对方发送了一封短消息,短消息通过运营商网络发送到接收者,被对方读取。消息队列则是由生产者(消息的发送者)通过消息队列服务器向消费者发送一个带有主题的消息,消息体可以为字符串或者更多的数据结构,由消费者在消费端读取消息。

一般的消息队列可以支持多种订阅模式:

点对点模式类似于我们发短消息,指定了由某一个人来接收。

对于消息队列来说,生产者发送了一条消息,通过路由投递到指定的queue,只有一个消费者能收到。

image.png-374.7kB

订阅模式类似于我们刷微博,对于关注的人进行订阅,当被关注着发布一条新微博时,所有关注他的人都能够收到。

对于消息队列来说,生产者发送了一条带有主题的消息,通过路由投递到了绑定有该主题的queue,同时消息可复制成多份,由多个消费者接收。

image.png-485kB

1.2 什么是AMQP

当前各种应用大量使用异步消息模型,并随之产生众多消息中间件产品及协议,标准的不一致使应用与中间件之间的耦合限制产品的选择,并增加维护成本。AMQP(Advanced Message Queuing Protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

常见的基于AMQP的消息队列有:RabbitMQ、MaxQ(饿了么自研)

1.3 AMQP 0.9.1 工作模型

image.png-187.6kB

  1. Publisher将message发布到exchange(exchange可以看作是一个邮局或者邮件系统,也就是Broker)
  2. 然后exchange将message发送到queue(队列),这个过程称之为bindings(绑定)。
  3. AMQP Broker或者将message发送给订阅(subscribed)queue的consumer,或者consumer按需将message从queue中取出来。

名词解释

2. 消息队列的常用测试方法

消息队列的应用场景主要是围绕着 生产者消费者 展开的,所以测试思路其实非常简单,如果被测应用是消息的生产者,那我们就模拟消费者去接收消息,验证发出的消息内容的正确性。如果被测应用是消息的消费者,那我们就模拟消息的生产者去发送消息,然后验证被测应用收到消息后的处理逻辑。

这里我们以最常见的RabbitMQ为例,介绍一下几种主要的测试方式。

2.1 被测应用为生产者

2.1.1 手工测试

1541570472037.jpg-35.4kB

具体步骤如下:
1. 使用和被测应用相同的vhost账号登陆RabbitMQ管理面板
2. 新建一个测试queue,命名保证唯一性
image.png-44.5kB
3. 在测试queue的bindings中,绑定相同的exchangerouting key
1541572245434.jpg-40.8kB
4. 触发被测系统发送消息,在测试queue中Get Messages来获取消息
1541573185844.jpg-99kB

2.1.2 自动化测试

自动化测试的思路其实也是和手工测试一样,唯一的不同是手工测试时把消息取出来后是肉眼进行验证,而自动化测试则需要将消息落到一个可测的数据载体中,比如数据库。整体的思路如下图,是手工测试的一个延展。

1541576403366.jpg-58.5kB

这里我们使用到了python的pika库(官方文档:https://pypi.org/project/pika/
首先,我们需要安装pika:

  1. pip install pika

然后,我们模拟一个阻塞型的消费者来接收消息,代码如下:

  1. # coding:utf-8
  2. __author__ = "小肥羊"
  3. import pika
  4. import json
  5. import sys
  6. reload(sys)
  7. sys.setdefaultencoding('utf8')
  8. username = '用户名' # 连接RabbitMQ服务器的用户名
  9. password = '密码' # 连接RabbitMQ服务器的密码
  10. host = 'MQ服务器地址' # 连接RabbitMQ服务器的地址
  11. port = '端口号' # 连接RabbitMQ服务器的端口号
  12. vhost = 'vhost_name' # vhost名称
  13. queue_name = 'test_queue' # 新建测试queue的名称
  14. exchange_name = 'exchange_name' # exchange名称
  15. routing_key = 'routing_key_name' # routing key名称
  16. # 第一步,连接RabbitMQ服务器
  17. credentials = pika.PlainCredentials(username, password)
  18. connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, vhost, credentials, socket_timeout=120))
  19. # 在连接上创建一个频道
  20. channel = connection.channel()
  21. # 第二步,为确保队列存在,再次执行queue_declare创建一个队列,我们可以多次运行该命令,但是只有一个队列会创建
  22. channel.queue_declare(queue=queue_name, durable=True)
  23. # 第三步,为创建的队列绑定对应的exchange和routing key
  24. channel.queue_bind(queue_name, exchange_name, routing_key)
  25. print ' [*] Waiting for messages. To exit press CTRL+C'
  26. # 第四步,定义一个回调函数,当获得消息时,Pika库调用这个回调函数来处理消息,该回调函数将消息内容打印到屏幕
  27. def callback(ch, method, properties, body):
  28. # 消息体body转成json格式
  29. dumped = json.dumps(body, ensure_ascii=False)
  30. pure_json = json.loads(body)
  31. # 将接收到的消息打印到屏幕
  32. print " [x] Received queue: %r" % (body,)
  33. # 告诉服务器已经接收到消息
  34. channel.basic_ack(delivery_tag=method.delivery_tag)
  35. # 第五步,告诉RabbitMQ回调函数将从queue队列接收消息
  36. channel.basic_consume(callback,
  37. queue=queue_name,
  38. no_ack=False)
  39. # 第六步,输入一个无限循环来等待消息数据并运行回调函数
  40. channel.start_consuming()

在callback回调函数中,只是将消息内容打印了出来,如果要运用在自动化测试中,我们还需要将消息内容写入数据库中,可以通过sqlalchemy等工具对DB进行写入操作,这里就不做详细介绍了。
另外,由于采用了阻塞型的连接,所以该脚本最好是部署在测试服务器上运行,以保证7*24小时的可用性。

2.2 被测应用为消费者

2.2.1 手工测试

如果数据来源依赖于消息的生产者,那么我们可以模拟生产者来发送消息。

在RabbitMQ的管理面板中,允许我们通过exchange和绑定的routing key来广播消息和推送订阅消息(fanouttopic以及header模式),也可以直接往queue里面发送消息(direct模式),在这里其实更推荐后者,因为通过前两者发出的消息可能有其他的应用系统在消费,可能会对其他应用造成影响,所以建议直接往被测应用监听的queue里发消息。

具体步骤如下:
1. 使用和被测应用相同的vhost账号登陆管理面板
2. 在queue面板中,找到被测应用监听的queue
3. 在publish message中的,填入消息内容并发送
1541582668268.jpg-37.7kB
4. 验证被测应用收到消息后的处理逻辑

2.2.2 自动化测试

自动化模拟生产者要比消费者简单得多,只需要将消息发送到指定的队列中去,也不需要阻塞式运行脚本。

1541582588251.jpg-49.9kB

实现代码如下:

  1. # coding:utf-8
  2. __author__ = '小肥羊'
  3. import pika
  4. import sys
  5. reload(sys)
  6. sys.setdefaultencoding('utf8')
  7. username = '用户名' # 连接RabbitMQ服务器的用户名
  8. password = '密码' # 连接RabbitMQ服务器的密码
  9. host = 'MQ服务器地址' # 连接RabbitMQ服务器的地址
  10. port = '端口号' # 连接RabbitMQ服务器的端口号
  11. vhost = 'vhost_name' # vhost名称
  12. queue_name = 'queue_name' # 被测系统监听的队列名称
  13. # 第一步,连接RabbitMQ服务器
  14. credentials = pika.PlainCredentials(username, password)
  15. connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, vhost, credentials, socket_timeout=120))
  16. # 在连接上创建一个频道
  17. channel = connection.channel()
  18. # 第二步,声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
  19. channel.queue_declare(queue=queue_name, durable=True)
  20. # 第三步,发送消息,routing_key填的是queue的名称,直接发消息给这个queue
  21. channel.basic_publish(exchange='', routing_key=queue_name, body='要发送的消息')
  22. # 第四步,关闭连接
  23. connection.close()

3. 总结

本次分享中主要介绍了AMQP消息队列的简单运作机制和原理,以及针对生产者和消费者两种场景的测试方法,包含了手工和自动化的方式。

为什么要单独从消息中间件来进行测试呢?主要原因有:
1. 分层测试。在测试一个完整功能时有时需要采取分层测试策略,先进行服务端测试,再验证前端UI。
2. 测试解耦。消息队列的设计本身就是为了系统之间的解耦,如果每次测试时都要依赖上游或者下游一起验证,那么协同工作的成本将会很高。

4. 相关资料

  1. AMQP官方网站
  2. AMQP Wiki
  3. Python Pika 官方文档
  4. RabbitMQ Python Tutorial

作者 小肥羊
2018 年 11月 7日

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注