@wsd1
2017-01-19T07:58:35.000000Z
字数 6847
阅读 4682
skynet
201611
http://cloudwu.github.io/lua53doc/manual.html#6.4.1
https://github.com/cloudwu/skynet/wiki
gate服务与 socket节点交互 处理tcp接入及其 长度+数据 格式的数据帧。细节可以参见“【专题1】skynet之lua版本的gate_service分析”。
其作为一个前端通用型界面,需要和其他部分进行交互和安排数据流向,本文分析 与该服务交互的 watchdog和agent,厘清相互关系。
同时可以参考另一篇文章“【专题3】skynet中watchdog,gate,agent调用关系清理(早期json版)”
需要区分开 lualib/snax/gateserver.lua 和 service/gate.lua,
lualib/snax/gateserver.lua
是gate模板程序是与socket直接交互的底层公共部分,上层只需要安装一系列handler就可以使用。
service/gate.lua
是对gateserver的实际应用。
在 引导文件 main.lua 文件中:
local watchdog = skynet.newservice("watchdog")
skynet.call(watchdog, "lua", "start", {
port = 8888,
maxclient = max_client,
nodelay = true,
})
引导watchdog,然后调用其 start 接口。
watchdog.lua:
---最重要过程,安排了lua接口
skynet.start(function()
skynet.dispatch("lua", function(session, source, cmd, subcmd, ...)
if cmd == "socket" then
local f = SOCKET[subcmd]
f(...)
-- socket api dont need return
else
local f = assert(CMD[cmd])
skynet.ret(skynet.pack(f(subcmd, ...)))
end
end)
gate = skynet.newservice("gate") --- 注意这里引导了 gate
end)
main.lua 中 调用了 start接口,相应代码如下:
watchdog.lua:
function CMD.start(conf)
skynet.call(gate, "lua", "open" , conf)
end
---对应dispatch中的部分:
...
local f = assert(CMD[cmd])
skynet.ret(skynet.pack(f(subcmd, ...)))
...
可见,watchdog一被main.lua 引导就间接发出对gate的open指令,我们去探究gate.lua:
gate.lua
local gateserver = require "snax.gateserver"
...
gateserver.start(handler)
gate.lua其实很简单,就是组织一系列handler,然后 gateserver.start一下。所以,想要看“ skynet.call(gate, "lua", "open" , conf)”执行结果,需要看 gateserver.lua:
gateserver.lua:
function gateserver.start(handler)
...
--这里是处理lua类型消息的地方
skynet.start(function()
skynet.dispatch("lua", function (_, address, cmd, ...)
local f = CMD[cmd] -- <-- 处理指令
if f then
skynet.ret(skynet.pack(f(address, ...)))
else
skynet.ret(skynet.pack(handler.command(cmd, address, ...)))
end
end)
end)
...
---open指令
function CMD.open( source, conf ) --网络相关设置在这里被应用
assert(not socket)
local address = conf.address or "0.0.0.0"
local port = assert(conf.port)
maxclient = conf.maxclient or 1024
nodelay = conf.nodelay
skynet.error(string.format("Listen on %s:%d", address, port))
-- 真正处理网络部分。socketdriver就是处理socket的服务节点 见 lua-socket.c
socket = socketdriver.listen(address, port)
socketdriver.start(socket)
---如果上层应用有open指令,那么执行之
if handler.open then
return handler.open(source, conf)
end
end
...
end
可以想见,gateserver.lua 会全权处理好对socket的控制,socket的消息,比如tcp接入、报文收取什么的都是有socket类型的消息处理环节:
gateserver.lua:
function gateserver.start(handler)
...
--这就是处理socket消息的地方
--socket消息被netpack中间件filter()处理转换为其他消息类型
skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = function ( msg, sz )
return netpack.filter( queue, msg, sz)
end,
dispatch = function (_, _, q, type, ...)
queue = q
if type then
MSG[type](...)
end
end
}
...
local MSG = {}
--收到数据的消息处理,注意是已经去掉长度头部组装好的数据帧
local function dispatch_msg(fd, msg, sz)
if connection[fd] then
handler.message(fd, msg, sz) --这里就是调用gate.lua中安装的handler咯
else
skynet.error(string.format("Drop message from fd (%d) : %s", fd, netpack.tostring(msg,sz)))
end
end
MSG.data = dispatch_msg
...
--处理接入,accpet之后被触发
function MSG.open(fd, msg)
if client_number >= maxclient then
socketdriver.close(fd)
return
end
if nodelay then
socketdriver.nodelay(fd)
end
connection[fd] = true
client_number = client_number + 1
handler.connect(fd, msg)--这里就是调用gate.lua中安装的handler咯
end
...
--处理接入,断开之后被触发
function MSG.close(fd)
if fd ~= socket then
if handler.disconnect then
handler.disconnect(fd)--这里就是调用gate.lua中安装的handler咯
end
close_fd(fd)
else
socket = nil
end
end
...
end
上述代码有点小背景,想要搞清楚可以看看“【专题1】skynet之lua版本的gate_service分析”。
上面处理 类如data、open、close 什么的消息,其实是来自于 netpack.filter中间件对socket消息的预处理。想搞清楚就看上面文档。
由于gateserver.lua仅处理框架层面的事务,实际处理逻辑还是要看gate.lua
gate.lua:
--收到数据帧
function handler.message(fd, msg, sz)
-- recv a package, forward it
local c = connection[fd]
local agent = c.agent
if agent then
skynet.redirect(agent, c.client, "client", 0, msg, sz)
else
skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))
end
end
-- 上面的代码是收到数据时的逻辑,在没有介绍的部位 gate 其实可以安装agent来处理数据业务,,上面redirect就是这样的路径。
否则,gate会将该消息转移给 watchdog
function handler.connect(fd, addr)
local c = {
fd = fd,
ip = addr,
}
connection[fd] = c
skynet.send(watchdog, "lua", "socket", "open", fd, addr)
end
-- 上面的代码是建立连接的逻辑,可见,gate会将该消息转移给 watchdog
function handler.disconnect(fd)
close_fd(fd)
skynet.send(watchdog, "lua", "socket", "close", fd)
end
-- 上面的代码是连接断开的逻辑,gate会将该消息转移给 watchdog
gate服务节点会密切保持和watchdog间的联系,不管接入还是断开,都会通知watchdog。我们看看这部分看门狗如何处理。
watchdog.lua:
--下述代码,在gate在收到接入事件时被传递触发
-- 看门狗建立了agent并通过start指令将所有信息转交给它
function SOCKET.open(fd, addr)
skynet.error("New client from : " .. addr)
agent[fd] = skynet.newservice("agent")
skynet.call(agent[fd], "lua", "start", { gate = gate, client = fd, watchdog = skynet.self() })
end
...
--下述代码,在gate在收到断开事件时被传递触发
function SOCKET.close(fd)
print("socket close",fd)
close_agent(fd)
end
...
--下述代码,在gate在收到数据时被传递触发
function SOCKET.data(fd, msg)
end
可见,看门狗在安排好gate之后,还要负责为gate介绍agent来处理数据。gate运行起来后有个什么事情都会告诉看门狗,尤其是数据业务,后者会为其创建agent。
我们看看agent如何和gate配合。
从watchdog引导agent开始:
agent[fd] = skynet.newservice("agent")
skynet.call(agent[fd], "lua", "start", { gate = gate, client = fd, watchdog = skynet.self() })
agent.lua:
...
---这里就是看门狗在创建agent之后,第一个发布
---skynet.call(agent[fd], "lua", "start", { gate = gate, client = fd, watchdog = skynet.self() })
---指令的地方
function CMD.start(conf)
--取出参数啥的
local fd = conf.client
local gate = conf.gate
WATCHDOG = conf.watchdog
-- 下面的部分就有点看不明白了,应该是sproto相关的初始化
-- slot 1,2 set at main.lua
host = sprotoloader.load(1):host "package"
send_request = host:attach(sprotoloader.load(2))
skynet.fork(function()
while true do
send_package(send_request "heartbeat")
skynet.sleep(500)
end
end)
-- 注意,这里的fd就是socket服务节点连接句柄,可以直接使用socket api来访问的
client_fd = fd
--关键: agent还向gate发布了 forward指令
skynet.call(gate, "lua", "forward", fd)
end
skynet.start(function()
skynet.dispatch("lua", function(_,_, command, ...)
local f = CMD[command]
skynet.ret(skynet.pack(f(...)))
end)
end)
上面可以看到,看门狗除了引导agent之外,还执行了它的start指令,其中agent执行了 gate的forward指令。
gate.lua
function CMD.forward(source, fd, client, address)
local c = assert(connection[fd])
unforward(c)
c.client = client or 0
c.agent = address or source
forwarding[c.agent] = c
gateserver.openclient(fd)
end
上面的代码很简单,gate就是记录下connection对应的agent之类的信息。
再回到gate看看数据转移的代码,gate中很明白的指示了向agent发送数据:
gate.lua:
--收到数据帧
function handler.message(fd, msg, sz)
-- recv a package, forward it
local c = connection[fd]
local agent = c.agent
if agent then
-- 将数据发送给agent
skynet.redirect(agent, c.client, "client", 0, msg, sz)
else
skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))
end
end
复习 redirect 用法:
skynet.redirect(address, source, typename, session, ...) 它和 skynet.send 功能类似,但更细节一些。它可以指定发送地址(把消息源伪装成另一个服务),指定发送的消息的 session 。注:address 和 source 都必须是数字地址,不可以是别名。skynet.redirect 不会调用 pack ,所以这里的 ... 必须是一个编码过的字符串,或是 userdata 加一个长度。
基本到这里就已经比较清楚了,不去分析 agent如何用sproto格式处理数据细节,调用关系如下:
main.lua 首先引导watchdog.lua,使用网络配置参数调用其 start 接口。(newservice + start 是skynet中常用的组合)
看门狗启动时引导 gate.lua,main.lua对前者start的调用中又调用了gate的open命令,在gateserver.lua模板中启动socketdriver.listen和start。至此网络接口打开等候连接进入,引导过程完成。
gate.lua是gateserver.lua的应用,后面一旦有连接进来,都会执行gate.lua设置的handler,其中接入过程,会在gate.lua中通知回watchdog,后者就新建一个agent,并start之。
agent引导起来后,start过程中会主动调用gate.lua的forward接口,为其安装agent相关信息,再之后的过程就是 gate.lua中接收到数据都会redirect到agent了。
分析完成。