[关闭]
@wsd1 2017-01-19T08:17:55.000000Z 字数 8558 阅读 2032

【专题4】搞明白skynet的C语言到lua环境建立之三(事件发生如何引导lua的coroutine)

skynet


参考文档

http://cloudwu.github.io/lua53doc/manual.html#6.4.1

https://github.com/cloudwu/skynet/wiki

背景

skynet一个关键的优势是使用lua语言撰写脚本,而使用脚本语言写逻辑的一个大好处就是可以使用顺序逻辑描述业务。表面的平整之下实际是C语言对lua虚拟机的调度器在起作用。

阻塞API从lua中yield回C代码中,之后有了事件再次resume,看起来实现很简单,但是更加复杂的是错误的处理,API调用不知道会经历多少艰辛,出错、超时如何处理? 这个是关键所在。

上回书说到:
第一篇 大致跟踪了snlua模块被导入的过程,作为一个c模块,其也是被编译成so文件被加载的。其init中安装callback之后,发送一个消息给自己,在消息处理函数中,去掉了自己模块的callback,并且引导lua代码。

第二篇 主要跟踪了lua的代码,分析了skynet.start()和skynet的Lua API timeout的实现,其中看到coroutine如何被建立。之后,lua运行完毕回归到service_snlua.c的_init()函数中回到skynet的大循环中,除了安排好的timer和skynet_context,似乎尘归尘土归土。

本篇将看一看,timerout之后发生的事情。在此之前先扯一下snlua服务处理消息的机制,铺垫一下之前安装消息处理回调的过程。

snlua层面消息处理回调函数的安装过程

让我们关注安装回调函数的过程,在skynet.start()中:
skynet.lua:

  1. local c = require "skynet.core"
  2. c.callback(skynet.dispatch_message)

上面是安装消息处理cb的过程,可见所有消息需要抵达lua代码都会经过该 skynet.dispatch_message。

c来自于 skynet.core。

查看 lua-skynet.c

  1. static int _callback(lua_State *L) {
  2. struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
  3. //云风说这里使用闭包方式获取注册表里的数据 效率更高
  4. int forward = lua_toboolean(L, 2);
  5. luaL_checktype(L,1,LUA_TFUNCTION);
  6. lua_settop(L,1); //设置栈顶仅仅为回调函数 skynet.dispatch_message
  7. lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);
  8. /*
  9. 关键逻辑,将回调函数挂在注册表上
  10. 注册表[_cb] = skynet.dispatch_message
  11. */
  12. lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
  13. lua_State *gL = lua_tothread(L,-1);
  14. if (forward) {
  15. skynet_callback(context, gL, forward_cb);
  16. } else {
  17. skynet_callback(context, gL, _cb);
  18. }
  19. return 0;
  20. }

C代码自然没法给服务安装lua回调了,所以是安装了一个可以进入lua环境的C回调:_cb,下详。

这里最重要的部分,是上面设置“注册表[_cb]=skynet.dispatch_message”。后面回调函数内可以看到如何使用。

lua-skynet.c 中被回调的函数_cb(由它来真正调用lua的回调):

  1. static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
  2. lua_State *L = ud;
  3. int trace = 1;
  4. int r;
  5. int top = lua_gettop(L);
  6. if (top == 0) {
  7. //若栈顶为空,则分别压入 traceback 和 skynet.dispatch_message
  8. lua_pushcfunction(L, traceback);
  9. lua_rawgetp(L, LUA_REGISTRYINDEX, _cb); //这里取出lua的回调函数
  10. } else {
  11. assert(top == 2); //为何必定是两个返回值呢? 还不知道
  12. }
  13. lua_pushvalue(L,2);
  14. //将skynet.dispatch_message 再次压栈
  15. //至此,从底向上 分别是: [1]traceback [2]skynet.dispatch_message [3]skynet.dispatch_message
  16. //prototype, msg, sz, session, source
  17. lua_pushinteger(L, type);
  18. lua_pushlightuserdata(L, (void *)msg);
  19. lua_pushinteger(L,sz);
  20. lua_pushinteger(L, session);
  21. lua_pushinteger(L, source);
  22. r = lua_pcall(L, 5, 0 , trace); //输入5个参数
  23. //执行skynet.dispatch_message(type, msg, sz, session, source)
  24. ...
  25. //斗转星移,这里dispatch返回了,后面略。
  26. }

【供参考】 skynet_server.c中如何调用 cb的代码:

  1. static void
  2. dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
  3. assert(ctx->init);
  4. CHECKCALLING_BEGIN(ctx)
  5. pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
  6. int type = msg->sz >> MESSAGE_TYPE_SHIFT;
  7. size_t sz = msg->sz & MESSAGE_TYPE_MASK;
  8. if (ctx->logfile) {
  9. skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
  10. }
  11. if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) // <-------这一句最重要
  12. {
  13. skynet_free(msg->data);
  14. }
  15. CHECKCALLING_END(ctx)
  16. }

定时器超时之后...

至此,打住,我们来看看之前timeout安装之后超时发生什么:
先复习之前定时器安装最后一个步骤:

  1. function skynet.timeout(ti, func)
  2. local session = c.intcommand("TIMEOUT",ti)
  3. ...
  4. end

skynet_server.c:

  1. static const char *
  2. cmd_timeout(struct skynet_context * context, const char * param) {
  3. char * session_ptr = NULL;
  4. int ti = strtol(param, &session_ptr, 10);
  5. int session = skynet_context_newsession(context); //创建session
  6. skynet_timeout(context->handle, ti, session); //安装timer
  7. sprintf(context->result, "%d", session);
  8. return context->result;
  9. }

安装定时器的同时构造一个session,并返回lua,session在lua层面被记录在案,等候超时消息。复习完毕,我们看如何超时:

超时后的c代码调用流程:

skynet_timer.c:

timer_execute() -> dispatch_list()

  1. static inline void
  2. dispatch_list(struct timer_node *current) {
  3. do {
  4. struct timer_event * event = (struct timer_event *)(current+1);
  5. struct skynet_message message;
  6. message.source = 0;
  7. message.session = event->session; // 看到定时器中的session被放入消息
  8. message.data = NULL;
  9. message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT; //类型是 PTYPE_RESPONSE
  10. skynet_context_push(event->handle, &message); //发出
  11. ...
  12. } while (current);
  13. }

可见,上面就是定时器超时后的最关键逻辑,发送一个类型是 PTYPE_RESPONSE 并包含之前设置 session的消息。

好了,后面就是接收消息处理了。

snlua接收处理消息

然后,去看lua代码:skynet.dispatch_message

lua版本的callback:

  1. function skynet.dispatch_message(...)
  2. local succ, err = pcall(raw_dispatch_message,...)
  3. ...
  4. end

再开始看看 raw_dispatch_message:

先回忆一下定时器安装的代码:

  1. function skynet.timeout(ti, func)
  2. ...
  3. local co = co_create(func)
  4. session_id_coroutine[session] = co
  5. end

注意 session_id_coroutine这个表。

  1. local function raw_dispatch_message(prototype, msg, sz, session, source)
  2. -- 根据type做分支
  3. -- 很清楚啦~~~
  4. -- 之前设置timeout的过程 先构造了coroutine,而且定时器超时会发出 PTYPE_RESPONSE 类型 并带有session的消息,自然就是这个分支啦~~~
  5. -- skynet.PTYPE_RESPONSE = 1, read skynet.h
  6. if prototype == 1 then
  7. local co = session_id_coroutine[session]
  8. ...
  9. else
  10. --关键处理,取到然后 resume,强烈建议回头看“第二篇”中co_create()函数的分析。
  11. session_id_coroutine[session] = nil
  12. suspend(co, coroutine_resume(co, true, msg, sz))
  13. end
  14. ...
  15. end

上述代码可以看到,一个消息若是回应之前的call,这里都是先找到相应的coroutine,然后再 resume。

当然还有另外的分支,是主动调用的消息,可以看另外一个文章:【专题10】搞明白skynet的skynet.register_protocol() 和 skynet.dispatch()

这里resume开始运行之前,复习一下timer设定function的代码:

  1. skynet.timeout(0, function()
  2. skynet.init_service(start_func)
  3. end)

其中的:

  1. function()
  2. skynet.init_service(start_func)
  3. end

而且参数是:true, msg, sz,不过这里参数没有被用上。

类似于调用了这个函数:

  1. function(true, msg, sz)
  2. skynet.init_service(start_func)
  3. end

而"skynet.init_service()"函数很简单,仅仅使用pcall包装了一下,如下:

  1. function skynet.init_service(start)
  2. local ok, err = skynet.pcall(start)
  3. if not ok then
  4. skynet.error("init service failed: " .. tostring(err))
  5. skynet.send(".launcher","lua", "ERROR")
  6. skynet.exit()
  7. else
  8. skynet.send(".launcher","lua", "LAUNCHOK") -- 回应启动器 OK
  9. end
  10. end

回到 suspend()调用:

  1. suspend(co, coroutine_resume(co, true, msg, sz))

coroutine_resume 调用返回值 和 timeout(0,func) 中的func返回值没关系,其是用户代码yield时返回的值。定义在co_create()函数中,可以翻回“第二篇”,看看其实现。

  1. ...
  2. f(...)
  3. while true do
  4. f = nil
  5. coroutine_pool[#coroutine_pool+1] = co
  6. f = coroutine_yield "EXIT" --返回值在这
  7. f(coroutine_yield())
  8. end
  9. ...

在看看新函数 suspend(),KAO,好长,咬牙继续

因为所有的suspend函数都是像这样被调用的:

  1. suspend(co, coroutine_resume(co, true, msg, sz))

所以,我们知道,coroutine_yield "EXIT" 将返回,造成如下调用类如:

  1. suspend(co, true, "EXIT")

所以,其参数result表征coroutine执行是否顺利。是否是正常yield出来的。

  1. function suspend(co, result, command, param, size)
  2. --这里就是resume出错的coroutine
  3. if not result then ...
  4. end
  5. if command == "CALL" then ...
  6. elseif command == "SLEEP" then ...
  7. elseif command == "RETURN" then ...
  8. elseif command == "RESPONSE" then ...
  9. --这里是coroutine正常退出的出口,先关注一下
  10. elseif command == "EXIT" then
  11. local address = session_coroutine_address[co]
  12. -- release_watching()函数 减少addr引用计数。简化代码: watching_service[address]
  13. release_watching(address)
  14. -- 下面也是各种清理表
  15. session_coroutine_id[co] = nil
  16. session_coroutine_address[co] = nil
  17. session_response[co] = nil
  18. elseif
  19. ...
  20. end
  21. dispatch_wakeup()
  22. dispatch_error_queue()
  23. end

dispatch_wakeup()基本上就是找待运行的coroutine来运行罢了。 暂时还没有合适的例子,等下说 skynet.sleep的实现,再来说这个函数。而 dispatch_error_queue() 还没看明白。

总之,如果一切清理干净之后,suspend函数便会退出,再回到 raw_dispatch_message() ,再回到 skynet.dispatch_message:

  1. function skynet.dispatch_message(...)
  2. local succ, err = pcall(raw_dispatch_message,...)
  3. --从这里开始
  4. while true do
  5. local key,co = next(fork_queue)
  6. if co == nil then
  7. break
  8. end
  9. fork_queue[key] = nil
  10. local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
  11. if not fork_succ then
  12. if succ then
  13. succ = false
  14. err = tostring(fork_err)
  15. else
  16. err = tostring(err) .. "\n" .. tostring(fork_err)
  17. end
  18. end
  19. end
  20. assert(succ, tostring(err))
  21. end

后半段,是在处理fork_queue这个表,现在暂时比较干净还没有内容哦。所以直接略过。

然后再回到_cb(),看下半段。

  1. static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
  2. ...
  3. r = lua_pcall(L, 5, 0 , trace); //skynet.dispatch_message()
  4. ...
  5. //从这里开始: dispatch返回了。后面也没有什么。
  6. const char * self = skynet_command(context, "REG", NULL);
  7. switch (r) {
  8. case LUA_ERRRUN:
  9. skynet_error(context, "lua call [%x to %s : %d msgsz = %d] error : " KRED "%s" KNRM, source , self, session, sz, lua_tostring(L,-1));
  10. break;
  11. case LUA_ERRMEM:
  12. skynet_error(context, "lua memory error : [%x to %s : %d]", source , self, session);
  13. break;
  14. case LUA_ERRERR:
  15. skynet_error(context, "lua error in error : [%x to %s : %d]", source , self, session);
  16. break;
  17. case LUA_ERRGCMM:
  18. skynet_error(context, "lua gc error : [%x to %s : %d]", source , self, session);
  19. break;
  20. };
  21. lua_pop(L,1);
  22. return 0;
  23. }

至此,c环境下的callback执行完毕。流程再回到滚滚轮回。

总结一下,c_callback使用pcall调用了skynet.dispatch_message(),然后进入lua环境下的调度器中,其中最为重要的两个函数是 co_create() 和 suspend(),前者定义了所有coroutine的运行大框架,并且维持了一个coroutine的表。后者是dispatcher resume co的直接入口。

重要总结:

C代码进入lua边界总共两个部位:
1、第一次加载snlua模块,在

service_snlua.c: 
    callback: _launch -> _init() -> lua_pcall...

这里是接收到第一个msg,开始运行加载进入的lua代码。

lua内部:

skynet.start(start_func)
    --安装回调
    c.callback(skynet.dispatch_message)
    --埋下种子
    skynet.timeout(0, function()
        skynet.init_service(start_func)
    end)
end

2、第二次是lua环境的c扩展 skynet.core模块中

skynet.lua:
    c.callback(skynet.dispatch_message) 
luaskynet.c:
    _callback(lua_State *L) 安装 _cb() ;
    _cb() -> lua_pcall
skynet.lua:
    -> skynet.dispatch_message

两次代码均采用pcall,而coroutine全部在lua环境内部维护。

lua内部:

dispatch_message() 
-> pcall(raw_dispatch_message,...) 
->...
-> suspend(co, resume(co...))

从上面的说明,可以看出lua环境中,应用采用如下写法,是非常必要的:

skynet.start(function()
    ...
    skynet.xxx_api()
    ...
end)

其中体现的原则就是:所有skynet的API(包含yield相关调用的)都只能在start内部使用。因为,这样才能保证执行的上下文,是通过lua环境coroutine管理部件来进入的。

总结,通过timeout的分析,终于完成了对lua环境进入和yield出,以及再次resume进入的过程的分析,看清楚主要边界,目的达到,打完收工。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注