[关闭]
@wsd1 2017-01-19T07:58:35.000000Z 字数 6847 阅读 4973

【专题3】skynet中watchdog,gate,agent调用关系清理(近代版)

skynet 201611


参考文档

http://cloudwu.github.io/lua53doc/manual.html#6.4.1

https://github.com/cloudwu/skynet/wiki

背景

gate服务与 socket节点交互 处理tcp接入及其 长度+数据 格式的数据帧。细节可以参见“【专题1】skynet之lua版本的gate_service分析”。

其作为一个前端通用型界面,需要和其他部分进行交互和安排数据流向,本文分析 与该服务交互的 watchdog和agent,厘清相互关系。

同时可以参考另一篇文章“【专题3】skynet中watchdog,gate,agent调用关系清理(早期json版)”

需要区分开 lualib/snax/gateserver.lua 和 service/gate.lua,

main.lua引导 watchdog并执行其start命令

在 引导文件 main.lua 文件中:

  1. local watchdog = skynet.newservice("watchdog")
  2. skynet.call(watchdog, "lua", "start", {
  3. port = 8888,
  4. maxclient = max_client,
  5. nodelay = true,
  6. })

引导watchdog,然后调用其 start 接口。

watchdog.lua:

  1. ---最重要过程,安排了lua接口
  2. skynet.start(function()
  3. skynet.dispatch("lua", function(session, source, cmd, subcmd, ...)
  4. if cmd == "socket" then
  5. local f = SOCKET[subcmd]
  6. f(...)
  7. -- socket api dont need return
  8. else
  9. local f = assert(CMD[cmd])
  10. skynet.ret(skynet.pack(f(subcmd, ...)))
  11. end
  12. end)
  13. gate = skynet.newservice("gate") --- 注意这里引导了 gate
  14. end)

main.lua 中 调用了 start接口,相应代码如下:

watchdog.lua:

  1. function CMD.start(conf)
  2. skynet.call(gate, "lua", "open" , conf)
  3. end
  4. ---对应dispatch中的部分:
  5. ...
  6. local f = assert(CMD[cmd])
  7. skynet.ret(skynet.pack(f(subcmd, ...)))
  8. ...

可见,watchdog一被main.lua 引导就间接发出对gate的open指令,我们去探究gate.lua:

watchdog 被动调用了 gate.lua 或 gateserver.lua的open命令

gate.lua

  1. local gateserver = require "snax.gateserver"
  2. ...
  3. gateserver.start(handler)

gate.lua其实很简单,就是组织一系列handler,然后 gateserver.start一下。所以,想要看“ skynet.call(gate, "lua", "open" , conf)”执行结果,需要看 gateserver.lua:

gateserver.lua:

  1. function gateserver.start(handler)
  2. ...
  3. --这里是处理lua类型消息的地方
  4. skynet.start(function()
  5. skynet.dispatch("lua", function (_, address, cmd, ...)
  6. local f = CMD[cmd] -- <-- 处理指令
  7. if f then
  8. skynet.ret(skynet.pack(f(address, ...)))
  9. else
  10. skynet.ret(skynet.pack(handler.command(cmd, address, ...)))
  11. end
  12. end)
  13. end)
  14. ...
  15. ---open指令
  16. function CMD.open( source, conf ) --网络相关设置在这里被应用
  17. assert(not socket)
  18. local address = conf.address or "0.0.0.0"
  19. local port = assert(conf.port)
  20. maxclient = conf.maxclient or 1024
  21. nodelay = conf.nodelay
  22. skynet.error(string.format("Listen on %s:%d", address, port))
  23. -- 真正处理网络部分。socketdriver就是处理socket的服务节点 lua-socket.c
  24. socket = socketdriver.listen(address, port)
  25. socketdriver.start(socket)
  26. ---如果上层应用有open指令,那么执行之
  27. if handler.open then
  28. return handler.open(source, conf)
  29. end
  30. end
  31. ...
  32. end

可以想见,gateserver.lua 会全权处理好对socket的控制,socket的消息,比如tcp接入、报文收取什么的都是有socket类型的消息处理环节:

gateserver.lua:

  1. function gateserver.start(handler)
  2. ...
  3. --这就是处理socket消息的地方
  4. --socket消息被netpack中间件filter()处理转换为其他消息类型
  5. skynet.register_protocol {
  6. name = "socket",
  7. id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
  8. unpack = function ( msg, sz )
  9. return netpack.filter( queue, msg, sz)
  10. end,
  11. dispatch = function (_, _, q, type, ...)
  12. queue = q
  13. if type then
  14. MSG[type](...)
  15. end
  16. end
  17. }
  18. ...
  19. local MSG = {}
  20. --收到数据的消息处理,注意是已经去掉长度头部组装好的数据帧
  21. local function dispatch_msg(fd, msg, sz)
  22. if connection[fd] then
  23. handler.message(fd, msg, sz) --这里就是调用gate.lua中安装的handler
  24. else
  25. skynet.error(string.format("Drop message from fd (%d) : %s", fd, netpack.tostring(msg,sz)))
  26. end
  27. end
  28. MSG.data = dispatch_msg
  29. ...
  30. --处理接入,accpet之后被触发
  31. function MSG.open(fd, msg)
  32. if client_number >= maxclient then
  33. socketdriver.close(fd)
  34. return
  35. end
  36. if nodelay then
  37. socketdriver.nodelay(fd)
  38. end
  39. connection[fd] = true
  40. client_number = client_number + 1
  41. handler.connect(fd, msg)--这里就是调用gate.lua中安装的handler
  42. end
  43. ...
  44. --处理接入,断开之后被触发
  45. function MSG.close(fd)
  46. if fd ~= socket then
  47. if handler.disconnect then
  48. handler.disconnect(fd)--这里就是调用gate.lua中安装的handler
  49. end
  50. close_fd(fd)
  51. else
  52. socket = nil
  53. end
  54. end
  55. ...
  56. end

上述代码有点小背景,想要搞清楚可以看看“【专题1】skynet之lua版本的gate_service分析”。
上面处理 类如data、open、close 什么的消息,其实是来自于 netpack.filter中间件对socket消息的预处理。想搞清楚就看上面文档。

gate处理接入会需要watchdog的帮助

由于gateserver.lua仅处理框架层面的事务,实际处理逻辑还是要看gate.lua

gate.lua:

  1. --收到数据帧
  2. function handler.message(fd, msg, sz)
  3. -- recv a package, forward it
  4. local c = connection[fd]
  5. local agent = c.agent
  6. if agent then
  7. skynet.redirect(agent, c.client, "client", 0, msg, sz)
  8. else
  9. skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))
  10. end
  11. end
  12. -- 上面的代码是收到数据时的逻辑,在没有介绍的部位 gate 其实可以安装agent来处理数据业务,,上面redirect就是这样的路径。
  13. 否则,gate会将该消息转移给 watchdog
  14. function handler.connect(fd, addr)
  15. local c = {
  16. fd = fd,
  17. ip = addr,
  18. }
  19. connection[fd] = c
  20. skynet.send(watchdog, "lua", "socket", "open", fd, addr)
  21. end
  22. -- 上面的代码是建立连接的逻辑,可见,gate会将该消息转移给 watchdog
  23. function handler.disconnect(fd)
  24. close_fd(fd)
  25. skynet.send(watchdog, "lua", "socket", "close", fd)
  26. end
  27. -- 上面的代码是连接断开的逻辑,gate会将该消息转移给 watchdog

gate服务节点会密切保持和watchdog间的联系,不管接入还是断开,都会通知watchdog。我们看看这部分看门狗如何处理。

watchdog.lua:

  1. --下述代码,在gate在收到接入事件时被传递触发
  2. -- 看门狗建立了agent并通过start指令将所有信息转交给它
  3. function SOCKET.open(fd, addr)
  4. skynet.error("New client from : " .. addr)
  5. agent[fd] = skynet.newservice("agent")
  6. skynet.call(agent[fd], "lua", "start", { gate = gate, client = fd, watchdog = skynet.self() })
  7. end
  8. ...
  9. --下述代码,在gate在收到断开事件时被传递触发
  10. function SOCKET.close(fd)
  11. print("socket close",fd)
  12. close_agent(fd)
  13. end
  14. ...
  15. --下述代码,在gate在收到数据时被传递触发
  16. function SOCKET.data(fd, msg)
  17. end

可见,看门狗在安排好gate之后,还要负责为gate介绍agent来处理数据。gate运行起来后有个什么事情都会告诉看门狗,尤其是数据业务,后者会为其创建agent。

watchdog为gate的连接创建agent 并start之

我们看看agent如何和gate配合。

从watchdog引导agent开始:

agent[fd] = skynet.newservice("agent")
skynet.call(agent[fd], "lua", "start", { gate = gate, client = fd, watchdog = skynet.self() })

agent.lua:

  1. ...
  2. ---这里就是看门狗在创建agent之后,第一个发布
  3. ---skynet.call(agent[fd], "lua", "start", { gate = gate, client = fd, watchdog = skynet.self() })
  4. ---指令的地方
  5. function CMD.start(conf)
  6. --取出参数啥的
  7. local fd = conf.client
  8. local gate = conf.gate
  9. WATCHDOG = conf.watchdog
  10. -- 下面的部分就有点看不明白了,应该是sproto相关的初始化
  11. -- slot 1,2 set at main.lua
  12. host = sprotoloader.load(1):host "package"
  13. send_request = host:attach(sprotoloader.load(2))
  14. skynet.fork(function()
  15. while true do
  16. send_package(send_request "heartbeat")
  17. skynet.sleep(500)
  18. end
  19. end)
  20. -- 注意,这里的fd就是socket服务节点连接句柄,可以直接使用socket api来访问的
  21. client_fd = fd
  22. --关键: agent还向gate发布了 forward指令
  23. skynet.call(gate, "lua", "forward", fd)
  24. end
  25. skynet.start(function()
  26. skynet.dispatch("lua", function(_,_, command, ...)
  27. local f = CMD[command]
  28. skynet.ret(skynet.pack(f(...)))
  29. end)
  30. end)

上面可以看到,看门狗除了引导agent之外,还执行了它的start指令,其中agent执行了 gate的forward指令。

gate.lua

  1. function CMD.forward(source, fd, client, address)
  2. local c = assert(connection[fd])
  3. unforward(c)
  4. c.client = client or 0
  5. c.agent = address or source
  6. forwarding[c.agent] = c
  7. gateserver.openclient(fd)
  8. end

上面的代码很简单,gate就是记录下connection对应的agent之类的信息。

再回到gate看看数据转移的代码,gate中很明白的指示了向agent发送数据:
gate.lua:

  1. --收到数据帧
  2. function handler.message(fd, msg, sz)
  3. -- recv a package, forward it
  4. local c = connection[fd]
  5. local agent = c.agent
  6. if agent then
  7. -- 将数据发送给agent
  8. skynet.redirect(agent, c.client, "client", 0, msg, sz)
  9. else
  10. skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))
  11. end
  12. end

复习 redirect 用法:

skynet.redirect(address, source, typename, session, ...) 它和 skynet.send 功能类似,但更细节一些。它可以指定发送地址(把消息源伪装成另一个服务),指定发送的消息的 session 。注:address 和 source 都必须是数字地址,不可以是别名。skynet.redirect 不会调用 pack ,所以这里的 ... 必须是一个编码过的字符串,或是 userdata 加一个长度。

总结

基本到这里就已经比较清楚了,不去分析 agent如何用sproto格式处理数据细节,调用关系如下:

main.lua 首先引导watchdog.lua,使用网络配置参数调用其 start 接口。(newservice + start 是skynet中常用的组合)

看门狗启动时引导 gate.lua,main.lua对前者start的调用中又调用了gate的open命令,在gateserver.lua模板中启动socketdriver.listen和start。至此网络接口打开等候连接进入,引导过程完成。

gate.lua是gateserver.lua的应用,后面一旦有连接进来,都会执行gate.lua设置的handler,其中接入过程,会在gate.lua中通知回watchdog,后者就新建一个agent,并start之。
agent引导起来后,start过程中会主动调用gate.lua的forward接口,为其安装agent相关信息,再之后的过程就是 gate.lua中接收到数据都会redirect到agent了。

分析完成。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注