[关闭]
@FunC 2018-01-02T16:31:25.000000Z 字数 16684 阅读 2158

Node.js Design Patterns | CH11

Node.js


通信☎️以及整合模式🍔(Messaging and Integration Patterns)

在上一章中,我们为了让系统变得可伸缩,将其进行了分割,并运行在多个机器上。为了让它们能够正确地工作,这些被分割的部分需要通过某种方式进行通信,因此,它们需要进行整合(intergrated)

主要有两种功能方式来进行整合:
1. 使用共享的储存空间,以此作为中介并保存全部的信息
2. 使用消息(message)来在系统的不同节点之间传播数据,时间以及命令。

消息(message):任何用于在组件和系统之间交换信息的,分离且有结构的数据。
消息传递系统(messaging system):用于促进网络中信息交换的一类解决方案、模式和架构

消息传递系统的基础(Fundamentals of a messaging system)

消息与消息传递系统的四要素:
* 通信方向:单向 or 双向
* 消息的用途
* 发信息的时机: 立即发送 or 异步发送
* 信息的发送者: 直接发送 or 通过代理

单向与请求/回复模式(One-way and request/reply patterns)

单向:

典型例子:
* 邮件
* web 服务器通过 WebSocket 向浏览器发送信息

请求/回复模式:

看起来很简单,但是当通讯是异步的,或者涉及多个节点时,事情就变复杂了:

任意取两个节点来看,它都是单向的。然而全局来看,发起者发送了请求之后,收到了一个相关联的响应(尽管来自不同的节点)。
综上,请求/回复模式与单向模式之间的真正差异,在于请求与回复的关系。在请求/回复模式中,它们都由发起者持有。

消息类型(Message types)

消息(message)本质上是不同软件组件之间的通信途径。使用消息的原因有很多:
* 想获取由其他系统/组件持有的信息
* 想远程执行某些操作
* 想通知其他端(peers)当前正在发生某些事情

根据其目的的不同,主要可以分成三类:命令消息、时间消息、文档消息。

命令消息

在先前设计模式一章中已有涉及,命令消息就是一个序列化的命令对象,目的是用于触发某些动作的执行(因此命令中需要包含执行该任务的全部信息)。
它能用于实现RPC系统,或者用于请求数据。RESTful HTTP 调用就是一个命令的简单例子。HTTP 动词代表着相应的操作:GET-获取资源;POST-创建资源;PUT-更新资源;DELETE-删除资源。

事件消息

事件消息用于通知其他组件有些事情发生了。通常包含事件类型,有时还包含细节(如上下文、作用对象以及参与者)
在 web 开发中,我们使用长轮询或者WebSockets从服务端获取通知时,使用了事件信息。
在分布式系统中,事件是一种重要的整合机制,它让我们保证系统的各个节点保持状态一致(keep on the same page)。

文档信息

文档信息主要用于组件/机器间传递数据。
文档与命令的不同之处在于,文档中的信息不涉及告诉接受者如何使用文档信息的内容。
而相比于事件信息,它不要求和某个事件相关。
通常命令消息的回复就是文档信息,例如返回请求的数据。

异步信息与队列

异步通信的与发短信类似:在发信息时不要求接受者在线;响应有可能马上返回,或者延迟返回,甚至没有返回;我们可以向多个接受者发送信息,然后以任意顺序收到响应。
简而言之——使用更少的资源获得更好的并行能力。
另一个重要优点是异步通讯的消息可以被储存起来,然后再分发(马上或者延后)。这一特性在接受者十分繁忙的时候很有用,我们可以使用消息队列(message queue)来实现:

如果接受者崩溃了或者掉线了,借助消息队列,消息能被累积起来,等到接受者可用时马上投递信息,增强了系统的健壮性。
这样一个消息队列可以在信息发送者里实现,也可以放在发送者与接受者之间,甚至可以用一个外部的系统作为中间件。

端对端 or 基于代理的消息传递

消息可以端对端直接发送,也可以通过一个叫信息代理(message broker)的中介系统发送。
代理的作用是将信息的接受者和发送者解耦

在端对端的架构中,每个节点都需要知道接受者的地址和端口,同时它们还需要协商好通信协议与消息格式。
而代理则消除了这些复杂度。每个节点都可以和任意数量的节点通信,同时不需要知道细节。代理还可以作为不同通讯协议的桥梁。

除了解耦和互操作性,代理还能提供一些进阶特性,例如持久化队列、路由、消息转换、监控以及多种传递信息模式的支持。

当然也有一些理由不使用代理:
* 移除一个单点故障(Removing a single point of failure)
* 代理也需要可伸缩,而端对端则只要求单个节点可伸缩
* 减少消息交换的延迟

如果实现一个端对端的消息系统,我们能拥有更高的灵活度和更强的功能,因为不比捆绑在任何特定的技术、协议和架构。

发布/订阅模式

发布订阅模式大概是最广为人知的单向消息传递模式。订阅者订阅一类信息,发布者产生信息并分发到所有相关的订阅者。下面是端对端和基于代理这两种模式下的发布/订阅模式:

发布/订阅模式特别之处,在于消息的发布者不需要事先知道任何接收者的信息,因此它可以和未知数量的订阅者协作。发布者和订阅着之间是松耦合的,因此被认为是整合分布式系统的理想模式。

而代理的加入进一步提高了解耦程度,并提高了可靠性(例如节点间的连接出了问题也不怕)

来写一个极简化的实时聊天应用

实现服务端

这里用到了 ws 库,它是 Node.js 端 WebSocket 的纯净实现。
app.js

  1. const WebSocketServer = require('ws').Server;
  2. // 静态文件服务器
  3. const server = require('http').createServer(
  4. require('ecstatic')({root:`${__dirname}/www`})
  5. );
  6. // 新建 WebSocket 服务器,挂载在现有的 HTTP 服务器上
  7. const wss = new WebSocketServer({server: server});
  8. wss.on('connection', ws => {
  9. console.log('Client connected');
  10. ws.on('message', msg => {
  11. console.log(`Message: ${msg}`);
  12. // 收到信息时广播给所有客户端
  13. broadcast(msg);
  14. })
  15. });
  16. function broadcast(msg) {
  17. wss.clients.forEach(client => {
  18. client.send(msg);
  19. });
  20. }
  21. server.listen(process.argv[2] || 8080);

实现客户端

www/index.html

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title></title>
  6. <script>
  7. // 浏览器新建 WebSocket 连接
  8. var ws = new WebSocket('ws://' + window.document.location.host);
  9. ws.onmessage = function(message) {
  10. var msgDiv = document.createElement('div');
  11. msgDiv.innerHTML = message.data;
  12. document.getElementById('messages').appendChild(msgDiv);
  13. };
  14. // 发送信息到服务器
  15. function sendMessage() {
  16. var message = document.getElementById('msgBox').value;
  17. ws.send(message);
  18. }
  19. </script>
  20. </head>
  21. <body>
  22. Message:
  23. <div id="messages"></div>
  24. <input id="msgBox" type="text" placeholder="Send a message">
  25. <input type="button" onclick="sendMessage()" value='Send'>
  26. </body>
  27. </html>

运行并扩容聊天应用

我们通过在不同的端口运行我们的聊天应用,来模拟扩容的情况:

  1. node app 8080
  2. node app 8081

这时在浏览器上分别访问localhost:8080localhost:8081,并发送信息。会发现两个页面并不能互相通信(因为这是还是两台独立的服务器)

使用Redis作为消息代理

Redis 是一个非常快且灵活的键值对存储容器,更多地被用作数据库,不过它也提供了一些特性,使其可以实现中心化了发布/订阅模式。
加入Reids后我们聊天应用的架构将变成这样:

可见,每个服务器实例既是发布者,也是订阅者,它们之间通过redis来相互连接。

Reids 允许对一个频道进行订阅以及发布信息,频道以字符串作为标识区分,同时允许匹配多个频道,如 chat.*

原本的服务端代码修改后如下:

  1. const WebSocketServer = require('ws').Server;
  2. const redis = require('redis');
  3. // 分别建立两个redis客户端
  4. const redisSub = redis.createClient();
  5. const redisPub = redis.createClient();
  6. const server = require('http').createServer(
  7. require('ecstatic')({root:`${__dirname}/www`})
  8. );
  9. const wss = new WebSocketServer({server: server});
  10. wss.on('connection', ws => {
  11. console.log('Client connected');
  12. ws.on('message', msg => {
  13. console.log(`Message: ${msg}`);
  14. // redisPub 在 chat_message 频道上发布消息
  15. redisPub.publish('chat_messages', msg);
  16. })
  17. });
  18. // redisSub 订阅 chat_message 频道
  19. redisSub.subscribe('chat_messages');
  20. redisSub.on('message', (channel, msg) => {
  21. wss.clients.forEach(client => {
  22. client.send(msg);
  23. });
  24. });
  25. server.listen(process.argv[2] || 8080);

因为此时只开了一个redis服务(默认端口6379),因此此处的redis是不同的服务器实例共享的。
另外之所以需要两个redis客户端来分别作发布者和订阅者,是因为在redis中,一旦实例订阅了一个频道,那么它就只能使用和订阅相关的命令了。

此时我们运行多个应用实例:

  1. node app 8080
  2. node app 8081
  3. node app 8082

它们之间的信息借助redis实现了共享

使用 ØMQ 端对端的发布/订阅模式

ØMQ 是一个 networking 库,它偏底层,速度快,有着极简的 API,同时也有一些搭建消息系统的基础组件,还支持各种类型的传输(如 ipc, inproc, pgm, tcp等)。

给聊天服务设计端对端的架构

我们把代理(broker)从架构中移除了,所以每个聊天应用都要和其他实例直接连接。在ØMQ中,有两种类型的套接字是为这种用途设计的:PUBSUB。典型的模式就是将 PUB 套接字绑定到一个端口上,监听来自其他 SUB 套接字的订阅。

从上图可以看出,这种架构中,每个节点都需要意识到其他节点的存在(与基于代理的模式不同)

使用ØMQ的 PUB/SUB 套接字

基本就和上图的架构一样,手动创建 PUB/SUB 套接字。
然后做一个接线员,手动把每个 PUB 套接字和其他的 SUB 套接字接上:
app.js

  1. const WebSocketServer = require('ws').Server;
  2. // minimist 用于读取命令行的选项参数,形如 --http 8080
  3. const args = require('minimist')(process.argv.slice(2));
  4. // zmq 用于操作 ØMQ
  5. const zmq = require('zmq');
  6. //static file server
  7. const server = require('http').createServer(
  8. require('ecstatic')({root: `${__dirname}/www`})
  9. );
  10. // 创建 PUB 套接字,端口从命令行参数选项中读取
  11. const pubSocket = zmq.socket('pub');
  12. pubSocket.bind(`tcp://127.0.0.1:${args['pub']}`);
  13. // 每个实例被多个实例订阅
  14. const subSocket = zmq.socket('sub');
  15. const subPorts = [].concat(args['sub']);
  16. subPorts.forEach(p => {
  17. console.log(`Subscribing to ${p}`);
  18. subSocket.connect(`tcp://127.0.0.1:${p}`);
  19. });
  20. // 只订阅 chat 开头的信息
  21. subSocket.subscribe('chat');
  22. subSocket.on('message', msg => {
  23. console.log(`From other server: ${msg}`);
  24. // 将从订阅处收到的信息广播到自己实例下的其他客户
  25. broadcast(msg.toString().split(' ')[1]);
  26. });
  27. const wss = new WebSocketServer({server: server});
  28. wss.on('connection', ws => {
  29. console.log('Client connected');
  30. ws.on('message', msg => {
  31. console.log(`Message: ${msg}`);
  32. // 广播给自己实例下的其他客户
  33. broadcast(msg);
  34. // 发布给其他订阅的聊天服务实例
  35. pubSocket.send(`chat ${msg}`);
  36. });
  37. });
  38. function broadcast(msg) {
  39. wss.clients.forEach(client => {
  40. client.send(msg);
  41. });
  42. }
  43. server.listen(args['http'] || 8080);

值得注意的是,最开始订阅其他的 PUB 套接字时,它们尚未绑定到对应端口,但ØMQ却不会报错,这是因为ØMQ有着自动重连机制,每隔一段时间就会尝试重新连接。同时如果 PUB 套接字没有被订阅的话,信息也会直接丢掉。

上面的例子中我们假定架构是静态不变的,实例的数量和地址也是已知的。对于动态的,复杂的架构,我们可以引入 service registry 。

持久化的订阅者(Durable subscriber )

消息系统中的一个重要概念,就是消息队列(MQ, message queue)。有了消息队列,发送者和接收者在通信时不再需要同时建立起联系,在接收者不在线时,队列会帮忙保存信息,直到接受者上线。

持久化订阅者(durable subscriber):一个总是能可靠地接收到所有信息的订阅者,即使在订阅前就已经发送出去的信息也一样能接受到。

MQTT 定义了服务质量( Quality of Service)的不同等级:
* QoS0,最多一次:就是传统的 set and forget,信息是不持久的,其发送成功与否是不知道的。意味着如果断开连接或者崩溃的话,信息将会丢失。
* QoS1,至少一次:信息保证至少被接受到一次,也可能被重复接受,因为接受者可能在还没来得及通知发送者就崩溃了,为了确保送达,信息将被重发一次。
* QoS2,准确的一次:这是最可靠的QoS,它确保信息接受且仅接受一次。但同时带来了一些代价,例如速度更慢,更多的数据密集机制用于确定信息的投递状态。

正如之前提到的,一个消息队列可以在订阅者离线时把消息积攒起来。这个队列可以储存在内存中,也可以持久化到磁盘中,这样即使代理重启或崩溃都能将信息恢复:

Redis 的发布/订阅命令实现了QoS0。我们还可以结合使用其他的命令,来使 Redis 实现持久化订阅者。

介绍 AMQP

长久以来,可靠的信息持久化技术被大公司所垄断。幸运的是,多亏了一些开放协议的发展(如AMQP,STOMP和MQTT),消息系统进入主流。

AMQP是一个开放的标准协议,支持许多消息队列系统。它除了定义了一个平常的通信协议以外,还提供了用于描述路由、过滤器、队列、可靠性和安全性的模型。
在AMQP中,有三个基础组件:
* Queue:将提供给客户端消费的消息存储起来的数据结构。支持一个或多个队列,当多个消费者挂载到同一个队列时,信息将通过负载均衡的方式分发。队列有下面三种类型:
* Durable:队列在代理重启时将自动重新创建。一个持久化(durable)的队列并不确保其内容也是持久化的。
* Exclusive:队列仅和一个特定的订阅者连接,当连接关闭时,队列也被销毁。
* Auto-delete:队列在最后一个订阅者也断开连接的时候删除。
* Exchange:这里是生产消息的地方。它将消息导向一个或多个队列中,根据算法不同而不同:
* Direct exchange:完全匹配路由关键字时才将消息导向队列(如chat.msg
* Topic exchange:匹配符合某个关键字模式的消息(如 chat.# 匹配所有以chat 开头的关键字)
* Fanout exchange:忽略路由关键字,将消息广播到所有连接的队列上
* Binding:用于连接Exchange和queue,同时定义了路由关键字,或者其他过滤来自exchange的消息的规则

这些组件由一个代理(broker)来管理,代理提供了一系列相关的API。当客户端连接到代理时,它创建了一个频道(一个连接的抽象),这个频道负责维持于代理通信的状态。
下图展示了将这些组件组合起来的样子:

使用AMQP和RabbitMQ创建持久化订阅者(Durable subscribers with AMQP and RabbitMQ)

一个典型的场景,就是我们为了确保一个微服务架构中的状态保持一致,不能丢失任何信息。

给我们的聊天应用设计一个历史记录服务

我们将添加一个历史记录服务,通过数据库将聊天记录持久化,这样当客户端连接的时候,我们查询并取回相应的聊天记录。如下图所示:

这里我们只用到一个fanout exchange,不需要路由,同时为每个聊天服务实例提供一个队列。这些队列是 exclusive 的,一旦连接关闭队列就会销毁,信息的可靠性通过历史记录服务来提供。因此历史记录的 queue 要求是持久化🔒的。

使用 AMQP 实现一个可靠的历史记录服务

这个模块包括两部分:
1. 一个 HTTP 服务器,用于将聊天记录暴露给客户端
2. 一个 AMQP 消费者(AMQP consumer),负责将聊天信息抓取并保存在本地数据库中

historySvc.js

  1. const level = require('level');
  2. // 用于产生精度大于 ms 的时间戳,确保唯一性
  3. const timestamp = require('monotonic-timestamp');
  4. // 用于处理 stream 的 JSON 对象
  5. const JSONStream = require('JSONStream');
  6. // 可通过 AMQP 协议连接 RabbitMQ
  7. const amqp = require('amqplib');
  8. // 数据库
  9. const db = level('./msgHistory');
  10. // 客户端请求时,将聊天记录序列化后返回
  11. require('http').createServer((req, res) => {
  12. res.writeHead(200);
  13. db.createValueStream()
  14. .pipe(JSONStream.stringify())
  15. .pipe(res);
  16. }).listen(8090);
  17. let channel, queue;
  18. amqp
  19. .connect('amqp://localhost') // [1]
  20. // 创建抽象的频道,类似于一个 session,用于维持通信状态
  21. .then(conn => conn.createChannel())
  22. .then(ch => {
  23. channel = ch;
  24. // 创建名为 chat 的 fanout exchange
  25. return channel.assertExchange('chat', 'fanout'); // [2]
  26. })
  27. // 创建名为 chat_history 的 queue,默认是持久化的
  28. .then(() => channel.assertQueue('chat_history')) // [3]
  29. .then((q) => {
  30. // q.queue 是 queue 的名字
  31. queue = q.queue;
  32. // 连接 queue 和 exchange
  33. return channel.bindQueue(queue, 'chat'); // [4]
  34. })
  35. .then(() => {
  36. // 监听来自 queue 的信息
  37. return channel.consume(queue, msg => { // [5]
  38. const content = msg.content.toString();
  39. console.log(`Saving message: ${content}`);
  40. // 存储进数据库
  41. db.put(timestamp(), content, err => {
  42. // 仅当存储成功后才确认(ack)这条信息
  43. // 如果broker没收到ack,这条信息就会留在 queue 里,等待再次被处理
  44. if (!err) channel.ack(msg);
  45. });
  46. });
  47. })
  48. .catch(err => console.log(err))
  49. ;

使用AMQP整合聊天应用

大部分和AMQP相关的设置于上面类似,下面只列出不同之处:
app.js

  1. // ...
  2. .then(() => {
  3. // 使用 exclusive queue 即可,可靠性由历史记录服务提供
  4. return channel.assertQueue(`chat_srv_${httpPort}`, {exclusive: true});
  5. })
  6. // ...
  7. ws.on('message', msg => {
  8. // 发布信息到 chat exchange,不带路由关键字,信息以 Buffer 的形式发送
  9. channel.publish('chat', '', new Buffer(msg));
  10. });
  11. // ...

可以看到,我们通过微服务的方式,实现了即使其中一个组件缺失了(历史记录服务),系统也能继续运作。实时聊天能继续进行,只是无法查阅历史记录罢了。

管道与任务分发模式(Pipelines and task distribution patterns)

在前面的章节中,我们学到了可以将耗时任务委派给多个本地的进程的完成,但它受限在单个机器的范围内。
本节中我们将在分布式架构中使用类似的模式,来使用在网络中任意位置的远程 workers 。
思路是有一个消息传递模式,允许我们将任务分发到多个机器上。其中任务既可以是单个完成的任务,也可以是将大任务分割,采取分而治之的方式完成:

显然在这种情况下,发布/订阅模式不再适用,因为我们并不想让一个任务被多个 worker 接收。我们需要一个类似于负载均衡的消息分发模式,通常被称为消费者竞争模式(competing consumers)

与HTTP负载均衡的一个最大不同,在于消费者竞争模式中的消费者更加地活跃,它是主动连接任务生产者来获取任务的。这种模式对可伸缩架构有着很大好处,因为它允许我们随意增加 worker 的数量,同时不需要修改生产者或者适配 service registry

通常,我们只需要一个单向异步通讯,它让我们能够建立更加复杂的处理架构,同时减去同步双向通讯的负担。同时还能带来更低的延迟和更高的吞吐量:

上图中,消息被分发,经过一组workers(fanout),往前通过一系列的处理单元,最终聚集到一个节点(fanin),通常称为 sink

ØMQ的扇入/扇出模式(The ØMQ fanout/fanin pattern)

PUSH/PULL 套接字

PUSH和PULL套接字有一些特性,使其十分适合用于建造单向通讯系统:
* 都能在 connect 或 bind 模式下工作。也就是说我们可以将 PUSH 套接字绑定(bind)在本地端口,监听来自 PULL 套接字的连接(connect);反过来,PULL 套接字也能监听来自 PUSH 套接字的连接。消息的流动方向只有从 PUSH -> PULL,但连接的发起者的不同。其中 bind 模式适合持久化的节点,而 connect 模式适合存活时间短的节点。以上节中的task worker为例,task worker 就是短期存在的,应该使用 connect 模式主动连接 task producer 和 sink(这两者则以bind模式保持监听状态)
* 多个 PULL 连接一个 PUSH,消息将平均分发,类似于负载均衡;多个 PUSH 连接一个 PULL,消息将按顺序轮流消费
* 若果一个PUSH套接字在仍没有PULL连接时就发出消息,消息也不会丢失,而是储存在队列中,直到有PULL节点上线。

用 ØMQ 搭建一个分布式的哈希和破解器(Building a distributed hashsum cracker with ØMQ)

破解方法简单粗暴:直接遍历所有的字母组合,计算哈希值并与目标值比较,若一致则找到答案。
这类问题属于易并行计算问题(embarrassingly parallel),子问题之间各自独立,十分适合PUSH/PULL模式。
该应用由三部分组成:variations generator(ventilator), worker, result collector(sink)

显然,generator 和 sink 都是持久化的(durable),应该采用 bind 模式;而 worker 数量不定,视需求灵活增减,应该主动拿任务并把结果主动发送给sink,因此应该采用 connect 模式。

实现ventilator

ventilator.js

  1. const zmq = require('zmq');
  2. const variationsStream = require('variations-stream');
  3. const alphabet = 'abcdefghijklmnopqrstuvwxyz';
  4. const batchSize = 10000;
  5. const maxLength = process.argv[2];
  6. const searchHash = process.argv[3];
  7. const ventilator = zmq.socket('push');
  8. ventilator.bindSync("tcp://*:5016");
  9. let batch = [];
  10. variationsStream(alphabet, maxLength)
  11. .on('data', combination => {
  12. batch.push(combination);
  13. // 凑够一批打包发送
  14. if (batch.length === batchSize) {
  15. const msg = {searchHash: searchHash, variations: batch};
  16. ventilator.send(JSON.stringify(msg));
  17. batch = [];
  18. }
  19. })
  20. .on('end', () => {
  21. // 发送剩余的部分
  22. const msg = {searchHash: searchHash, variations: batch};
  23. ventilator.send(JSON.stringify(msg));
  24. })
  25. ;

实现worker

  1. const zmq = require('zmq');
  2. const crypto = require('crypto');
  3. const fromVentilator = zmq.socket('pull');
  4. const toSink = zmq.socket('push');
  5. // 作为短暂存在的节点,主动连接两端
  6. fromVentilator.connect('tcp://localhost:5016');
  7. toSink.connect('tcp://localhost:5017');
  8. fromVentilator.on('message', buffer => {
  9. const msg = JSON.parse(buffer);
  10. const variations = msg.variations;
  11. variations.forEach( word => {
  12. console.log(`Processing: ${word}`);
  13. const shasum = crypto.createHash('sha1');
  14. shasum.update(word);
  15. const digest = shasum.digest('hex');
  16. if (digest === msg.searchHash) {
  17. console.log(`Found! => ${word}`);
  18. // 解出答案后发送给 sink
  19. toSink.send(`Found! ${digest} => ${word}`);
  20. }
  21. });
  22. });

通过这种方式,可以按需启动多个 worker,天然可伸缩

实现sink

  1. const zmq = require('zmq');
  2. const sink = zmq.socket('pull');
  3. sink.bindSync("tcp://*:5017");
  4. sink.on('message', buffer => {
  5. console.log('Message from worker: ', buffer.toString());
  6. });

AMQP中的管道和竞争性消费者(Pipelines and competing consumers in AMQP)

点对点通讯以及竞争性消费者

在AMQP中,为了确保每条消息都只由一个消费者接收,我们需要绕过 exchange,直接将消息发送到目标队列。这种通讯模式称为点对点通讯(point-to-point)
因为我们需要并行计算,利用AMQP中队列的特性,自然而然地就能想到将多个消费者监听同一个队列(这样消息就会平均分发)。这被称为竞争性消费者模式(competing consumers pattern)

使用AMQP实现哈希和破解器

简单来说,就是在每段通讯的路上加上队列(没有 exchange 和 binding):

代码部分和先前AMQP的代码基本相同,差别主要在于不需要创建 exchange 和 bind,直接调用channel.sendToQueue() API 即可

请求/回复模式(Request/reply patterns)

尽管单向通讯带来了高并行度和高效率,但它并不能解决所有的整合情况。有时候,一个合适的请求/回复模式能发挥不错的效果。

相关标识符(Correlation identifier)

这种方式其实非常常见。从最常见的TCP,到前端的JSONP的实现,都用到了这种技术。思路是发送信息时,带上一个唯一的标识(如uuid)。相应的回复返回时,带上这个标识,发送者就能识别出这个回复是对应哪个的消息的。

这种模式的一个特点,就是回复的顺序可以可请求的顺序不同(见上图)。

使用相关标识符实现请求/回复的抽象(Implementing a request/reply abstraction using correlation identifiers)

在第九章中,我们已经接触过进程间通讯的API了。
对于主进程:
* child.send(message)
* child.on(‘message’, callback)
对于子进程:
* process.send(message)
* process.on(‘message’, callback)

只需要对这个现成的通讯频道加以包装,就能实现我们的需求。

抽象请求

  1. const uuid = require('node-uuid');
  2. module.exports = channel => {
  3. const idToCallbackMap = {};
  4. // 如果接到的回复中的id有记录,调用相应的callback
  5. channel.on('message', message => {
  6. const handler = idToCallbackMap[message.inReplyTo];
  7. if(handler) {
  8. handler(message.data);
  9. }
  10. });
  11. // 返回一个抽象的请求体
  12. return function sendRequest(req, callback) {
  13. // 内部采用uuid,对外屏蔽
  14. const correlationId = uuid.v4();
  15. idToCallbackMap[correlationId] = callback;
  16. channel.send({
  17. type: 'request',
  18. data: req,
  19. id: correlationId
  20. });
  21. };
  22. };

抽象回复

  1. module.exports = channel =>
  2. { // 注册handler
  3. return function registerHandler(handler) {
  4. channel.on('message', message => {
  5. if (message.type !== 'request') return;
  6. // 要求handler将处理结果作为参数传递给第二个参数(callback)
  7. handler(message.data, reply => {
  8. channel.send({
  9. type: 'response',
  10. data: reply,
  11. inReplyTo: message.id
  12. });
  13. });
  14. });
  15. };
  16. };

包装回复者(replier)

  1. const reply = require('./reply')(process);
  2. reply((req, cb) => {
  3. setTimeout(() => {
  4. // 将结果传给cb
  5. cb({sum: req.a + req.b});
  6. }, req.delay);
  7. });

包装请求者(requestor.js)

  1. const replier = require('child_process')
  2. .fork(`${__dirname}/replier.js`);
  3. const request = require('./request')(replier);
  4. request({a: 1, b: 2, delay: 500}, res => {
  5. console.log('1 + 2 = ', res.sum);
  6. replier.disconnect();
  7. });
  8. request({a: 6, b: 1, delay: 100}, res => {
  9. console.log('6 + 1 = ', res.sum);
  10. });

返回地址(Return address)

如果在消息架构中有着不止一条频道或者队列,或者有着不止一个请求者,那么我们的架构就不足以应付了。在这种情况下,我们除了要有相关的id以外,还要知道返回的目标地址。

在AMQP中实现返回地址模式(Implementing the return address pattern in AMQP)

在AMQP中,返回地址就是请求者监听的回复队列的名字。因为回复队列是私有的,不在不同的消费者中共享,所以我们只需要一个短暂队列即可(transient queue)。

实现请求的抽象

创建队列时,不用指明队列的名字(这样会得到一个随机名)。同时,队列是 exclusive 的,在连接断开后就销毁。
amqpRequest.js

  1. const uuid = require('node-uuid');
  2. const amqp = require('amqplib');
  3. class AMQPRequest {
  4. constructor() {
  5. this.idToCallbackMap = {};
  6. }
  7. initialize() {
  8. return amqp
  9. .connect('amqp://localhost')
  10. .then(conn => conn.createChannel())
  11. .then(channel => {
  12. this.channel = channel;
  13. // 不指定名称,随机分配
  14. return channel.assertQueue('', {exclusive: true});
  15. })
  16. .then(q => {
  17. this.replyQueue = q.queue;
  18. return this._listenForResponses();
  19. })
  20. .catch(function(err) {
  21. console.log(err);
  22. })
  23. ;
  24. }
  25. _listenForResponses() {
  26. return this.channel.consume(this.replyQueue, msg => {
  27. const correlationId = msg.properties.correlationId;
  28. const handler = this.idToCallbackMap[correlationId];
  29. if (handler) {
  30. handler(JSON.parse(msg.content.toString()));
  31. }
  32. // exclusive,无需ack
  33. }, {noAck: true});
  34. }
  35. // 因为需要明确地址,所以参数加上了 queue
  36. request(queue, message, callback) {
  37. const id = uuid.v4();
  38. this.idToCallbackMap[id] = callback;
  39. this.channel.sendToQueue(queue,
  40. new Buffer(JSON.stringify(message)),
  41. {correlationId: id, replyTo: this.replyQueue}
  42. );
  43. }
  44. }
  45. module.exports = () => new AMQPRequest();

实现回复的抽象

amqpReply.js

  1. const amqp = require('amqplib');
  2. class AMQPReply {
  3. constructor(qName) {
  4. this.qName = qName;
  5. }
  6. initialize() {
  7. return amqp
  8. .connect('amqp://localhost')
  9. .then(conn => conn.createChannel())
  10. .then(channel => {
  11. this.channel = channel;
  12. return this.channel.assertQueue(this.qName);
  13. })
  14. .then(q => this.queue = q.queue)
  15. .catch(err => console.log(err.stack))
  16. ;
  17. }
  18. handleRequest(handler) {
  19. return this.channel.consume(this.queue, msg => {
  20. const content = JSON.parse(msg.content.toString());
  21. handler(content, reply => {
  22. // 将信息回复到指定的queue
  23. this.channel.sendToQueue(
  24. msg.properties.replyTo,
  25. new Buffer(JSON.stringify(reply)),
  26. {correlationId: msg.properties.correlationId}
  27. );
  28. this.channel.ack(msg);
  29. });
  30. });
  31. }
  32. }
  33. module.exports = (qName) => {
  34. return new AMQPReply(qName);
  35. };

实现请求者和回复者

和前面类似,稍加包装即可
replier.js

  1. const Reply = require('./amqpReply');
  2. // 指定 queue
  3. const reply = Reply('requests_queue');
  4. reply.initialize().then(() => {
  5. // 与前面同样的方式发起请求,内部细节被隐藏
  6. reply.handleRequest((req, cb) => {
  7. console.log('Request received', req);
  8. cb({sum: req.a + req.b});
  9. });
  10. });

requestor.js

  1. const req = require('./amqpRequest')();
  2. req.initialize().then(() => {
  3. for (let i = 100; i > 0; i--) {
  4. sendRandomRequest();
  5. }
  6. });
  7. function sendRandomRequest() {
  8. const a = Math.round(Math.random() * 100);
  9. const b = Math.round(Math.random() * 100);
  10. req.request('requests_queue', {a: a, b: b},
  11. res => {
  12. console.log(`${a} + ${b} = ${res.sum}`);
  13. }
  14. );
  15. }

至此我们就实现了一个可靠的请求者和回复者,信息不回被丢失。同时,因为使用了AMQP的缘故,我们的回复者已经是开箱即用的可伸缩了。运行多个replier时,因为在监听同一个队列,代理将会通过负载均衡的形式分发信息。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注