@wsd1
2017-01-19T07:56:24.000000Z
字数 9192
阅读 3952
skynet
http://cloudwu.github.io/lua53doc/manual.html#6.4.1
https://github.com/cloudwu/skynet/wiki
通用模板 lualib/snax/gateserver.lua 将TCP 数据流,分解成 以带长度信息的数据包的结构来做数据交换。
用法如下:
local handler = {
connect = ...,
disconnect = ...,
error = ...,
command = ...,
message = ... //数据处理
}
gateserver.start(handler)
service/gate.lua 是snax.gateserver 的使用范例。
首先,依赖:
local skynet = require "skynet"
local netpack = require "netpack"
local socketdriver = require "socketdriver"
其作为一个lua服务模板,其中仅仅定义了三个方法:
function gateserver.openclient(fd)
function gateserver.closeclient(fd)
function gateserver.start(handler)
每次收到 handler.connect 后,你都需要调用 openclient 让 fd 上的消息进入。默认状态下, fd 仅仅是连接上你的服务器,但无法发送消息给你。你可以在一切准备好以后,再放行消息。
gateserver.start比较关键,其中注册 socket类型 protocol:
local MSG = {open,close,error,warning,data} --简化形式
skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = function ( msg, sz )
return netpack.filter( queue, msg, sz)
end,
dispatch = function (_, _, q, type, ...)
queue = q
if type then
MSG[type](...)
end
end
}
解包方法“netpack.filter”,c语言实现,定义在 lua-netpack.c中。
其同时定义了 “dispatch”,用于处理该类型消息。代码中可以看到可以处理的类型有:open,close,error,warning,data,more。
所以,流程如下:
首先,socket节点发出PTYPE_SOCKET类型消息,交付给gateserver,被指定netpack分解为dispatch的参数输入。
这个细节过程可以参考"【专题10】搞明白skynet的skynet.register_protocol() 和 skynet.dispatch()"
鉴于该消息仅仅从c实现的代码里发出,所以我们看看消息如何发出来。
关键代码在 skynet_socket.c:
skynet_socket_poll()中调用forward_message(),发送各种类型的msg,类如:SKYNET_SOCKET_TYPE_DATA,类如:
...
case SOCKET_DATA:
forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
...
forward_message()组织 “skynet_socket_message”类型对象,挂载在“skynet_message”类型的消息中发出来。
简化代码如下:
forward_message(int type, bool padding, struct socket_message * result){
...
struct skynet_socket_message *sm;
size_t sz = sizeof(*sm);
sm = (struct skynet_socket_message *)skynet_malloc(sz);
sm->type = type; //申请了msg,并设置消息类型
sm->id = result->id;
sm->ud = result->ud;
...
sm->buffer = result->data;
...
struct skynet_message message;
message.source = 0;
message.session = 0;
message.data = sm; //挂在msg上
message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);
skynet_context_push(..., &message)) //走你
...
}
可见,其中消息类型放在size字段高字节上,发出register_protocol函数注册的“skynet.PTYPE_SOCKET”类型消息。
最关键记得 struct skynet_socket_message结构, type、id、ud和buffer分别被赋予了 SKYNET_SOCKET_TYPE_DATA、id、size和data。
再回到 skynet.start()
register_protocol函数中,unpack的定义如下:
unpack = function ( msg, sz )
return netpack.filter( queue, msg, sz)
end,
在skynet_socket.c中发出msg,由netpack.filter() 处理之后才能得到dispatch函数—— “function (_, _, q, type, ...)”需要的后三个参数组合。
别忘了,dispatch被调用的本质是:
proto[prototype].dispatch(
session,
source,
proto[prototype].unpack(msg,sz))
netpack.filter( queue, msg, sz) 处理msg的关键代码见 lua-netpack.c
static int lfilter(lua_State *L) {
struct skynet_socket_message *message = lua_touserdata(L,2); //取得msg
...
lua_settop(L, 1); //这个挺重要,设置栈底数第一个索引为栈顶,当前栈顶就指向queue了,回忆一下 netpack.filter( queue, msg, sz)
...
switch(message->type) {
case SKYNET_SOCKET_TYPE_DATA: //<------
...
return filter_data(L, message->id, (uint8_t *)buffer, message->ud); //参数分别是 id、data和size
case SKYNET_SOCKET_TYPE_CONNECT:
...
default:
...
}
}
unpack根据msg中type做了不同处理,关注SKYNET_SOCKET_TYPE_DATA 分支。
filter_data()会同参数调用 filter_data_(),流程是:
lua: netpack.filter( queue, msg, sz) -> c: lfilter() -> filter_data() -> filter_data_()
注意这里的 queue是用来缓存未成帧数据的userdate,tcp流被格式化为 长度+数据 格式,因此在中间件需解决缓存问题。
这个函数需要完成的内容:接受一个 userdata和raw data、size。
数据长度不足,内部缓存为uncomplete;
数据刚好,则去头,返回queue、"data"、id、msg、size;
若数据超出,则内部queue,返回queue、"more"。
lua-netpack.c:
//注意这个函数被调用时,L的栈顶指向queue,这是在lfilter函数中settop设置的,因此,queue将作为第一个返回值返回,其他的压栈行为将排布queue之后的返回值。
//queue的定义如下:
struct queue {
int cap;
int head;
int tail;
struct uncomplete * hash[HASHSIZE];
struct netpack queue[QUEUESIZE];
};
//可见其承担两方面功能,一是 队列,就是netpack queue[] ,二是一个uncomplete hash表,用来通过socket id找uncomplete
static int
filter_data_(lua_State *L, int fd, uint8_t * buffer, int size) {
struct queue *q = lua_touserdata(L,1); //取得第一个参数queue
struct uncomplete * uc = find_uncomplete(q, fd); //是典型通过fd hash找到uncomplete结构体的过程。都存储在q->hash[]里面。
//若找到之前的uncomplete
if (uc) {
// fill uncomplete
if (uc->read < 0) {//只有一种情况:-1 表示只得到一个开头字节(长度高字节)
// read size
assert(uc->read == -1);
int pack_size = *buffer;
pack_size |= uc->header << 8 ;
++buffer;
--size;
uc->pack.size = pack_size;
uc->pack.buffer = skynet_malloc(pack_size);
uc->read = 0;
}
//上面补足头部2字节
int need = uc->pack.size - uc->read;
if (size < need) { //后面数据不够 就先填充好 uncomplete
memcpy(uc->pack.buffer + uc->read, buffer, size);
uc->read += size;
int h = hash_fd(fd);
uc->next = q->hash[h];
q->hash[h] = uc;
return 1; //返回1是因为已经有queue在栈顶了
}
memcpy(uc->pack.buffer + uc->read, buffer, need);
buffer += need;
size -= need;
if (size == 0) { //数据正好足够,则发布data事件
lua_pushvalue(L, lua_upvalueindex(TYPE_DATA));
lua_pushinteger(L, fd);
lua_pushlightuserdata(L, uc->pack.buffer);
lua_pushinteger(L, uc->pack.size);
skynet_free(uc);
return 5;
}
//到这里,说明数据还超出需要的部分,下面的部分将收齐的压入queue,再择情创建uncomplete。发回 more 类型消息
// more data
push_data(L, fd, uc->pack.buffer, uc->pack.size, 0);
skynet_free(uc);
push_more(L, fd, buffer, size);
lua_pushvalue(L, lua_upvalueindex(TYPE_MORE));
return 2;//返回 queue, "more"
}
//若是第一次收到报文
else {
//若长度是1,则建立uncomplete并且设置 read:-1 header:第一个字节
if (size == 1) {
struct uncomplete * uc = save_uncomplete(L, fd);
uc->read = -1; //-1 表示只得到一个开头字节(长度高字节)
uc->header = *buffer;
return 1;//返回 queue
}
//否则,收到报文头部两个字节构造长度。
int pack_size = read_size(buffer);
buffer+=2;
size-=2;
//收到的报文长度不够,建立uncomplete
if (size < pack_size) {
struct uncomplete * uc = save_uncomplete(L, fd);
uc->read = size;
uc->pack.size = pack_size;
uc->pack.buffer = skynet_malloc(pack_size);
memcpy(uc->pack.buffer, buffer, size);
return 1;//返回 queue
}
//收到报文正好满足长度,下面是典型的netpack.filter()正常返回值实例
if (size == pack_size) {
// just one package
lua_pushvalue(L, lua_upvalueindex(TYPE_DATA)); //"data" 这里是使用了C函数lfilter的闭包伪索引,细节见下面专题。
lua_pushinteger(L, fd); //id
void * result = skynet_malloc(pack_size);
memcpy(result, buffer, size);
lua_pushlightuserdata(L, result); //ptr
lua_pushinteger(L, size); //size
return 5;
//这里表示返回5个参数:queue, "data", id, ptr, size
}
// more data
push_data(L, fd, buffer, pack_size, 1);
buffer += pack_size;
size -= pack_size;
push_more(L, fd, buffer, size);
lua_pushvalue(L, lua_upvalueindex(TYPE_MORE));
return 2;//返回 queue, "more"
}
}
//输入就是fd、 buf和size,将其放到 队列里
static void
push_data(lua_State *L, int fd, void *buffer, int size, int clone) {
if (clone) { //可以选择拷贝
void * tmp = skynet_malloc(size);
memcpy(tmp, buffer, size);
buffer = tmp;
}
//这个是找到 queue,如果没有就新建
struct queue *q = get_queue(L);
//找到最后一个netpack
struct netpack *np = &q->queue[q->tail];
if (++q->tail >= q->cap)
q->tail -= q->cap; //环形指针,尾部追头部
np->id = fd;
np->buffer = buffer;
np->size = size;
if (q->head == q->tail) { //追上了头部就扩展
expand_queue(L, q);
}
}
// 这个函数就是 尝试取出queue如果null,则新建
static struct queue *
get_queue(lua_State *L) {
struct queue *q = lua_touserdata(L,1);
if (q == NULL) {
q = lua_newuserdata(L, sizeof(struct queue));
q->cap = QUEUESIZE;
q->head = 0;
q->tail = 0;
int i;
for (i=0;i<HASHSIZE;i++) {
q->hash[i] = NULL;
}
lua_replace(L, 1);
}
return q;
}
截取上函数中得到数据之后返回分支,如下:
//之前lfilter中settop将输入第一个参数queue设置为栈顶,因此第一个返回值将是 queue
//使用闭包伪索引 压入第二个返回值 "data"
lua_pushvalue(L, lua_upvalueindex(TYPE_DATA)); //"data" 这里是使用了C函数lfilter的闭包伪索引,细节见下面专题。
//压入第三个返回值
lua_pushinteger(L, fd); //id
void * result = skynet_malloc(pack_size);
memcpy(result, buffer, size);
//第四、五个返回值
lua_pushlightuserdata(L, result); //ptr
lua_pushinteger(L, size); //size
return 5;
上述代码返回值序列是 queue,"data",id,ptr,size,代入可得:
proto[prototype].dispatch(
session,
source,
queue,"data",id,ptr,size)
对应dispatch的参数结构:
dispatch = function (_, _, q, type, ...)
queue = q
if type then
MSG[type](...)
end
end
就正好对应上了。
至此,c版本的gate实现
上面的分析基本已经弄明白 netpack.filter()如何处理socket节点发来的数据,其中如是数据稍多,超过一条完整帧,就会被采用more消息发送gateserver,下面看一下gateserver中如何处理这个more消息。
MSG.more = dispatch_queue
...
local function dispatch_queue()
local fd, msg, sz = netpack.pop(queue)
if fd then
-- may dispatch even the handler.message blocked
-- If the handler.message never block, the queue should be empty, so only fork once and then exit.
-- 这里最为重要,fork等于 timeout(func, 0)的效果,因为保不齐后面dispatch_msg会被堵住,即使堵塞也会有合适的coroutine来处理msg的。
skynet.fork(dispatch_queue)
dispatch_msg(fd, msg, sz)
for fd, msg, sz in netpack.pop, queue do
dispatch_msg(fd, msg, sz)
end
end
end
首先,netpack在构建的时候,具体见“luaopen_netpack”,就为 lfilter函数构建了闭包,引入几个字符串。如下代码:
int
luaopen_netpack(lua_State *L) {
...
// the order is same with macros : TYPE_* (defined top)
lua_pushliteral(L, "data");
lua_pushliteral(L, "more");
lua_pushliteral(L, "error");
lua_pushliteral(L, "open");
lua_pushliteral(L, "close");
lua_pushliteral(L, "warning");
lua_pushcclosure(L, lfilter, 6);
//这里表示把一个新的 C 闭包压栈,上面的几个字符串就可以使用 lua_upvalueindex(TYPE_DATA) 这样来索引
lua_setfield(L, -2, "filter");
return 1;
}
lua_upvalueindex(TYPE_DATA) 就可以引用这个字符串 "data"。
lfilter() -> filter_data() -> filter_data_() 调用链中包含代码:
...
lua_pushvalue(L, lua_upvalueindex(TYPE_DATA)); //"data"
lua_pushinteger(L, fd); //
lua_pushlightuserdata(L, uc->pack.buffer); //ptr
lua_pushinteger(L, uc->pack.size); //size
skynet_free(uc);
return 5;
...
这段第一行就是引用了闭包中字符串 "data" 这个值。
奇怪之处就是为何使用闭包变量值而非直接用字符串呢?
在 2016年2月4日星期四 UTC+8下午9:56:46,云风答道:这是过去 lua 常见的优化方法, 早一点版本 lua_type 就是这样实现的。
gateserver.lua 中 值得研究的是socket类型协议的注册,以及对netpack的使用:
skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = function ( msg, sz )
return netpack.filter( queue, msg, sz)
end,
dispatch = function (_, _, q, type, ...)
queue = q
if type then
MSG[type](...)
end
end
}
将接受 socket服务节点返回的 头部+数据 格式的数据消息。
dispatch()被执行之前,会调用netpack.filter( queue, msg, sz) unpack这些数据,如下:
proto[prototype].dispatch(
session,
source,
proto[prototype].unpack(msg,sz))
unpack过程如下:
netpack.filter( queue, msg, sz) 调用链:netpack.filter( queue, msg, sz) -> c: lfilter() -> filter_data() -> filter_data_()
其中用到 userdata:queue,被作为第一个参数传入filter,也作为第一个返回值被返回。
其中存放了所有没有接收完成,等待补齐的数据缓存。
所以,执行完成unpack,大致如下:
proto[prototype].dispatch(
session,
source,
queue,"data",id,ptr,size)
这样就很清楚理解dispatch的调用过程了。