[关闭]
@ironzhang 2017-07-18T07:25:01.000000Z 字数 1987 阅读 614

MQTT后台架构设计

工作/mqtt


目标

基于MQTT-3.1.1协议,结合我们的业务需求实现一个不完备的MQTT服务器,从方便实现的角度考虑,初期版本我们对MQTT协议做出如下裁剪:

  1. 不支持主题通配符(通配符当做普通字符处理)
  2. 不支持遗愿消息(忽略连接包中的遗愿消息)
  3. 不支持保留消息(忽略PUBLISH消息中的RETAIN标志,当做普通的PUBLISH消息处理)

订阅主题不支持Qos1和QoS2(Qos1和Qos2的订阅都降为Qos0)?

结合我们的业务,我们还需额外支持如下功能:

  1. 客户端认证
  2. 订阅、发布权限检查
  3. 支持设备消息上报UDS

设计

设计原则

为解耦我们的业务逻辑和MQTT的实现,我们将MQTT实现为一个库,上层通过实现MQTT库定义的相关接口来实现具体的客户端认证,订阅、发布权限检查,设备消息上报等业务逻辑功能。

一旦服务器重启,该服务器上的所有会话状态都将丢失。

协议解析

采用第三方库:https://github.com/eclipse/paho.mqtt.golang

连接层模型

连接层的职责:

  1. MQTT协议报文的读取和写入
  2. MQTT协议报文的处理

一个goroutine模型,负责读取、处理、写入

先采用这种简单的模型,后续性能调优阶段如果发现性能瓶颈在这里再改进该模型。
需要注意goroutine的回收处理。

会话管理

为例提供服务质量保证,服务器有必要存储会话状态(不持久化),一个会话的生存时间至少和它的网络连接是一样的。

会话的职责:

  1. 缓存客户端的全部订阅
  2. 处理Publish消息的下发

受限于内存资源,后台不会一直持有离线会话,当会话离线时间超过阀值后,会话状态将被丢弃。

会话通过一个内存消息队列飞行窗口处理下发消息,飞行窗口保存当前正在发送未确认的QoS1/QoS2消息,当客户端离线或飞行窗口满时,消息缓存到队列。如果消息队列满,先丢弃QoS0消息或最早进入队列的消息。

MQTT解决的是在网络不稳定、瞬断等情况下的消息可靠性。100%可靠的离线消息,还是由其他系统实现比较好。

主题管理

主题管理负责维护从主题到会话的订阅关系。由于我们去掉了对主题通配符的支持,因此只要维护一个简单的map来管理这种映射关系即可。

服务质量和协议流程

QoS 0: 最多分发一次

Created with Raphaël 2.1.2发送者发送者接收者接收者PUBLISH: QoS 0, DUP=0开始分发应用消息给后续的接收者

QoS 1: 最少分发一次

Created with Raphaël 2.1.2发送者发送者接收者接收者存储消息PUBLISH: QoS 1, DUP=0, MessageID开始分发应用消息给后续的接收者PUBACK, MessageID丢弃消息

不要求接收者在发送PUBACK之前完成应用消息的分发。

QoS 2: 仅分发一次

Created with Raphaël 2.1.2发送者发送者接收者接收者存储消息PUBLISH: QoS 2, DUP=0, MessageID存储MessageID, 开始分发应用消息给后续的接收者PUBREC, MessageID丢弃消息, 存储MessageIDPUBREL: MessageID丢弃MessageIDPUBCOMP, MessageID回收MessageID

客户端认证机制和访问控制机制

开放认证接口和访问控制检查接口,由外部实现

  1. // Authenticator 客户端认证接口
  2. type Authenticator interface {
  3. Authenticate(username string, password []byte) (ok bool, err error)
  4. }
  5. // Accessor 访问权限检查接口
  6. type Accessor interface {
  7. CanPublish(username string, topic string) bool
  8. CanSubscribe(username string, topic string) bool
  9. }

钩子机制

当发生连接、断开连接、推送消息、订阅、取消订阅这几个事件时,都会调用相应的钩子函数通知上层。

  1. // Hook 钩子函数
  2. type Hook struct {
  3. OnConnect func(c Conn)
  4. OnDisconnect func(c Conn)
  5. OnPing func(c Conn)
  6. OnPublish func(c Conn, topic string, payload []byte)
  7. OnSubscribe func(c Conn, topic string)
  8. OnUnsubscribe func(c Conn, topic string)
  9. }

集群方案

https://www.zybuluo.com/ironzhang/note/513578

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注