[关闭]
@ironzhang 2017-06-21T06:15:34.000000Z 字数 3640 阅读 339

MQTT分享

技术文章


架构

mqtt-locater在内存中存储client到broker的映射。

接口

对外开放接口

  1. Path: /v1/publish
  2. Verb: POST
  3. Content-Type: application/json
  4. Body: {
  5. "Topic": "1",
  6. "Qos": 2,
  7. "Payload": "cGF5bG9hZA=="
  8. }
  1. Path: /v1/online/:identifier
  2. Verb: GET
  3. Content-Type: application/json
  4. Response: {
  5. "Online": false
  6. }

二次开发接口

二次开发接口路由配置

  1. [
  2. {
  3. "Name": "auth-svr",
  4. "Balancer": "round-robin",
  5. "Addrs": [
  6. "http://localhost:8080",
  7. "http://127.0.0.1:8080"
  8. ],
  9. "APIs": [
  10. {
  11. "Name": "Authenticate",
  12. "Method": "",
  13. "Path": "/v1/authenticate"
  14. },
  15. {
  16. "Name": "CanPublish",
  17. "Method": "",
  18. "Path": "/v1/can/publish"
  19. },
  20. {
  21. "Name": "CanSubscribe",
  22. "Method": "",
  23. "Path": "/v1/can/subscribe"
  24. }
  25. ]
  26. },
  27. {
  28. "Name": "online-svr",
  29. "Balancer": "sharding",
  30. "Addrs": [
  31. "http://localhost:8080",
  32. "http://127.0.0.1:8080"
  33. ],
  34. "APIs": [
  35. {
  36. "Name": "OnConnect",
  37. "Method": "POST",
  38. "Path": "/v1/on/connect"
  39. },
  40. {
  41. "Name": "OnDisconnect",
  42. "Method": "POST",
  43. "Path": "/v1/on/disconnect"
  44. }
  45. ]
  46. },
  47. {
  48. "Name": "process-svr",
  49. "Balancer": "round-robin",
  50. "Addrs": [
  51. "http://localhost:8080",
  52. "http://127.0.0.1:8080"
  53. ],
  54. "APIs": [
  55. {
  56. "Name": "OnPublish",
  57. "Method": "POST",
  58. "Path": "/v1/on/publish"
  59. },
  60. {
  61. "Name": "OnSubscribe",
  62. "Method": "POST",
  63. "Path": "/v1/on/subscribe"
  64. },
  65. {
  66. "Name": "OnUnsubscribe",
  67. "Method": "POST",
  68. "Path": "/v1/on/unsubscribe"
  69. }
  70. ]
  71. }
  72. ]

实现

MQTT后台架构设计

协程模型

一个连接两个协程

Publish消息下发

Publish消息的下发主要由以下两个组件来处理:

队列满时消息的丢弃顺序:先丢弃最早进入队列的Qos0消息,如果没有Qos0消息,则丢弃最早进入的Qos1/Qos2消息。

集群广播

Created with Raphaël 2.1.2client 1client 1mqtt_broker 1mqtt_broker 1other mqtt_brokersother mqtt_brokersclientsclients发布消息透传消息给其他mqtt_broker根据发布主题找到订阅该主题的clients推送消息
  1. // Publish 发布消息
  2. func (m *Manager) Publish(p *packets.PublishPacket) error {
  3. req := &proto.Transport{
  4. Topic: p.TopicName,
  5. Qos: p.FixedHeader.Qos,
  6. Payload: p.Payload,
  7. }
  8. for _, peer := range m.GetPeers() {
  9. if peer == m.cluster.self {
  10. continue
  11. }
  12. c, err := m.pool.GetClient("tcp", peer)
  13. if err != nil {
  14. log.Errorw("get client", "error", err)
  15. continue
  16. }
  17. if err = apis.Transport(c, req); err != nil {
  18. log.Errorw("transport", "error", err)
  19. continue
  20. }
  21. }
  22. return m.Transport(req)
  23. }

性能

50W连接,未接入mqtt-backend

/ QPS Latency
Connect 9099.8029 99% in 10.474612ms
Subscribe 14940.2246 99% in 4.405788ms
Publish,qos=1 10841.0873 99% in 6.163924ms
http.Publish,qos=1 13097.6933 99% in 22.405986ms

40W连接,接入mqtt-backend

/ QPS Latency
Connect 3874.4689 99% in 37.448057ms
Subscribe 2862.341 99% in 60.228417ms
Publish,qos=1 2089.3266 99% in 122.715278ms
http.Publish,qos=1 13708.1291 99% in 19.762739ms

3台机器,105W连接,优化前

/ QPS Latency
Connect 6292.7638 99% in 198.773964ms
Subscribe 15675.9521 99% in 4.835648ms
Publish 10047.7267 99% in 713.531µs

2台机器,90W连接,优化后

/ QPS Latency
Connect 7466.5403 99% in 18.433156ms
Subscribe 15625.0389 99% in 4.986701ms
Publish,qos=1 8458.7413 99% in 10.874216ms

问题

后续

业务系统

IM系统

Created with Raphaël 2.1.2c1c1mqtt_broker_clustermqtt_broker_clustermqtt_backend_clustermqtt_backend_clusterc2c2PUBLISH topic=ablecloud, payload={c2,msg}转发消息到后台系统处理消息,存储payload.msg到c2的接收消息队列中PUBLISH topic=c1, payload={业务层ack}转发消息到c1PUBLISH topic=c2, payload={通知客户端有新消息}转发消息到c2
Created with Raphaël 2.1.2c2c2mqtt_broker_clustermqtt_broker_clustermqtt_backend_clustermqtt_backend_clusterPUBLISH topic=ablecloud, payload={cli_seq_no}转发消息到后台系统处理消息,将c2的接收消息队列中seq_no大于cli_seq_no的消息发送给c2PUBLISH topic=c2, payload={msgs}转发消息到c2更新本地cli_seq_noPUBLISH topic=ablecloud, payload={cli_seq_no}转发消息到后台系统处理消息,更新服务端的c2的cli_seq_no,可删除消息队列中seq_no小于等于cli_seq_no的消息
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注