[关闭]
@wsd1 2017-01-19T07:56:24.000000Z 字数 9192 阅读 3403

【专题1】skynet之lua版本的gate_service分析

skynet


自用笔记,仅供参考

参考文档

http://cloudwu.github.io/lua53doc/manual.html#6.4.1

https://github.com/cloudwu/skynet/wiki

gateserver.lua 背景

通用模板 lualib/snax/gateserver.lua 将TCP 数据流,分解成 以带长度信息的数据包的结构来做数据交换。

用法如下:

local handler = {
    connect = ..., 
    disconnect = ..., 
    error = ..., 
    command = ..., 
    message = ... //数据处理
}
gateserver.start(handler)

service/gate.lua 是snax.gateserver 的使用范例。

gateserver.lua源码分析

首先,依赖:

local skynet = require "skynet"
local netpack = require "netpack"
local socketdriver = require "socketdriver"

其作为一个lua服务模板,其中仅仅定义了三个方法:

function gateserver.openclient(fd)
function gateserver.closeclient(fd)
function gateserver.start(handler)

每次收到 handler.connect 后,你都需要调用 openclient 让 fd 上的消息进入。默认状态下, fd 仅仅是连接上你的服务器,但无法发送消息给你。你可以在一切准备好以后,再放行消息。

gateserver.start比较关键,其中注册 socket类型 protocol:

local MSG = {open,close,error,warning,data} --简化形式

skynet.register_protocol {
    name = "socket",
    id = skynet.PTYPE_SOCKET,   -- PTYPE_SOCKET = 6
    unpack = function ( msg, sz )
        return netpack.filter( queue, msg, sz)
    end,
    dispatch = function (_, _, q, type, ...)
        queue = q
        if type then
            MSG[type](...)
        end
    end
}

解包方法“netpack.filter”,c语言实现,定义在 lua-netpack.c中。
其同时定义了 “dispatch”,用于处理该类型消息。代码中可以看到可以处理的类型有:open,close,error,warning,data,more。

所以,流程如下:
首先,socket节点发出PTYPE_SOCKET类型消息,交付给gateserver,被指定netpack分解为dispatch的参数输入。
这个细节过程可以参考"【专题10】搞明白skynet的skynet.register_protocol() 和 skynet.dispatch()"

从skynet_socket.c代码中发出msg

鉴于该消息仅仅从c实现的代码里发出,所以我们看看消息如何发出来。

关键代码在 skynet_socket.c:

skynet_socket_poll()中调用forward_message(),发送各种类型的msg,类如:SKYNET_SOCKET_TYPE_DATA,类如:

  1. ...
  2. case SOCKET_DATA:
  3. forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
  4. ...

forward_message()组织 “skynet_socket_message”类型对象,挂载在“skynet_message”类型的消息中发出来。

简化代码如下:

  1. forward_message(int type, bool padding, struct socket_message * result){
  2. ...
  3. struct skynet_socket_message *sm;
  4. size_t sz = sizeof(*sm);
  5. sm = (struct skynet_socket_message *)skynet_malloc(sz);
  6. sm->type = type; //申请了msg,并设置消息类型
  7. sm->id = result->id;
  8. sm->ud = result->ud;
  9. ...
  10. sm->buffer = result->data;
  11. ...
  12. struct skynet_message message;
  13. message.source = 0;
  14. message.session = 0;
  15. message.data = sm; //挂在msg上
  16. message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);
  17. skynet_context_push(..., &message)) //走你
  18. ...
  19. }

可见,其中消息类型放在size字段高字节上,发出register_protocol函数注册的“skynet.PTYPE_SOCKET”类型消息。

最关键记得 struct skynet_socket_message结构, type、id、ud和buffer分别被赋予了 SKYNET_SOCKET_TYPE_DATA、id、size和data。

msg的unpack详解

再回到 skynet.start()

register_protocol函数中,unpack的定义如下:

  1. unpack = function ( msg, sz )
  2. return netpack.filter( queue, msg, sz)
  3. end,

在skynet_socket.c中发出msg,由netpack.filter() 处理之后才能得到dispatch函数—— “function (_, _, q, type, ...)”需要的后三个参数组合。
别忘了,dispatch被调用的本质是:

proto[prototype].dispatch(
    session,
    source, 
    proto[prototype].unpack(msg,sz))

netpack.filter( queue, msg, sz) 处理msg的关键代码见 lua-netpack.c

  1. static int lfilter(lua_State *L) {
  2. struct skynet_socket_message *message = lua_touserdata(L,2); //取得msg
  3. ...
  4. lua_settop(L, 1); //这个挺重要,设置栈底数第一个索引为栈顶,当前栈顶就指向queue了,回忆一下 netpack.filter( queue, msg, sz)
  5. ...
  6. switch(message->type) {
  7. case SKYNET_SOCKET_TYPE_DATA: //<------
  8. ...
  9. return filter_data(L, message->id, (uint8_t *)buffer, message->ud); //参数分别是 id、data和size
  10. case SKYNET_SOCKET_TYPE_CONNECT:
  11. ...
  12. default:
  13. ...
  14. }
  15. }

unpack根据msg中type做了不同处理,关注SKYNET_SOCKET_TYPE_DATA 分支。

filter_data()会同参数调用 filter_data_(),流程是:

lua: netpack.filter( queue, msg, sz) -> c: lfilter() -> filter_data() -> filter_data_()

注意这里的 queue是用来缓存未成帧数据的userdate,tcp流被格式化为 长度+数据 格式,因此在中间件需解决缓存问题。

这个函数需要完成的内容:接受一个 userdata和raw data、size。
数据长度不足,内部缓存为uncomplete;
数据刚好,则去头,返回queue、"data"、id、msg、size;
若数据超出,则内部queue,返回queue、"more"。

lua-netpack.c:

  1. //注意这个函数被调用时,L的栈顶指向queue,这是在lfilter函数中settop设置的,因此,queue将作为第一个返回值返回,其他的压栈行为将排布queue之后的返回值。
  2. //queue的定义如下:
  3. struct queue {
  4. int cap;
  5. int head;
  6. int tail;
  7. struct uncomplete * hash[HASHSIZE];
  8. struct netpack queue[QUEUESIZE];
  9. };
  10. //可见其承担两方面功能,一是 队列,就是netpack queue[] ,二是一个uncomplete hash表,用来通过socket id找uncomplete
  11. static int
  12. filter_data_(lua_State *L, int fd, uint8_t * buffer, int size) {
  13. struct queue *q = lua_touserdata(L,1); //取得第一个参数queue
  14. struct uncomplete * uc = find_uncomplete(q, fd); //是典型通过fd hash找到uncomplete结构体的过程。都存储在q->hash[]里面。
  15. //若找到之前的uncomplete
  16. if (uc) {
  17. // fill uncomplete
  18. if (uc->read < 0) {//只有一种情况:-1 表示只得到一个开头字节(长度高字节)
  19. // read size
  20. assert(uc->read == -1);
  21. int pack_size = *buffer;
  22. pack_size |= uc->header << 8 ;
  23. ++buffer;
  24. --size;
  25. uc->pack.size = pack_size;
  26. uc->pack.buffer = skynet_malloc(pack_size);
  27. uc->read = 0;
  28. }
  29. //上面补足头部2字节
  30. int need = uc->pack.size - uc->read;
  31. if (size < need) { //后面数据不够 就先填充好 uncomplete
  32. memcpy(uc->pack.buffer + uc->read, buffer, size);
  33. uc->read += size;
  34. int h = hash_fd(fd);
  35. uc->next = q->hash[h];
  36. q->hash[h] = uc;
  37. return 1; //返回1是因为已经有queue在栈顶了
  38. }
  39. memcpy(uc->pack.buffer + uc->read, buffer, need);
  40. buffer += need;
  41. size -= need;
  42. if (size == 0) { //数据正好足够,则发布data事件
  43. lua_pushvalue(L, lua_upvalueindex(TYPE_DATA));
  44. lua_pushinteger(L, fd);
  45. lua_pushlightuserdata(L, uc->pack.buffer);
  46. lua_pushinteger(L, uc->pack.size);
  47. skynet_free(uc);
  48. return 5;
  49. }
  50. //到这里,说明数据还超出需要的部分,下面的部分将收齐的压入queue,再择情创建uncomplete。发回 more 类型消息
  51. // more data
  52. push_data(L, fd, uc->pack.buffer, uc->pack.size, 0);
  53. skynet_free(uc);
  54. push_more(L, fd, buffer, size);
  55. lua_pushvalue(L, lua_upvalueindex(TYPE_MORE));
  56. return 2;//返回 queue, "more"
  57. }
  58. //若是第一次收到报文
  59. else {
  60. //若长度是1,则建立uncomplete并且设置 read:-1 header:第一个字节
  61. if (size == 1) {
  62. struct uncomplete * uc = save_uncomplete(L, fd);
  63. uc->read = -1; //-1 表示只得到一个开头字节(长度高字节)
  64. uc->header = *buffer;
  65. return 1;//返回 queue
  66. }
  67. //否则,收到报文头部两个字节构造长度。
  68. int pack_size = read_size(buffer);
  69. buffer+=2;
  70. size-=2;
  71. //收到的报文长度不够,建立uncomplete
  72. if (size < pack_size) {
  73. struct uncomplete * uc = save_uncomplete(L, fd);
  74. uc->read = size;
  75. uc->pack.size = pack_size;
  76. uc->pack.buffer = skynet_malloc(pack_size);
  77. memcpy(uc->pack.buffer, buffer, size);
  78. return 1;//返回 queue
  79. }
  80. //收到报文正好满足长度,下面是典型的netpack.filter()正常返回值实例
  81. if (size == pack_size) {
  82. // just one package
  83. lua_pushvalue(L, lua_upvalueindex(TYPE_DATA)); //"data" 这里是使用了C函数lfilter的闭包伪索引,细节见下面专题。
  84. lua_pushinteger(L, fd); //id
  85. void * result = skynet_malloc(pack_size);
  86. memcpy(result, buffer, size);
  87. lua_pushlightuserdata(L, result); //ptr
  88. lua_pushinteger(L, size); //size
  89. return 5;
  90. //这里表示返回5个参数:queue, "data", id, ptr, size
  91. }
  92. // more data
  93. push_data(L, fd, buffer, pack_size, 1);
  94. buffer += pack_size;
  95. size -= pack_size;
  96. push_more(L, fd, buffer, size);
  97. lua_pushvalue(L, lua_upvalueindex(TYPE_MORE));
  98. return 2;//返回 queue, "more"
  99. }
  100. }
  101. //输入就是fd、 buf和size,将其放到 队列里
  102. static void
  103. push_data(lua_State *L, int fd, void *buffer, int size, int clone) {
  104. if (clone) { //可以选择拷贝
  105. void * tmp = skynet_malloc(size);
  106. memcpy(tmp, buffer, size);
  107. buffer = tmp;
  108. }
  109. //这个是找到 queue,如果没有就新建
  110. struct queue *q = get_queue(L);
  111. //找到最后一个netpack
  112. struct netpack *np = &q->queue[q->tail];
  113. if (++q->tail >= q->cap)
  114. q->tail -= q->cap; //环形指针,尾部追头部
  115. np->id = fd;
  116. np->buffer = buffer;
  117. np->size = size;
  118. if (q->head == q->tail) { //追上了头部就扩展
  119. expand_queue(L, q);
  120. }
  121. }
  122. // 这个函数就是 尝试取出queue如果null,则新建
  123. static struct queue *
  124. get_queue(lua_State *L) {
  125. struct queue *q = lua_touserdata(L,1);
  126. if (q == NULL) {
  127. q = lua_newuserdata(L, sizeof(struct queue));
  128. q->cap = QUEUESIZE;
  129. q->head = 0;
  130. q->tail = 0;
  131. int i;
  132. for (i=0;i<HASHSIZE;i++) {
  133. q->hash[i] = NULL;
  134. }
  135. lua_replace(L, 1);
  136. }
  137. return q;
  138. }

截取上函数中得到数据之后返回分支,如下:

  1. //之前lfilter中settop将输入第一个参数queue设置为栈顶,因此第一个返回值将是 queue
  2. //使用闭包伪索引 压入第二个返回值 "data"
  3. lua_pushvalue(L, lua_upvalueindex(TYPE_DATA)); //"data" 这里是使用了C函数lfilter的闭包伪索引,细节见下面专题。
  4. //压入第三个返回值
  5. lua_pushinteger(L, fd); //id
  6. void * result = skynet_malloc(pack_size);
  7. memcpy(result, buffer, size);
  8. //第四、五个返回值
  9. lua_pushlightuserdata(L, result); //ptr
  10. lua_pushinteger(L, size); //size
  11. return 5;

上述代码返回值序列是 queue,"data",id,ptr,size,代入可得:

proto[prototype].dispatch(
    session,
    source, 
    queue,"data",id,ptr,size)

对应dispatch的参数结构:

    dispatch = function (_, _, q, type, ...)
        queue = q
        if type then
            MSG[type](...)
        end
    end

就正好对应上了。

至此,c版本的gate实现

gateserver.lua源码分析 对more消息处理

上面的分析基本已经弄明白 netpack.filter()如何处理socket节点发来的数据,其中如是数据稍多,超过一条完整帧,就会被采用more消息发送gateserver,下面看一下gateserver中如何处理这个more消息。

  1. MSG.more = dispatch_queue
  2. ...
  3. local function dispatch_queue()
  4. local fd, msg, sz = netpack.pop(queue)
  5. if fd then
  6. -- may dispatch even the handler.message blocked
  7. -- If the handler.message never block, the queue should be empty, so only fork once and then exit.
  8. -- 这里最为重要,fork等于 timeout(func, 0)的效果,因为保不齐后面dispatch_msg会被堵住,即使堵塞也会有合适的coroutine来处理msg的。
  9. skynet.fork(dispatch_queue)
  10. dispatch_msg(fd, msg, sz)
  11. for fd, msg, sz in netpack.pop, queue do
  12. dispatch_msg(fd, msg, sz)
  13. end
  14. end
  15. end

C函数lfilter的闭包伪索引 专题

首先,netpack在构建的时候,具体见“luaopen_netpack”,就为 lfilter函数构建了闭包,引入几个字符串。如下代码:

  1. int
  2. luaopen_netpack(lua_State *L) {
  3. ...
  4. // the order is same with macros : TYPE_* (defined top)
  5. lua_pushliteral(L, "data");
  6. lua_pushliteral(L, "more");
  7. lua_pushliteral(L, "error");
  8. lua_pushliteral(L, "open");
  9. lua_pushliteral(L, "close");
  10. lua_pushliteral(L, "warning");
  11. lua_pushcclosure(L, lfilter, 6);
  12. //这里表示把一个新的 C 闭包压栈,上面的几个字符串就可以使用 lua_upvalueindex(TYPE_DATA) 这样来索引
  13. lua_setfield(L, -2, "filter");
  14. return 1;
  15. }

lua_upvalueindex(TYPE_DATA) 就可以引用这个字符串 "data"。

lfilter() -> filter_data() -> filter_data_() 调用链中包含代码:

  1. ...
  2. lua_pushvalue(L, lua_upvalueindex(TYPE_DATA)); //"data"
  3. lua_pushinteger(L, fd); //
  4. lua_pushlightuserdata(L, uc->pack.buffer); //ptr
  5. lua_pushinteger(L, uc->pack.size); //size
  6. skynet_free(uc);
  7. return 5;
  8. ...

这段第一行就是引用了闭包中字符串 "data" 这个值。

奇怪之处就是为何使用闭包变量值而非直接用字符串呢?

在 2016年2月4日星期四 UTC+8下午9:56:46,云风答道:这是过去 lua 常见的优化方法, 早一点版本 lua_type 就是这样实现的。

总结

gateserver.lua 中 值得研究的是socket类型协议的注册,以及对netpack的使用:

skynet.register_protocol {
    name = "socket",
    id = skynet.PTYPE_SOCKET,   -- PTYPE_SOCKET = 6
    unpack = function ( msg, sz )
        return netpack.filter( queue, msg, sz)
    end,
    dispatch = function (_, _, q, type, ...)
        queue = q
        if type then
            MSG[type](...)
        end
    end
}

将接受 socket服务节点返回的 头部+数据 格式的数据消息。

dispatch()被执行之前,会调用netpack.filter( queue, msg, sz) unpack这些数据,如下:

proto[prototype].dispatch(
    session,
    source, 
    proto[prototype].unpack(msg,sz))

unpack过程如下:
netpack.filter( queue, msg, sz) 调用链:netpack.filter( queue, msg, sz) -> c: lfilter() -> filter_data() -> filter_data_()

其中用到 userdata:queue,被作为第一个参数传入filter,也作为第一个返回值被返回。
其中存放了所有没有接收完成,等待补齐的数据缓存。

所以,执行完成unpack,大致如下:

proto[prototype].dispatch(
    session,
    source, 
    queue,"data",id,ptr,size)

这样就很清楚理解dispatch的调用过程了。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注