[关闭]
@wsd1 2017-01-19T08:00:33.000000Z 字数 9334 阅读 3665

【专题5】skynet的luaAPI实现专项分析

skynet


自用笔记,仅供参考

参考文档

http://cloudwu.github.io/lua53doc/manual.html#6.4.1

https://github.com/cloudwu/skynet/wiki

【专题4】搞明白skynet的C语言到lua环境建立之一、二、三 篇

背景

skynet一个关键的优势是使用lua语言撰写脚本,而使用脚本语言写逻辑的一个大好处就是可以使用顺序逻辑描述业务。表面的平整之下实际是C语言对lua虚拟机的调度器在起作用。

阻塞API从lua中yield回C代码中,之后有了事件再次resume,看起来实现很简单,但是更加复杂的是错误的处理,API调用不知道会经历多少艰辛,出错、超时如何处理?这个是关键所在。

本篇是 【专题4】搞明白skynet的C语言到lua环境建立之x系列 的延续篇

API研究 sleep()

API sleep()的进入

既然已经弄明白lua运行环境和入口,所以可以直接从skynet的API开始看:

  1. function skynet.sleep(ti)
  2. local session = c.intcommand("TIMEOUT",ti)
  3. local succ, ret = coroutine_yield("SLEEP", session)
  4. -- 上半部戛然而止

yield 必会回到suspend()中:

  1. local function raw_dispatch_message(prototype, msg, sz, session, source)
  2. ...
  3. suspend(co, coroutine_resume(co, true, msg, sz))
  4. ...

进而被suspend()下述部分逻辑处理:

  1. ...
  2. elseif command == "SLEEP" then
  3. session_id_coroutine[param] = co
  4. sleep_session[co] = param
  5. ...
  6. dispatch_wakeup()
  7. dispatch_error_queue()

可见 suspend仅仅将session和co记录在案,注意这个表:sleep_session[]。

然后就如果一切干净,suspend函数便会退出,再回到 raw_dispatch_message() ,再回到 skynet.dispatch_message,然后再回到_cb(),再回归skynet的大循环中去。

API sleep()的退出

退出过程和timeout大致一样:

  1. dispatch_message()
  2. -> pcall(raw_dispatch_message,...)
  3. ->...
  4. -> suspend(co, coroutine_resume(co, true, msg, sz))
  5. --注意,其中第一个返回值是true

回到 skynet.sleep()后半段,如下:

  1. function skynet.sleep(ti)
  2. local session = c.intcommand("TIMEOUT",ti)
  3. local succ, ret = coroutine_yield("SLEEP", session)
  4. --上半部截止
  5. -- coroutine_resume(co, true, msg, sz)
  6. -- succ = true
  7. --从下半段分析
  8. sleep_session[coroutine.running()] = nil
  9. if succ then
  10. return --一个正常的sleep完成
  11. end
  12. if ret == "BREAK" then
  13. return "BREAK"
  14. else
  15. error(ret)
  16. end
  17. end

API研究 skynet.call()

API应用实例:

  1. skynet.call(gate, "lua", "kick", fd)

命令发出

  1. function skynet.call(addr, typename, ...)
  2. local p = proto[typename]
  3. local session = c.send(addr, p.id , nil , p.pack(...)) --<- sleeptimeout之间的区别
  4. if session == nil then
  5. error("call to invalid address " .. skynet.address(addr))
  6. end
  7. return p.unpack(yield_call(addr, session))
  8. end

大多数代码很简单,主要就是获取session。
关键是:

  1. yield_call(addr, session)

函数定义:

  1. local function yield_call(service, session)
  2. watching_session[session] = service
  3. local succ, msg, sz = coroutine_yield("CALL", session)
  4. watching_session[session] = nil
  5. if not succ then
  6. error "call failed"
  7. end
  8. return msg,sz
  9. end

coroutine_yield 返回 true, "CALL", session

回到suspend()中:

  1. ...
  2. suspend(co, coroutine_resume(co, true, msg, sz))
  3. ...

进入其如下逻辑路径:

  1. if command == "CALL" then
  2. session_id_coroutine[param] = co

后面如果一切干净,suspend函数便会退出,再回到 raw_dispatch_message() ,再回到 skynet.dispatch_message,然后再回到_cb(),再回归skynet的大循环中去。

命令接收

call底层也是msg的交互,在另外的coroutine会接收到msg:

skynet.dispatch_message 
-> raw_dispatch_message

进入raw_dispatch_message函数的下述逻辑路径:

  1. local p = proto[prototype]
  2. ...
  3. local f = p.dispatch --调用的func
  4. if f then
  5. local ref = watching_service[source]
  6. if ref then
  7. watching_service[source] = ref + 1
  8. else
  9. watching_service[source] = 1
  10. end
  11. --新建一个coroutine
  12. local co = co_create(f)
  13. --标注
  14. session_coroutine_id[co] = session
  15. session_coroutine_address[co] = source --这个重要,将来源地址挂在本coroutine
  16. suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
  17. ...

注意,上面resume之前,还将来源加入关注表,新建立的coroutine 标注了来源地址:

  1. watching_service[source] = ref + 1
  2. ...
  3. session_coroutine_address[co] = source

最后一个: coroutine_resume(co, session,source, p.unpack(msg,sz))

在新coroutine中调用了本服务使用skynet.dispatch()安装的回调函数。示例代码类如:

  1. skynet.dispatch("lua", function(_,_, command, ...)
  2. local f = CMD[command]
  3. skynet.ret(skynet.pack(f(...)))
  4. end)

命令返回

命令返回调用 skynet.ret 来完成。

  1. function skynet.ret(msg, sz)
  2. msg = msg or ""
  3. return coroutine_yield("RETURN", msg, sz)
  4. end

异常简单,yield "RETURN"

看 suspend() 处理后续逻辑的代码上,:

  1. elseif command == "RETURN" then
  2. local co_session = session_coroutine_id[co]
  3. --本coroutine上挂载的是来源地址,见上面,是在dispatch函数被resume之前安装的
  4. local co_address = session_coroutine_address[co]
  5. if param == nil or session_response[co] then
  6. error(debug.traceback(co))
  7. end
  8. session_response[co] = true
  9. local ret
  10. if not dead_service[co_address] then
  11. --上面的逻辑都是在整理参数,检查什么的,下面才是重点:
  12. ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) ~= nil
  13. if not ret then
  14. -- If the package is too large, returns nil. so we should report error back
  15. c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
  16. end
  17. elseif size ~= nil then
  18. c.trash(param, size)
  19. ret = false
  20. end
  21. return suspend(co, coroutine_resume(co, ret))

注意最后有一个resume。wiki中LuaAPI章节中 skynet.ret被声明为非阻塞,果然是如此。

也就是说,skynet.ret调用之后,还会继续运行下去。

命令返回闭包 skynet.response

代码简单:

  1. function skynet.response(pack)
  2. pack = pack or skynet.pack
  3. return coroutine_yield("RESPONSE", pack)
  4. end

suspend() 处理后续逻辑的代码上:

  1. elseif command == "RESPONSE" then
  2. --获得session和来源的addr
  3. local co_session = session_coroutine_id[co]
  4. local co_address = session_coroutine_address[co]
  5. ...
  6. --下面拿到 yieldpack函数
  7. local f = param
  8. --这里定义了一个function,太长省略
  9. local function response(ok, ...)
  10. ...
  11. end
  12. -- 下面的部分和skynet.ret基本类似了
  13. watching_service[co_address] = watching_service[co_address] + 1
  14. session_response[co] = true
  15. unresponse[response] = true
  16. --关键在resume的结果就是上面定义的函数
  17. return suspend(co, coroutine_resume(co, response))

后面,我们先复习一下wiki中对该返回函数的说明:

skynet.response 返回的闭包可用于延迟回应。调用它时,第一个参数通常是 true表示是一个正常的回应,之后的参数是需要回应的数据。如果是 false,则给请求者抛出一个异常。它的返回值表示回应的地址是否还有效。如果你仅仅想知道回应地址的有效性,那么可以在第一个参数传入 "TEST" 用于检测。

再详细看看这个response()的实现:

  1. local function response(ok, ...)
  2. --TEST 命令是用来测试目标是否还在的
  3. if ok == "TEST" then
  4. if dead_service[co_address] then
  5. release_watching(co_address)
  6. unresponse[response] = nil
  7. f = false
  8. return false
  9. else
  10. return true
  11. end
  12. end
  13. ...
  14. local ret
  15. if not dead_service[co_address] then
  16. if ok then
  17. ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, f(...)) ~= nil
  18. --记得,f()是pack函数
  19. ...
  20. else
  21. ret = c.send(co_address, skynet.PTYPE_ERROR, co_session, "") ~= nil
  22. end
  23. else
  24. ret = false
  25. end
  26. release_watching(co_address) --减少ref 0则清空
  27. unresponse[response] = nil
  28. f = nil
  29. return ret
  30. end

可见,skynet.response()的实现充分利用了lua函数闭包的特性,所有相关数据随身携带,自然随心所欲啦。

API研究 skynet.newservice()

之前分析了 snlua 加载了lua代码之后,如何构造运行环境,并且运营coroutine的过程。后面,看一下lua环境下如何引导其他的lua程序。

  1. function skynet.newservice(name, ...)
  2. return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
  3. end

可见这里依赖一个launcher服务,这个服务在哪里呢? 这肯定是一个早期加载的服务,可以看看 bootstrap.lua

  1. skynet.start(function()
  2. ...
  3. local launcher = assert(skynet.launch("snlua","launcher"))
  4. skynet.name(".launcher", launcher)
  5. ...
  6. )

skynet.launch()的实现在 lualib/skynet/manager.lua中:

  1. function skynet.launch(...)
  2. local addr = c.command("LAUNCH", table.concat({...}," "))
  3. --这里返回十六进制的handle,就是服务 地址了
  4. if addr then
  5. return tonumber("0x" .. string.sub(addr , 2))
  6. end
  7. end

可见又调用了 lua_skynet.c中的内容,关键函数如下:

  1. static const char *cmd_launch(struct skynet_context * context, const char * param) {
  2. size_t sz = strlen(param);
  3. char tmp[sz+1];
  4. strcpy(tmp,param);
  5. char * args = tmp;
  6. char * mod = strsep(&args, " \t\r\n");
  7. args = strsep(&args, "\r\n");
  8. struct skynet_context * inst = skynet_context_new(mod,args);
  9. if (inst == NULL) {
  10. return NULL;
  11. } else {
  12. id_to_hex(context->result, inst->handle);
  13. return context->result; //这个重要,返回了服务地址
  14. }
  15. }

这里调用了 skynet_context_new(mod,args) 可见这里加载了一个C模块。注意之前命令是 snlua xxxxx,所以和第一篇一样,这里也是先加载 snlua.so之后,通过loader.lua加载目标 launcher.lua文件。

在继续分析下去之前,先总结一下,skynet.launch()可以直接通过snlua加载lua文件——这是独立skynet_context的服务。

回到原先话题,launcher.lua 是在bootstrap.lua中被使用skynet.launch() 加载的。

之后便可以提供服务,skynet.newservice()函数,就通过 向其 发布“LAUNCH”命令来实施。

看看 ".launcher"服务的实现,该launcher.lua文件中,负责这个LAUNCH指令的代码:

  1. function command.LAUNCH(_, service, ...)
  2. launch_service(service, ...)
  3. return NORET
  4. end
  5. local function launch_service(service, ...)
  6. local param = table.concat({...}, " ")
  7. --可见,又一次调用了skynet.launch()方法,和bootstrap.lua中引导“.launcher”一样。
  8. local inst = skynet.launch(service, param)
  9. -- inst返回值是 服务地址数值
  10. local response = skynet.response() --记住这个是闭包
  11. if inst then
  12. services[inst] = service .. " " .. param
  13. instance[inst] = response
  14. else
  15. response(false)
  16. return
  17. end
  18. return inst
  19. end

可见,LAUNCH这个命令,并非直接回应,而是制作了闭包挂在instance表中,为啥尼?
原因是,它想等被引导的lua程序确认自己运行正常,再回复。如何实现的?看一下skynet.lua中的两个函数:

  1. function skynet.start(start_func)
  2. c.callback(skynet.dispatch_message)
  3. skynet.timeout(0, function()
  4. skynet.init_service(start_func)
  5. end)
  6. end
  7. --上面的函数不陌生吧,继续
  8. function skynet.init_service(start)
  9. local ok, err = skynet.pcall(start) --谨慎运行用户 start代码
  10. if not ok then
  11. skynet.error("init service failed: " .. tostring(err))
  12. skynet.send(".launcher","lua", "ERROR")
  13. skynet.exit()
  14. else
  15. --看这里,若是成功,则调用.launcher的“LAUNCHOK”命令
  16. skynet.send(".launcher","lua", "LAUNCHOK")
  17. end
  18. end

而在launcher.lua中

  1. function command.LAUNCHOK(address)
  2. -- init notice
  3. local response = instance[address]
  4. if response then
  5. response(true, address) --这里就回应很早以前引导我的恩人了
  6. instance[address] = nil
  7. end
  8. return NORET
  9. end

launch过程分析完毕,总结

初期条件简陋,引导lua代码(例如bootstrap.lua), 直接采用c语言,用“skynet_context_new()”函数加载snlua,再引导loader.lua,再引导bootstrap.lua。

bootstrap.lua中要去引导launcher.lua,会采用skynet.launch()函数,其调用c语言扩展,也通过“skynet_context_new()”函数实施引导。

而一旦 .launcher 服务运转起来,我们就可以使用 skynet.newservice()来引导应用了。如下:

  1. function skynet.newservice(name, ...)
  2. return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
  3. end

用 .launcher 加载代码可以判断加载运行是否顺利,引导者可以通知launcher引导目标,目标运行起来正常之后,会发消息给.launcher报个平安,.launcher收到后便回报引导者调用成功。

哦,另外,点开头的服务名称,在云风wiki上有所说明:

. 开头的名字是在同一 skynet 节点下有效的,跨节点的 skynet 服务对别的节点下的 . 开头的名字不可见。不同的 skynet 节点可以定义相同的 . 开头的名字。

API研究 skynet.error

错误处理实际是一个大话题,我们看看这个函数的作用和如何被实现。

在分析 simpleweb的时候我看到不少 skynet.error()的使用,追溯一下:

in skynet.lua

  1. local c = require "skynet.core"
  2. ...
  3. skynet.error = c.error

lua-skynet.c中定义了 _error(),

  1. static int
  2. _error(lua_State *L) {
  3. struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
  4. skynet_error(context, "%s", luaL_checkstring(L,1));
  5. return 0;
  6. }

然后在 skynet_error()中做了实现。

  1. void
  2. skynet_error(struct skynet_context * context, const char *msg, ...) {
  3. ...
  4. logger = skynet_handle_findname("logger");
  5. ...
  6. char tmp[LOG_MESSAGE_SIZE];
  7. char *data = NULL;
  8. va_list ap;
  9. va_start(ap,msg);
  10. int len = vsnprintf(tmp, LOG_MESSAGE_SIZE, msg, ap);
  11. va_end(ap);
  12. ...
  13. data = skynet_strdup(tmp);
  14. ...
  15. struct skynet_message smsg;
  16. ...
  17. smsg.source = skynet_context_handle(context);
  18. ...
  19. smsg.session = 0;
  20. smsg.data = data;
  21. smsg.sz = len | ((size_t)PTYPE_TEXT << MESSAGE_TYPE_SHIFT);
  22. skynet_context_push(logger, &smsg);
  23. }

可见,该函数从参数中找出string作为参数,输出错误信息,发送给log模块。

API研究 skynet.fork ,skynet.wait,skynet.wakeup

TBC

logger

TBC

coroutine = require "skynet.coroutine"

TBC

研究 wiki “CriticalSection” 实现

TBC

研究 wiki “DataCenter” 实现

TBC

测试并研究 wiki “DebugConsole” 实现

TBC

研究 wiki “http” 实现

TBC

研究 wiki “multicast” 实现

TBC

研究 wiki “ShareData” 实现

TBC

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注