[关闭]
@wsd1 2017-01-19T08:03:20.000000Z 字数 3710 阅读 1769

【专题11】搞明白skynet的socket.lua

skynet 201612


参考文档

http://cloudwu.github.io/lua53doc/manual.html#6.4.1

https://github.com/cloudwu/skynet/wiki

背景

越来越喜欢deepstream,跃跃欲试想给它写一个客户端。skynet的socket实现的有点复杂,但是从作者的调性觉得还是比较靠谱,这里好好深入这个部分。

参考代码 lualib/socket.lua

简单起见,可以回溯早期的代码:
https://github.com/cloudwu/skynet/blob/06ee4297dfda4929db04370e25bbfcb77f93c3a5/lualib/socket.lua

lua-socket.c 为lua提供了socketdriver模块,这个模块是今天分析的基础。

建立连接 socket.open(addr, port)建立一个 TCP 连接。返回一个数字 id

  1. function socket.open(addr, port)
  2. local id = driver.connect(addr,port)
  3. return connect(id)
  4. end

driver.connect 是C代码实现的,最重要的就是 实施了 skynet_socket_connect()并返回了 id。

connect(id)实现:

  1. local function connect(id)
  2. local s = {
  3. id = id,
  4. buffer = driver.buffer(), -- < C实现,返回一个userdata socket_buffer结构
  5. connected = false,
  6. read_require = false,
  7. co = false,
  8. }
  9. socket_pool[id] = s
  10. suspend(s) --< 这里堵塞这个coroutine
  11. if s.connected then
  12. return id
  13. end
  14. end

上面可见 connect 堵塞了协程,被唤醒的位置很简单,就是在connect成功之后。

  1. -- SKYNET_SOCKET_TYPE_CONNECT = 2
  2. socket_message[2] = function(id, _ , addr)
  3. local s = socket_pool[id]
  4. if s == nil then
  5. return
  6. end
  7. -- log remote addr
  8. s.connected = true
  9. wakeup(s) ---< 很简单,这里找到并唤醒
  10. end

接收数据

  1. -- SKYNET_SOCKET_TYPE_DATA = 1
  2. socket_message[1] = function(id, size, data)
  3. local s = socket_pool[id]
  4. if s == nil then
  5. print("socket: drop package from " .. id)
  6. driver.drop(data) ---< 这个C实现 就是skynet_free
  7. return
  8. end
  9. local sz = driver.push(s.buffer, buffer_pool, data, size) --< 这里是C实现
  10. local rr = s.read_required
  11. local rrt = type(rr)
  12. if rrt == "number" then ---< 若是有需求长度
  13. -- read size
  14. if sz >= rr then
  15. s.read_required = nil
  16. wakeup(s)
  17. end
  18. elseif rrt == "string" then ---< 若是直到回车
  19. -- read line
  20. if driver.readline(s.buffer,nil,rr) then
  21. s.read_required = nil
  22. wakeup(s)
  23. end
  24. end
  25. end

最复杂的就是 driver.push C实现的,主要就是申请空间容纳接受的数据,返回的是缓存的总长度。

所有的buf都是放在buffer_pool中的,
s.buffer 就是 socket_buffer结构 是个链表,指向

然后比较直觉,若满足要求,就唤醒socket的协程。

下面附带 push的C实现,有点难度。不过知道大致啥意思就行了。

  1. struct buffer_node {
  2. char * msg;
  3. int sz;
  4. struct buffer_node *next;
  5. };
  6. struct socket_buffer {
  7. int size;
  8. int offset;
  9. struct buffer_node *head;
  10. struct buffer_node *tail;
  11. };
  12. ...
  13. /*
  14. userdata send_buffer
  15. table pool
  16. lightuserdata msg
  17. int size
  18. return size
  19. Comment: The table pool record all the buffers chunk,
  20. and the first index [1] is a lightuserdata : free_node. We can always use this pointer for struct buffer_node .
  21. The following ([2] ...) userdatas in table pool is the buffer chunk (for struct buffer_node),
  22. we never free them until the VM closed. The size of first chunk ([2]) is 8 struct buffer_node,
  23. and the second size is 16 ... The largest size of chunk is LARGE_PAGE_NODE (4096)
  24. lpushbbuffer will get a free struct buffer_node from table pool, and then put the msg/size in it.
  25. lpopbuffer return the struct buffer_node back to table pool (By calling return_free_node).
  26. */
  27. static int
  28. lpushbuffer(lua_State *L) {
  29. struct socket_buffer *sb = lua_touserdata(L,1);
  30. if (sb == NULL) {
  31. return luaL_error(L, "need buffer object at param 1");
  32. }
  33. char * msg = lua_touserdata(L,3);
  34. if (msg == NULL) {
  35. return luaL_error(L, "need message block at param 3");
  36. }
  37. int pool_index = 2;
  38. luaL_checktype(L,pool_index,LUA_TTABLE);
  39. int sz = luaL_checkinteger(L,4);
  40. lua_rawgeti(L,pool_index,1);
  41. struct buffer_node * free_node = lua_touserdata(L,-1); // sb poolt msg size free_node
  42. lua_pop(L,1);
  43. if (free_node == NULL) {
  44. int tsz = lua_rawlen(L,pool_index);
  45. if (tsz == 0)
  46. tsz++;
  47. int size = 8;
  48. if (tsz <= LARGE_PAGE_NODE-3) {
  49. size <<= tsz;
  50. } else {
  51. size <<= LARGE_PAGE_NODE-3;
  52. }
  53. lnewpool(L, size);
  54. free_node = lua_touserdata(L,-1);
  55. lua_rawseti(L, pool_index, tsz+1);
  56. }
  57. lua_pushlightuserdata(L, free_node->next);
  58. lua_rawseti(L, pool_index, 1); // sb poolt msg size
  59. free_node->msg = msg;
  60. free_node->sz = sz;
  61. free_node->next = NULL;
  62. if (sb->head == NULL) {
  63. assert(sb->tail == NULL);
  64. sb->head = sb->tail = free_node;
  65. } else {
  66. sb->tail->next = free_node;
  67. sb->tail = free_node;
  68. }
  69. sb->size += sz;
  70. lua_pushinteger(L, sb->size);
  71. return 1;
  72. }
  73. static int
  74. lnewpool(lua_State *L, int sz) {
  75. struct buffer_node * pool = lua_newuserdata(L, sizeof(struct buffer_node) * sz);
  76. int i;
  77. for (i=0;i<sz;i++) {
  78. pool[i].msg = NULL;
  79. pool[i].sz = 0;
  80. pool[i].next = &pool[i+1];
  81. }
  82. pool[sz-1].next = NULL;
  83. if (luaL_newmetatable(L, "buffer_pool")) {
  84. lua_pushcfunction(L, lfreepool);
  85. lua_setfield(L, -2, "__gc");
  86. }
  87. lua_setmetatable(L, -2);
  88. return 1;
  89. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注