@wsd1
2017-01-19T08:03:20.000000Z
字数 3710
阅读 1633
skynet
201612
http://cloudwu.github.io/lua53doc/manual.html#6.4.1
https://github.com/cloudwu/skynet/wiki
越来越喜欢deepstream,跃跃欲试想给它写一个客户端。skynet的socket实现得有点复杂,但是从作者的调性来看还是比较靠谱的,这里好好深入这个部分。
参考代码 lualib/socket.lua
简单起见,可以回溯早期的代码:
https://github.com/cloudwu/skynet/blob/06ee4297dfda4929db04370e25bbfcb77f93c3a5/lualib/socket.lua
lua-socket.c 为lua提供了socketdriver模块,这个模块是今天分析的基础。
--- Open a connection to addr:port.
-- Asks the C driver to start connecting and passes the socket id it returns
-- straight to connect().
-- @param addr remote address, forwarded unchanged to driver.connect
-- @param port remote port, forwarded unchanged to driver.connect
-- @return whatever connect() returns for the new id
function socket.open(addr, port)
	return connect(driver.connect(addr, port))
end
driver.connect 是C代码实现的,最重要的就是调用了 skynet_socket_connect() 并返回了 id。
connect(id)实现:
--- Register a socket object for `id` and wait for the connect result.
-- Creates the per-socket state table, stores it in socket_pool, then suspends
-- on it; execution continues after the socket is woken up.
-- @param id socket id returned by the driver
-- @return id when the socket was marked connected, otherwise nothing (nil)
local function connect(id)
	local s = {
		id = id,
		buffer = driver.buffer(), -- implemented in C; returns a socket_buffer userdata
		connected = false,
		-- Spelled `read_required` (not `read_require`) so that it is the same
		-- field the data handler reads and clears.
		read_required = false,
		co = false,
	}
	socket_pool[id] = s
	suspend(s) -- blocks this coroutine here
	if s.connected then
		return id
	end
end
上面可见 connect 阻塞了协程,被唤醒的位置很简单,就是在 connect 成功之后。
-- SKYNET_SOCKET_TYPE_CONNECT = 2
-- Connect notification: flag the pooled socket as connected and wake it up.
-- Ids that are not present in socket_pool are ignored.
socket_message[2] = function(id, _ , addr)
	local sock = socket_pool[id]
	if sock ~= nil then
		-- log remote addr
		sock.connected = true
		wakeup(sock) -- look the socket up and wake it; nothing more to it
	end
end
-- SKYNET_SOCKET_TYPE_DATA = 1
-- Data notification: append the incoming block to the socket's buffer and
-- wake the socket if its pending read request can now be satisfied.
socket_message[1] = function(id, size, data)
	local sock = socket_pool[id]
	if sock == nil then
		print("socket: drop package from " .. id)
		driver.drop(data) -- implemented in C; it is just skynet_free
		return
	end

	-- Implemented in C; the result is compared against the requested size below.
	local buffered = driver.push(sock.buffer, buffer_pool, data, size)

	local want = sock.read_required
	local kind = type(want)
	local satisfied
	if kind == "number" then
		-- read size: a specific length was requested
		satisfied = buffered >= want
	elseif kind == "string" then
		-- read line: `want` is the separator to look for
		satisfied = driver.readline(sock.buffer, nil, want)
	end

	if satisfied then
		sock.read_required = nil
		wakeup(sock)
	end
end
最复杂的就是 C 实现的 driver.push,主要就是申请空间容纳接收的数据,返回的是缓存的总长度。
所有的buf都是放在buffer_pool中的,
s.buffer 就是 socket_buffer 结构,是个链表,其 head 和 tail 分别指向链表首尾的 buffer_node。
然后比较直觉,若满足要求,就唤醒socket的协程。
下面附带 push的C实现,有点难度。不过知道大致啥意思就行了。
/* One node of a singly linked list; each node refers to one message block. */
struct buffer_node {
char * msg; /* message block pointer (lpushbuffer stores the lightuserdata it receives) */
int sz; /* size recorded for msg */
struct buffer_node *next; /* following node, or NULL at the end of a list */
};
/* Per-socket queue of buffer_node entries. */
struct socket_buffer {
int size; /* running total: lpushbuffer adds each pushed message's sz */
int offset; /* not touched by the code shown here; presumably a read position in the head node - TODO confirm */
struct buffer_node *head; /* first queued node; NULL when the queue is empty */
struct buffer_node *tail; /* last queued node; lpushbuffer asserts it is NULL whenever head is NULL */
};
...
/*
	lpushbuffer: append one message block to a socket_buffer.

	Lua arguments:
		1  userdata       struct socket_buffer to append to
		2  table          node pool
		3  lightuserdata  message block
		4  integer        size of the message block
	Lua result: the buffer's total size after the append.

	Pool layout: slot [1] holds a lightuserdata pointing at the next free
	struct buffer_node (the head of a free list). The slots after it hold the
	userdata chunks created by lnewpool, each an array of struct buffer_node.

	When slot [1] yields no node, a new chunk of (8 << k) nodes is created,
	where k is the raw length of the pool table (taken as 1 when it is 0),
	capped at LARGE_PAGE_NODE-3. The chunk is stored at index k+1 of the pool,
	using the uncapped k.

	The message pointer is stored as-is in the node; nothing is copied here.
*/
static int
lpushbuffer(lua_State *L) {
	const int pool_index = 2;

	struct socket_buffer *buf = lua_touserdata(L, 1);
	if (buf == NULL) {
		return luaL_error(L, "need buffer object at param 1");
	}
	char *block = lua_touserdata(L, 3);
	if (block == NULL) {
		return luaL_error(L, "need message block at param 3");
	}
	luaL_checktype(L, pool_index, LUA_TTABLE);
	int bytes = luaL_checkinteger(L, 4);

	// Read the free-list head from pool[1]; the stack is restored right after.
	lua_rawgeti(L, pool_index, 1);
	struct buffer_node *node = lua_touserdata(L, -1);
	lua_pop(L, 1);

	if (node == NULL) {
		// No free node: create a new chunk and keep it in the pool table.
		const int max_shift = LARGE_PAGE_NODE-3;
		int slots = lua_rawlen(L, pool_index);
		if (slots == 0) {
			slots = 1;
		}
		int shift = (slots <= max_shift) ? slots : max_shift;
		lnewpool(L, 8 << shift);	// leaves the new chunk on top of the stack
		node = lua_touserdata(L, -1);
		lua_rawseti(L, pool_index, slots + 1);	// pops the chunk
	}

	// Detach the node: whatever followed it becomes the new pool[1].
	lua_pushlightuserdata(L, node->next);
	lua_rawseti(L, pool_index, 1);

	node->msg = block;
	node->sz = bytes;
	node->next = NULL;

	// Link the node at the tail of the socket buffer.
	if (buf->head != NULL) {
		buf->tail->next = node;
	} else {
		assert(buf->tail == NULL);
		buf->head = node;
	}
	buf->tail = node;

	buf->size += bytes;
	lua_pushinteger(L, buf->size);
	return 1;
}
/*
	lnewpool: create a userdata holding `sz` struct buffer_node entries.

	Every node is cleared (msg = NULL, sz = 0) and linked to the node after
	it; the last node's next is NULL. The userdata is given the "buffer_pool"
	metatable. When that metatable is created for the first time, lfreepool is
	installed as its __gc. The new userdata is left on top of the Lua stack.

	Returns 1.
*/
static int
lnewpool(lua_State *L, int sz) {
	struct buffer_node *nodes = lua_newuserdata(L, sizeof(struct buffer_node) * sz);
	int idx;
	for (idx = 0; idx < sz; idx++) {
		struct buffer_node *cur = nodes + idx;
		cur->msg = NULL;
		cur->sz = 0;
		cur->next = cur + 1;
	}
	// The loop links the last node one past the array; terminate the list.
	nodes[sz-1].next = NULL;

	int created = luaL_newmetatable(L, "buffer_pool");
	if (created) {
		lua_pushcfunction(L, lfreepool);
		lua_setfield(L, -2, "__gc");
	}
	lua_setmetatable(L, -2);

	return 1;
}