@ironzhang
2017-07-18T07:25:01.000000Z
字数 1987
阅读 614
工作/mqtt
基于MQTT-3.1.1协议,结合我们的业务需求实现一个不完备的MQTT服务器,从方便实现的角度考虑,初期版本我们对MQTT协议做出如下裁剪:
订阅主题不支持Qos1和QoS2(Qos1和Qos2的订阅都降为Qos0)?
结合我们的业务,我们还需额外支持如下功能:
为解耦我们的业务逻辑和MQTT的实现,我们将MQTT实现为一个库,上层通过实现MQTT库定义的相关接口来实现具体的客户端认证,订阅、发布权限检查,设备消息上报等业务逻辑功能。
一旦服务器重启,该服务器上的所有会话状态都将丢失。
采用第三方库:https://github.com/eclipse/paho.mqtt.golang
连接层的职责:
一个goroutine模型,负责读取、处理、写入
先采用这种简单的模型,后续性能调优阶段如果发现性能瓶颈在这里再改进该模型。
需要注意goroutine的回收处理。
为例提供服务质量保证,服务器有必要存储会话状态(不持久化),一个会话的生存时间至少和它的网络连接是一样的。
会话的职责:
受限于内存资源,后台不会一直持有离线会话,当会话离线时间超过阀值后,会话状态将被丢弃。
会话通过一个内存消息队列和飞行窗口处理下发消息,飞行窗口保存当前正在发送未确认的QoS1/QoS2消息,当客户端离线或飞行窗口满时,消息缓存到队列。如果消息队列满,先丢弃QoS0消息或最早进入队列的消息。
MQTT解决的是在网络不稳定、瞬断等情况下的消息可靠性。100%可靠的离线消息,还是由其他系统实现比较好。
主题管理负责维护从主题到会话的订阅关系。由于我们去掉了对主题通配符的支持,因此只要维护一个简单的map来管理这种映射关系即可。
QoS 0: 最多分发一次
QoS 1: 最少分发一次
不要求接收者在发送PUBACK之前完成应用消息的分发。
QoS 2: 仅分发一次
开放认证接口和访问控制检查接口,由外部实现
// Authenticator 客户端认证接口
type Authenticator interface {
Authenticate(username string, password []byte) (ok bool, err error)
}
// Accessor 访问权限检查接口
type Accessor interface {
CanPublish(username string, topic string) bool
CanSubscribe(username string, topic string) bool
}
当发生连接、断开连接、推送消息、订阅、取消订阅这几个事件时,都会调用相应的钩子函数通知上层。
// Hook 钩子函数
type Hook struct {
OnConnect func(c Conn)
OnDisconnect func(c Conn)
OnPing func(c Conn)
OnPublish func(c Conn, topic string, payload []byte)
OnSubscribe func(c Conn, topic string)
OnUnsubscribe func(c Conn, topic string)
}