[关闭]
@wsd1 2017-01-19T08:02:49.000000Z 字数 4605 阅读 6921

【专题10】搞明白skynet的skynet.register_protocol() 和 skynet.dispatch()

skynet 201611


参考文档

http://cloudwu.github.io/lua53doc/manual.html#6.4.1

https://github.com/cloudwu/skynet/wiki

背景

时隔半年又重回skynet,目的是参考gate处理udp报文的逻辑。本记录尝试分析skynet中在lua层面对底层消息的处理机制。

skynet的底层消息最重要的两个特征是地址和类型,前者不用多说,后者决定了在消息上升被处理函数接受之前,采用何种方式来反序列化(要组织成参数序列嘛)。

wiki中 LuaAPI文档复习:

skynet.dispatch(type, function(session, source, ...) ... end) 注册特定类消息的处理函数。大多数程序会注册 lua 类消息的处理函数,惯例的写法是:

  1. local CMD = {}
  2. skynet.dispatch("lua", function(session, source, cmd, ...)
  3. local f = assert(CMD[cmd])
  4. f(...)
  5. end)

虽然并不推荐,但你还可以注册新的消息类别,方法是使用 skynet.register_protocol。例如你可以注册一个以文本方式编码消息的消息类别。通常用 C 编写的服务更容易解析文本消息。skynet 已经定义了这种消息类别为 skynet.PTYPE_TEXT,但默认并没有注册到 lua 中使用。

  1. skynet.register_protocol {
  2. name = "text",
  3. id = skynet.PTYPE_TEXT,
  4. pack = function(m) return tostring(m) end,
  5. unpack = skynet.tostring,
  6. }

新的类别必须提供 pack 和 unpack 函数,用于消息的编码和解码。
pack 函数必须返回一个 string 或是一个 userdata 和 size 。在 Lua 脚本中,推荐你返回 string 类型,而用后一种形式需要对 skynet 底层有足够的了解(采用它多半是因为性能考虑,可以减少一些数据拷贝)。
unpack 函数接收一个 lightuserdata 和一个整数 。即上面提到的 message 和 size 。lua 无法直接处理 C 指针,所以必须使用额外的 C 库导入函数来解码。skynet.tostring 就是这样的一个函数,它将这个 C 指针和长度翻译成 lua 的 string 。

接下来你可以使用 skynet.dispatch 注册 text 类别的处理方法了。当然,直接在 skynet.register_protocol 时传入 dispatch 函数也可以。
dispatch 函数会在收到每条类别对应的消息时被回调。消息先经过 unpack 函数,返回值被传入 dispatch 。每条消息的处理都工作在一个独立的 coroutine 中,看起来以多线程方式工作。但记住,在同一个 lua 虚拟机(同一个 lua 服务)中,永远不可能出现多线程并发的情况。你的 lua 脚本不需要考虑线程安全的问题,但每次有阻塞 api 调用时,脚本都可能发生重入,这点务必小心。CriticalSection 模块可以帮助你减少并发带来的复杂性。

主人公是 skynet.dispatch() 和 skynet.register_protocol() ,下面分析内部机制:

skynet.dispatch()

先来skynet.dispatch()

使用实例:

  1. skynet.dispatch("lua", function(session, source, cmd, ...)
  2. local f = assert(CMD[cmd])
  3. f(...)
  4. end)

定义在
lualib/skynet.lua:

  1. function skynet.dispatch(typename, func)
  2. local p = proto[typename]
  3. if func then
  4. local ret = p.dispatch
  5. p.dispatch = func
  6. return ret
  7. else
  8. return p and p.dispatch
  9. end
  10. end

代码很简单,就是将type作为key,func作为value,存储在proto[]数组中。
proto = {} 定义在skynet.lua中
一句话记得:proto[typename].dispatch = func

到这里跟踪线索基本中断,可以想象,后续是消息自底而上一定会再遇到这个proto。

消息从底层传递到callback开始

其中 skynet.start()函数实现中有代码:

  1. function skynet.start(start_func)
  2. c.callback(skynet.dispatch_message)
  3. //第一件事就是安装callback
  4. skynet.timeout(0, function()
  5. skynet.init_service(start_func)
  6. end)
  7. /*
  8. timeout()函数设置一个0时间的定时器,能直接使用刚才我们安装的callback;
  9. 另外,更为重要的是其start_func可能会yield,所以不能直接使用pcall。
  10. */
  11. end

其中

c.callback(skynet.dispatch_message) 

c.callback() 定义在 lua-skynet.c,代码不看了,就是skynet.dispatch_message 被安装为snlua service的回调专门处理底层消息。

此部分代码细节可以参考:
【专题4】搞明白skynet的C语言到lua环境建立之三(事件发生如何引导lua的coroutine)

snlua服务的回调处理函数 skynet.dispatch_message

无论如何,c层面的消息要送到lua环境,需要经过 skynet.dispatch_message函数。其定义在 skynet.lua

  1. function skynet.dispatch_message(...)
  2. local succ, err = pcall(raw_dispatch_message,...)
  3. ...
  4. end

再开始看看 raw_dispatch_message:

  1. local function raw_dispatch_message(prototype, msg, sz, session, source)
  2. -- skynet.PTYPE_RESPONSE = 1, read skynet.h
  3. -- 调用回应
  4. if prototype == 1 then
  5. ... 这里回应处理,找等候的coroutine啥的,略过
  6. else
  7. -- 主动调用,重点分析,先找配套的proto
  8. local p = proto[prototype]
  9. if p == nil then
  10. ...找不到,错误处理
  11. return
  12. end
  13. local f = p.dispatch --这里就是dispatch安装的处理函数了
  14. if f then
  15. -- 这里好像是对调用者做ref计数
  16. local ref = watching_service[source]
  17. if ref then
  18. watching_service[source] = ref + 1
  19. else
  20. watching_service[source] = 1
  21. end
  22. -- 可见,对某服务call访问,都会在对方构造coroutine
  23. local co = co_create(f)
  24. -- 下面这些都是 索引表啥的记录
  25. session_coroutine_id[co] = session
  26. session_coroutine_address[co] = source
  27. -- 执行 coroutine
  28. suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
  29. else
  30. unknown_request(session, source, msg, sz, proto[prototype].name)
  31. end
  32. end
  33. end

上述的注释说的很清楚,对某服务call访问,都会在对方以coroutine方式来运行。其中最重要的是构建 coroutine和 coroutine_resume:

co_create()函数的精妙分析参见:【专题4】搞明白skynet的C语言到lua环境建立之二(lua被加载之后的如何展开)

coroutine_resume(co, session,source, p.unpack(msg,sz))

就会执行:

proto[prototype].dispatch(
    session,
    source, 
    proto[prototype].unpack(msg,sz))

正好对应于:

skynet.dispatch(type, function(session, source, ...) ... end)

这里,基本都打通了。

原理概括

snlua引导带有skynet.start()的代码,其中调用skynet.dispatch()安装了可被其他服务调用的prototype类型和处理函数。

proto[prototype].dispatch
proto[prototype].unpack

再snlua接收到底层消息时,会根据类型判断这个不是调用其他服务的回应,从而根据 proto注册的函数构造一个coroutine,通过resume coroutine来运行这个处理函数。

其处理函数调用时,还会使用安装的 unpack 对msg预处理,生成所需要的参数。

如果不用lua类型消息 咋办?skynet.register_protocol()分析

典型应用:

  1. skynet.register_protocol {
  2. name = "text",
  3. id = skynet.PTYPE_TEXT,
  4. pack = function(m) return tostring(m) end,
  5. unpack = skynet.tostring,
  6. }

很简单代码,就是在proto中做注册:

  1. function skynet.register_protocol(class)
  2. local name = class.name
  3. local id = class.id
  4. assert(proto[name] == nil)
  5. assert(type(name) == "string" and type(id) == "number" and id >=0 and id <=255)
  6. proto[name] = class --同时使用nameid来索引
  7. proto[id] = class
  8. end

可见,proto["text"] 和 proto[skynet.PTYPE_TEXT]是一样的。
到此结束。

借用skynet.PTYPE_SOCKET 类型,看看 unpack是咋使用的

参考代码: lualib/snax/gateserver.lua

  1. ...
  2. skynet.register_protocol {
  3. name = "socket",
  4. id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
  5. unpack = function ( msg, sz )
  6. return netpack.filter( queue, msg, sz)
  7. end,
  8. dispatch = function (_, _, q, type, ...)
  9. queue = q
  10. if type then
  11. MSG[type](...)
  12. end
  13. end
  14. }

更多细节参见:
【专题1】skynet之lua版本的gate_service分析

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注