@ironzhang
2017-06-21T06:15:34.000000Z
mqtt-locater keeps the client-to-broker mapping in memory.
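To make the idea of an in-memory client-to-broker mapping concrete, here is a minimal sketch. It is not mqtt-locater's actual code: the `Locater` type, its method names, and the rule that a delete only takes effect when it comes from the broker currently recorded are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Locater is a hypothetical in-memory registry that maps a client ID to the
// address of the broker currently holding that client's connection.
type Locater struct {
	mu      sync.RWMutex
	brokers map[string]string // client ID -> broker address
}

// NewLocater returns an empty registry.
func NewLocater() *Locater {
	return &Locater{brokers: make(map[string]string)}
}

// Set records that clientID is connected to broker, replacing any old entry.
func (l *Locater) Set(clientID, broker string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.brokers[clientID] = broker
}

// Delete removes the entry only if it still points at broker, so a late
// disconnect notice from an old broker cannot erase a newer connection.
func (l *Locater) Delete(clientID, broker string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.brokers[clientID] == broker {
		delete(l.brokers, clientID)
	}
}

// Lookup returns the broker recorded for clientID, if any.
func (l *Locater) Lookup(clientID string) (string, bool) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	b, ok := l.brokers[clientID]
	return b, ok
}

func main() {
	l := NewLocater()
	l.Set("client-1", "10.0.0.1:1883")
	b, ok := l.Lookup("client-1")
	fmt.Println(b, ok)
}
```

Because the map lives only in process memory, a restart loses every entry, so brokers would have to re-register their clients afterwards.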
Path: /v1/publish
Verb: POST
Content-Type: application/json
Body: {
    "Topic": "1",
    "Qos": 2,
    "Payload": "cGF5bG9hZA=="
}
Path: /v1/online/:identifier
Verb: GET
Content-Type: application/json
Response: {
    "Online": false
}
Route configuration for the extension (secondary development) interfaces
[
    {
        "Name": "auth-svr",
        "Balancer": "round-robin",
        "Addrs": [
            "http://localhost:8080",
            "http://127.0.0.1:8080"
        ],
        "APIs": [
            {
                "Name": "Authenticate",
                "Method": "",
                "Path": "/v1/authenticate"
            },
            {
                "Name": "CanPublish",
                "Method": "",
                "Path": "/v1/can/publish"
            },
            {
                "Name": "CanSubscribe",
                "Method": "",
                "Path": "/v1/can/subscribe"
            }
        ]
    },
    {
        "Name": "online-svr",
        "Balancer": "sharding",
        "Addrs": [
            "http://localhost:8080",
            "http://127.0.0.1:8080"
        ],
        "APIs": [
            {
                "Name": "OnConnect",
                "Method": "POST",
                "Path": "/v1/on/connect"
            },
            {
                "Name": "OnDisconnect",
                "Method": "POST",
                "Path": "/v1/on/disconnect"
            }
        ]
    },
    {
        "Name": "process-svr",
        "Balancer": "round-robin",
        "Addrs": [
            "http://localhost:8080",
            "http://127.0.0.1:8080"
        ],
        "APIs": [
            {
                "Name": "OnPublish",
                "Method": "POST",
                "Path": "/v1/on/publish"
            },
            {
                "Name": "OnSubscribe",
                "Method": "POST",
                "Path": "/v1/on/subscribe"
            },
            {
                "Name": "OnUnsubscribe",
                "Method": "POST",
                "Path": "/v1/on/unsubscribe"
            }
        ]
    }
]
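The configuration above can be loaded into plain Go structs, and the two `Balancer` values suggest two ways of picking an address. The sketch below is an illustration under assumptions: the type and function names are invented, and "sharding" is interpreted here as hashing a key (for example the client ID) so that the same key always reaches the same address. The real balancers may work differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"sync/atomic"
)

// API is one callback route of a service.
type API struct {
	Name   string
	Method string
	Path   string
}

// Service is one entry of the route configuration.
type Service struct {
	Name     string
	Balancer string
	Addrs    []string
	APIs     []API

	next uint32 // round-robin counter; unexported, so JSON ignores it
}

// ParseRoutes decodes the JSON array of services.
func ParseRoutes(data []byte) ([]*Service, error) {
	var services []*Service
	if err := json.Unmarshal(data, &services); err != nil {
		return nil, err
	}
	return services, nil
}

// Pick chooses an address. With "sharding" (assumed semantics) the key is
// hashed so equal keys map to the same address; any other balancer value
// falls back to round-robin and ignores the key.
func (s *Service) Pick(key string) string {
	if len(s.Addrs) == 0 {
		return ""
	}
	if s.Balancer == "sharding" {
		h := fnv.New32a()
		h.Write([]byte(key))
		return s.Addrs[int(h.Sum32()%uint32(len(s.Addrs)))]
	}
	n := atomic.AddUint32(&s.next, 1) - 1
	return s.Addrs[int(n%uint32(len(s.Addrs)))]
}

func main() {
	cfg := `[{"Name":"online-svr","Balancer":"sharding",
	          "Addrs":["http://localhost:8080","http://127.0.0.1:8080"],
	          "APIs":[{"Name":"OnConnect","Method":"POST","Path":"/v1/on/connect"}]}]`
	services, err := ParseRoutes([]byte(cfg))
	if err != nil {
		panic(err)
	}
	svc := services[0]
	fmt.Println(svc.Name, svc.Pick("client-1")+svc.APIs[0].Path)
}
```

Hashing by key would fit `online-svr` if connect and disconnect events for one client must reach the same backend, while round-robin suits stateless calls such as authentication.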
Two goroutines per connection
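A common way to realize two goroutines per connection is one goroutine that only reads and one that only writes, joined by a channel. The sketch below shows that pattern with a simplified line-based protocol over `net.Pipe` instead of real MQTT packets; the `serve` and `demo` functions are assumptions, not the broker's code.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
	"sync"
)

// serve runs exactly two goroutines for conn: a reader that passes every
// received line to handle, and a writer that sends everything queued on out.
// It returns once the peer has closed the connection and out has been closed.
func serve(conn net.Conn, out <-chan string, handle func(string)) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // reader goroutine
		defer wg.Done()
		sc := bufio.NewScanner(conn)
		for sc.Scan() {
			handle(sc.Text())
		}
	}()
	go func() { // writer goroutine
		defer wg.Done()
		for msg := range out {
			if _, err := fmt.Fprintln(conn, msg); err != nil {
				return
			}
		}
	}()
	wg.Wait()
	conn.Close()
}

// demo sends one line to the server side and reads one line back.
func demo() (received, reply string) {
	server, client := net.Pipe()
	out := make(chan string, 1)
	in := make(chan string, 1)
	done := make(chan struct{})
	go func() {
		serve(server, out, func(s string) { in <- s })
		close(done)
	}()
	fmt.Fprintln(client, "hello") // consumed by the reader goroutine
	received = <-in
	out <- "world" // sent by the writer goroutine
	line, _ := bufio.NewReader(client).ReadString('\n')
	close(out)     // stops the writer
	client.Close() // makes the reader see EOF
	<-done
	return received, strings.TrimSpace(line)
}

func main() {
	fmt.Println(demo())
}
```

Splitting the work this way means a slow client blocks only its own writer goroutine, while the reader keeps consuming incoming packets.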
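Delivery of Publish messages is handled mainly by the following two components:
When the queue is full, messages are dropped in this order: the oldest QoS 0 message in the queue is dropped first; if the queue holds no QoS 0 messages, the oldest QoS 1/QoS 2 message is dropped.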
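The drop order described above can be sketched as a bounded FIFO with a QoS-aware eviction step. This is an illustration, not the broker's queue: the `Message` and `Queue` types are hypothetical, the new message is assumed to be always admitted, and a linear scan stands in for whatever data structure the real code uses.

```go
package main

import "fmt"

// Message is a simplified outgoing publish.
type Message struct {
	Topic   string
	Qos     byte
	Payload []byte
}

// Queue is a bounded FIFO of pending messages for one client.
type Queue struct {
	capacity int
	msgs     []Message // oldest message first
}

// NewQueue returns a queue holding at most capacity messages (minimum 1).
func NewQueue(capacity int) *Queue {
	if capacity < 1 {
		capacity = 1
	}
	return &Queue{capacity: capacity}
}

// Len reports how many messages are queued.
func (q *Queue) Len() int { return len(q.msgs) }

// Push appends m. If the queue is full it first evicts one message: the
// oldest QoS 0 message if there is one, otherwise the oldest message
// overall, which is then necessarily QoS 1 or QoS 2. It returns the evicted
// message and whether an eviction happened.
func (q *Queue) Push(m Message) (dropped Message, ok bool) {
	if len(q.msgs) >= q.capacity {
		victim := 0 // fallback: the oldest message
		for i, old := range q.msgs {
			if old.Qos == 0 {
				victim = i
				break
			}
		}
		dropped, ok = q.msgs[victim], true
		q.msgs = append(q.msgs[:victim], q.msgs[victim+1:]...)
	}
	q.msgs = append(q.msgs, m)
	return dropped, ok
}

func main() {
	q := NewQueue(3)
	q.Push(Message{Topic: "a", Qos: 1})
	q.Push(Message{Topic: "b", Qos: 0})
	q.Push(Message{Topic: "c", Qos: 2})
	d, _ := q.Push(Message{Topic: "d", Qos: 1})
	fmt.Println("dropped:", d.Topic) // dropped: b (the oldest QoS 0 message)
	d, _ = q.Push(Message{Topic: "e", Qos: 0})
	fmt.Println("dropped:", d.Topic) // dropped: a (no QoS 0 left, oldest goes)
}
```

Evicting QoS 0 first matches the delivery guarantees: QoS 0 is at-most-once, so losing such a message is permitted, whereas dropping QoS 1/QoS 2 messages is a last resort.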
// Publish forwards a PUBLISH packet to every other broker in the cluster,
// then passes it to this node's own Transport.
func (m *Manager) Publish(p *packets.PublishPacket) error {
	req := &proto.Transport{
		Topic:   p.TopicName,
		Qos:     p.FixedHeader.Qos,
		Payload: p.Payload,
	}
	for _, peer := range m.GetPeers() {
		// Skip this node; it is handled after the loop.
		if peer == m.cluster.self {
			continue
		}
		c, err := m.pool.GetClient("tcp", peer)
		if err != nil {
			log.Errorw("get client", "error", err)
			continue
		}
		// Forwarding is best effort: a failing peer is logged and skipped.
		if err = apis.Transport(c, req); err != nil {
			log.Errorw("transport", "error", err)
			continue
		}
	}
	return m.Transport(req)
}
500,000 connections, without mqtt-backend
Operation | QPS | Latency |
---|---|---|
Connect | 9099.8029 | 99% in 10.474612ms |
Subscribe | 14940.2246 | 99% in 4.405788ms |
Publish, qos=1 | 10841.0873 | 99% in 6.163924ms |
http.Publish, qos=1 | 13097.6933 | 99% in 22.405986ms |
400,000 connections, with mqtt-backend
Operation | QPS | Latency |
---|---|---|
Connect | 3874.4689 | 99% in 37.448057ms |
Subscribe | 2862.341 | 99% in 60.228417ms |
Publish, qos=1 | 2089.3266 | 99% in 122.715278ms |
http.Publish, qos=1 | 13708.1291 | 99% in 19.762739ms |
3 machines, 1,050,000 connections, before optimization
Operation | QPS | Latency |
---|---|---|
Connect | 6292.7638 | 99% in 198.773964ms |
Subscribe | 15675.9521 | 99% in 4.835648ms |
Publish | 10047.7267 | 99% in 713.531µs |
2 machines, 900,000 connections, after optimization
Operation | QPS | Latency |
---|---|---|
Connect | 7466.5403 | 99% in 18.433156ms |
Subscribe | 15625.0389 | 99% in 4.986701ms |
Publish, qos=1 | 8458.7413 | 99% in 10.874216ms |