[关闭]
@elibinary 2016-07-02T02:50:09.000000Z 字数 2186 阅读 1009

Rails上实现即时消息通讯之 MessageBus

未分类


Rails5 已经release啦,各种新玩意真是令人兴奋。就比如说今天要说的这个即时消息通讯,看到这个就会想到 websocket 。没错,rails5 中也集成了 websocket 的解决方案,就是 ActionCable 这个东西,不过本篇文章暂不介绍 ActionCable 。

本篇来说一下以前的一些解决方案,比如说 MessageBus

使用

MessageBus在后端提供了一种pub/sub的机制,同时在js端也做了封装

首先引入

  1. gem 'message_bus'

在application.js中

  1. //= require message-bus

主要的工作方式:
在服务器端可以通过 publish 方法写入信息

  1. MessageBus.publish "/channel", data

其中 "/channel" 是频道名字,然后client端通过订阅不同频道来获取想要的信息。

  1. MessageBus.start();
  2. MessageBus.callbackInterval = 500;
  3. MessageBus.subscribe("/channel", function(data){
  4. $('#messages').append("<p>"+ data + "</p>");
  5. $(document.body).scrollTop(document.body.scrollHeight);
  6. });

其中 callbackInterval 用来设置轮询的时间间隔,单位是 ms。

深入源码

先来看一下 MessageBus 到底把信息存储在了什么地方,在 backends 文件夹下我们可以看到 MessageBus 支持三中存储方式分别是:in-memory, PostgreSQL 和 Redis。可以通过以下方式设置:

  1. MessageBus.configure(backend: :redis)

在没有设置的情况下默认使用 Redis ,这点从源码中可看出:

  1. # lib/message_bus.rb
  2. def backend
  3. @config[:backend] || :redis
  4. end

我们来看一下它的工作流程,(以 reids 为例)
当发布一条信息时,调用 lib/message_bus.rb 的 publish 方法

  1. def publish(channel, data, opts = nil)
  2. ......
  3. ......
  4. encoded_data = JSON.dump({
  5. data: data,
  6. user_ids: user_ids,
  7. group_ids: group_ids,
  8. client_ids: client_ids
  9. })
  10. reliable_pub_sub.publish(encode_channel_name(channel), encoded_data)
  11. end

这个方法最后会通过 backend 的实例(这里是redis)去执行对应的 publish 方法

  1. # lib/message_bus/backends/redis.rb
  2. def publish(channel, data, queue_in_memory=true)
  3. ......
  4. ......
  5. redis.multi do |m|
  6. redis.zadd backlog_key, backlog_id, payload
  7. redis.expire backlog_key, @max_backlog_age
  8. redis.zadd global_backlog_key, global_id, backlog_id.to_s << "|" << channel
  9. redis.expire global_backlog_key, @max_backlog_age
  10. redis.publish redis_channel_name, payload
  11. if backlog_id > @max_backlog_size
  12. redis.zremrangebyscore backlog_key, 1, backlog_id - @max_backlog_size
  13. end
  14. if global_id > @max_global_backlog_size
  15. redis.zremrangebyscore global_backlog_key, 1, global_id - @max_global_backlog_size
  16. end
  17. end
  18. ......
  19. end

可以看到信息在redis中的存储结构是 SortedSet

拉取消息的实现可以看 lib/message_bus/assets/message-bus.js

  1. poll = function() {
  2. var data;
  3. if(stopped) {
  4. return;
  5. }
  6. if (callbacks.length === 0) {
  7. if(!delayPollTimeout) {
  8. delayPollTimeout = setTimeout(function(){ delayPollTimeout = null; poll();}, 500);
  9. }
  10. return;
  11. }
  12. data = {};
  13. for (var i=0;i<callbacks.length;i++) {
  14. data[callbacks[i].channel] = callbacks[i].last_id;
  15. }
  16. me.longPoll = longPoller(poll,data);
  17. };

其中订阅频道获取频道信息的核心就是轮询

本篇文章先介绍到这里,关于 MessageBus 中的很多细节实现可以去研读其源码找到解答

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注