[关闭]
@Otokaze 2018-09-30T16:49:47.000000Z 字数 35056 阅读 1023

libevent 笔记

C语言

libevent 简介

Libevent 是一个用 C 语言编写的、轻量级的开源高性能事件通知库,主要有以下几个亮点:事件驱动(event-driven),高性能;轻量级,专注于网络,不如 ACE 那么臃肿庞大;源代码相当精炼、易读;跨平台,支持 Windows、 Linux、 *BSD 和 Mac Os;支持多种 I/O 多路复用技术, epoll、 poll、 dev/poll、 select 和 kqueue 等;支持 I/O,定时器和信号等事件;注册事件优先级;内置 OpenSSL 支持,编写 TLS/SSL 应用更加容易;内置 HTTP、DNS 异步实现,可以说很强大了。

libevent 安装

  1. pacman -S libevent

libevent 用法

event.h

为什么使用 libevent,可能大家都比较清楚,因为 Linux 的 epoll 不是很好用,很多时候我们只是想简单的实现异步 IO(socket 编程),所以 libevent 出现了。

在 epoll 中,要使用 epoll IO 多路复用,必须首先创建一个 epoll fd,这个 fd 可以看作是一个 epoll 对象,我们所有的添加、删除、修改 event 操作都是与这个 fd 相关联的。

而在 libevent 中,同样存在这么一个东西,它的名字叫做 event_base,event_base 是一个结构体,与 libevent 相关的事件都是在它上面进行操作的,作用与 epoll fd 相同。

注意,event_base 通常只能在一个线程中使用,如果需要 多线程 + libevent,那么你最好为每个线程分配一个 event_base 结构体,防止多线程访问同一个 event base 导致数据错误问题。

创建 event_base

  1. #include <event2/event.h>
  2. /* 自动选择最快的 IO 多路复用方法 */
  3. struct event_base * event_base_new();
  4. /* 释放 event base,释放占用的资源 */
  5. void event_base_free(struct event_base *base);
  6. /* 在 fork() 之后需要重新 init base */
  7. int event_reinit(struct event_base *base);
  8. /* 获取 libevent 支持的底层方法 */
  9. const char ** event_get_supported_methods();
  10. /* 获取 libevent base 当前使用的方法 */
  11. const char * event_base_get_method(const struct event_base *base);

运行 event loop

  1. #include <event2/event.h>
  2. /* 开始事件循环,event loop,当前线程会被阻塞,遇到以下情况时,event loop 结束 */
  3. /* 没有更多事件,或者调用 event_base_loopbreak()、event_base_loopexit() 方法 */
  4. int event_base_dispatch(struct event_base *base);
  5. /*
  6. 正常退出,其中 tv 为延迟时间,NULL 表示没有延迟,如果当前还有事件,会运行完再退出
  7. */
  8. int event_base_loopexit(struct event_base *base, const struct timeval *tv);
  9. /*
  10. 立即退出,看官方文档好像是强制结束,但好像说的又不是很清除,总之不建议使用这个方法
  11. */
  12. int event_base_loopbreak(struct event_base *base);
  13. /* 判断是 exit 退出的还是 break 退出的 */
  14. int event_base_got_exit(struct event_base *base);
  15. int event_base_got_break(struct event_base *base);

但是,上面只讲了如何 event base 的创建、运行、停止、删除,并未说明如何创建 event,毕竟 event 是 event loop 的基本运作单位,这里开始介绍 event 的创建、添加、删除等 API。

libevent 支持管理的事件有:准备读取/写入的文件描述符、超时到期、收到信号、用户触发的事件。

event 有相似的生命周期。使用 new 方法创建时,event 处于 initialized 状态。当你往 event base 里面添加 event 时,event 处于 pending 状态。当 event 被指定事件触发时,event 处于 active 状态,同时自动运行用户提供的回调函数(callback function)。如果将 event 配置为 persistent,那么这个 event 不会在触发(active)一次之后被自动移除,而是继续保持 pending 状态,等待下次 active(这通常是我们想要的,否则你需要重新 add event)。

创建 event

  1. #include <event2/event.h>
  2. #define EV_TIMEOUT 0x01 // 超时事件
  3. #define EV_READ 0x02 // 文件描述符以准备好读取
  4. #define EV_WRITE 0x04 // 文件描述符已准备好写入
  5. #define EV_SIGNAL 0x08 // 接收到信号
  6. #define EV_PERSIST 0x10 // 表示事件是持久的
  7. #define EV_ET 0x20 // 启用 ET 边缘触发
  8. /*
  9. @param evutil_socket_t fd socket_fd,unix 上为 int
  10. @param short what 则是发生的具体 events(事件)
  11. @param void *arg 传递给 callback function 的参数指针
  12. */
  13. typedef void (*event_callback_fn)(evutil_socket_t, short, void *);
  14. // new,如果不需要 fd(比如普通定时任务),可以将 fd 设为 -1
  15. struct event * event_new(struct event_base *base, evutil_socket_t fd,
  16. short what, event_callback_fn cb, void *arg);
  17. // free
  18. void event_free(struct event *event);

在 event_new() 方法中,我们通常希望传递当前 event 结构体进去(传递给我们设置的回调函数),但是此时 event 结构体还未创建,互相矛盾,为了解决这个问题,libevent 提供 void * event_self_cbarg() 函数,它的返回值就是当前的 event 指针,即 Java 中的 this 指针。如果你还不明白,那么请看下面的例子:

  1. #include <event2/event.h>
  2. static int n_calls = 0;
  3. void cb_func(evutil_socket_t fd, short what, void *arg)
  4. {
  5. struct event *me = arg;
  6. printf("cb_func called %d times so far.\n", ++n_calls);
  7. if (n_calls > 100)
  8. event_del(me);
  9. }
  10. void run(struct event_base *base)
  11. {
  12. struct timeval one_sec = { 1, 0 };
  13. struct event *ev;
  14. /* We're going to set up a repeating timer to get called called 100
  15. times. */
  16. ev = event_new(base, -1, EV_PERSIST, cb_func, event_self_cbarg());
  17. event_add(ev, &one_sec);
  18. event_base_dispatch(base);
  19. }

例子会每隔一秒执行一次 cb_func 回调函数,当调用 100 次后,cb_func 内会移除这个 event。

有必要强调一点,libevent 中,new/free 表示创建删除,特指内存/资源的创建删除,而 add/del 这些表示添加/移除事件。不要搞混了。现在介绍 add/del event 的 API:

  1. #include <event2/event.h>
  2. // 注册事件,如果不是定时任务,tv 可为 NULL
  3. int event_add(struct event *ev, const struct timeval *tv);
  4. // 取消注册事件
  5. int event_del(struct event *ev);
  6. // 单独移除 tv 超时设置
  7. int event_remove_timer(struct event *ev);

检查 event 的状态,获取其中的某些信息的 API:

  1. int event_pending(const struct event *ev, short what, struct timeval *tv_out);
  2. #define event_get_signal(ev) /* ... */
  3. evutil_socket_t event_get_fd(const struct event *ev);
  4. int event_get_priority(const struct event *ev);
  5. short event_get_events(const struct event *ev);
  6. struct event_base *event_get_base(const struct event *ev);
  7. event_callback_fn event_get_callback(const struct event *ev);
  8. void *event_get_callback_arg(const struct event *ev);

util.h

socket api 跨平台相关

  1. #ifdef WIN32
  2. #define evutil_socket_t intptr_t
  3. #else
  4. #define evutil_socket_t int
  5. #endif
  6. int evutil_closesocket(evutil_socket_t s);
  7. #define EVUTIL_CLOSESOCKET(s) evutil_closesocket(s)
  8. #define EVUTIL_SOCKET_ERROR()
  9. #define EVUTIL_SET_SOCKET_ERROR(errcode)
  10. #define evutil_socket_geterror(sock)
  11. #define evutil_socket_error_to_string(errcode)
  12. int evutil_make_socket_nonblocking(evutil_socket_t sock);
  13. int evutil_make_listen_socket_reuseable(evutil_socket_t sock);
  14. int evutil_make_socket_closeonexec(evutil_socket_t sock);

bufferevent.h

大多数情况下,除了响应事件外,人们通常还要操作缓冲区中的数据(不能说通常吧,是一定要操作,不然 socket 编程就没有意义了),当然 libevent 也很清楚,所以它提供了更好用的 recv/send 包装 API,libevent 真香。

例如,当我们想要写数据时,通常的模式运行如下:

这种 buffer 的 IO 模式很常见,libevent 为它提供了一种通用的机制:"bufferevent",由底层传输(如套接字),读缓冲区和写缓冲区组成。当底层传输准备好被读取或写入时,bufferevent 会调用其用户提供的回调,而不是在读取或写入足够数据时调用其回调的常规事件。

Bufferevents 目前仅适用于 TCP 等面向流的协议(当然包括 Unix Domain Socket)。将来可能会支持 UDP 等面向数据报的协议。

bufferevents 和 evbuffers
每个 bufferevent 都有一个输入缓冲区和一个输出缓冲区。这些是 struct evbuffer 类型。当您要在 bufferevent 上写入数据时,将其添加到输出缓冲区; 当 bufferevent 有数据供你读取时,你将它从输入缓冲区中取出。

其实简单的说,evbuffer 是缓存区的包装,bufferevent 是一个 event,针对 buffer。

Buffer Event 事件

  1. BEV_EVENT_READING
  2. /* An event occured during a read operation on the bufferevent. See the other flags for which event it was. */
  3. BEV_EVENT_WRITING
  4. /* An event occured during a write operation on the bufferevent. See the other flags for which event it was. */
  5. BEV_EVENT_ERROR
  6. /* An error occurred during a bufferevent operation. For more information on what the error was, call EVUTIL_SOCKET_ERROR(). */
  7. BEV_EVENT_TIMEOUT
  8. /* A timeout expired on the bufferevent. */
  9. BEV_EVENT_EOF
  10. /* We got an end-of-file indication on the bufferevent. */
  11. BEV_EVENT_CONNECTED
  12. /* We finished a requested connection on the bufferevent. */

Buffer Event 选项

  1. BEV_OPT_CLOSE_ON_FREE
  2. /* When the bufferevent is freed, close the underlying transport. This will close an underlying socket, free an underlying bufferevent, etc. */
  3. BEV_OPT_THREADSAFE
  4. /* Automatically allocate locks for the bufferevent, so that it’s safe to use from multiple threads. */
  5. BEV_OPT_DEFER_CALLBACKS
  6. /* When this flag is set, the bufferevent defers all of its callbacks, as described above. */
  7. BEV_OPT_UNLOCK_CALLBACKS
  8. /* By default, when the bufferevent is set up to be threadsafe, the bufferevent’s locks are held whenever the any user-provided callback is invoked. Setting this option makes Libevent release the bufferevent’s lock when it’s invoking your callbacks. */

创建 bufferevent

  1. // fd 需要设置为非阻塞模式
  2. struct bufferevent * bufferevent_socket_new(
  3. struct event_base *base,
  4. evutil_socket_t fd,
  5. enum bufferevent_options options
  6. )

连接 bufferevent

  1. int bufferevent_socket_connect(
  2. struct bufferevent *bev,
  3. struct sockaddr *address, int addrlen
  4. )
  5. int bufferevent_socket_connect_hostname(
  6. struct bufferevent *bev,
  7. struct evdns_base *dns_base,
  8. int family,
  9. const char *hostname,
  10. int port
  11. )
  12. int bufferevent_socket_get_dns_error(struct bufferevent *bev);

释放 bufferevent

  1. void bufferevent_free(struct bufferevent *bev);

设置回调函数(读/写/发送错误)

  1. typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
  2. typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short events, void *ctx);
  3. void bufferevent_setcb(
  4. struct bufferevent *bufev,
  5. bufferevent_data_cb readcb,
  6. bufferevent_data_cb writecb,
  7. bufferevent_event_cb eventcb,
  8. void *cbarg
  9. );
  10. void bufferevent_getcb(
  11. struct bufferevent *bufev,
  12. bufferevent_data_cb *readcb_ptr,
  13. bufferevent_data_cb *writecb_ptr,
  14. bufferevent_event_cb *eventcb_ptr,
  15. void **cbarg_ptr
  16. );

启用 bufferevent 事件(读、写、读写)

  1. /*
  2. 启用或禁用事件 EV_READ、EV_WRITE、EV_READ|EV_WRITE。
  3. 如果未启用相关事件,bufferevent 将不会读取或写入数据。
  4. */
  5. void bufferevent_enable(struct bufferevent *bufev, short events);
  6. void bufferevent_disable(struct bufferevent *bufev, short events);
  7. short bufferevent_get_enabled(struct bufferevent *bufev); // get events

获取 bufferevent 上的 input/output buffer(读缓冲区、写缓冲区)

  1. struct evbuffer *bufferevent_get_input(struct bufferevent *bufev);
  2. struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);

这两个函数是缓冲区操作的基础:它们分别返回输入和输出缓冲区。操作 evbuffer 类型的缓冲区。

读取、写入 eevbuffer 中的数据

  1. // 从 bufev 读取数据(剪切)到 data 中,返回值实际读取的字节数
  2. int bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
  3. // 从 bufev 读取数据(剪切)到 buf 中,成功返回 0,失败返回 -1
  4. int bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf);
  5. // 将 data 中的数据写入到 bufev 中,成功返回 0,失败返回 -1
  6. int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);
  7. // 将 buf 中的数据写入到 bufev 中,并删除 buf 中的数据,成功返回 0,失败返回 -1
  8. int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);

buffer.h

创建、释放 evbuffer

  1. struct evbuffer *evbuffer_new(void);
  2. void evbuffer_free(struct evbuffer *buf);

检查 evbuffer 长度

  1. size_t evbuffer_get_length(const struct evbuffer *buf); // 常用
  2. size_t evbuffer_get_contiguous_space(const struct evbuffer *buf);

添加数据到 evbuffer

  1. int evbuffer_add(struct evbuffer *buf, const void *data, size_t datlen);
  2. int evbuffer_add_printf(struct evbuffer *buf, const char *fmt, ...)
  3. int evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap);
  4. int evbuffer_expandstruct evbuffer * bufsize_t datlen); // 扩充至 datlen 长度

移动一个 buf 中的数据到另一个 buf

  1. /* 将 src 的数据移到 dst buf 中 */
  2. int evbuffer_add_buffer(struct evbuffer *dst, struct evbuffer *src);
  3. int evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst, size_t datlen);

从 evbuffer 中删除数据

  1. int evbuffer_drain(struct evbuffer *buf, size_t len);
  2. int evbuffer_remove(struct evbuffer *buf, void *data, size_t datlen);

从 evbuffer 中窥探数据

  1. ev_ssize_t evbuffer_copyout(
  2. struct evbuffer *buf,
  3. void *data,
  4. size_t datlen
  5. );
  6. ev_ssize_t evbuffer_copyout_from(
  7. struct evbuffer *buf,
  8. const struct evbuffer_ptr *pos,
  9. void *data_out, size_t datlen
  10. );

基于行的数据读取,比如 HTTP 协议

  1. enum evbuffer_eol_style {
  2. EVBUFFER_EOL_ANY,
  3. EVBUFFER_EOL_CRLF,
  4. EVBUFFER_EOL_CRLF_STRICT,
  5. EVBUFFER_EOL_LF,
  6. EVBUFFER_EOL_NUL
  7. };
  8. char *evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,
  9. enum evbuffer_eol_style eol_style);

在 evbuffer 中搜索字符串

  1. struct evbuffer_ptr {
  2. ev_ssize_t pos;
  3. struct {
  4. /* internal fields */
  5. } _internal;
  6. };
  7. struct evbuffer_ptr evbuffer_search(struct evbuffer *buffer,
  8. const char *what, size_t len, const struct evbuffer_ptr *start);
  9. struct evbuffer_ptr evbuffer_search_range(struct evbuffer *buffer,
  10. const char *what, size_t len, const struct evbuffer_ptr *start,
  11. const struct evbuffer_ptr *end);
  12. struct evbuffer_ptr evbuffer_search_eol(struct evbuffer *buffer,
  13. struct evbuffer_ptr *start, size_t *eol_len_out,
  14. enum evbuffer_eol_style eol_style);

peek 检查 evbuffer 里面的数据

  1. struct evbuffer_iovec {
  2. void *iov_base;
  3. size_t iov_len;
  4. };
  5. int evbuffer_peek(
  6. struct evbuffer *buffer, ev_ssize_t len,
  7. struct evbuffer_ptr *start_at,
  8. struct evbuffer_iovec *vec_out, int n_vec);

在 socket IO 上使用 evbuffer

  1. int evbuffer_write(struct evbuffer *buffer, evutil_socket_t fd);
  2. int evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd,
  3. ev_ssize_t howmuch);
  4. int evbuffer_read(struct evbuffer *buffer, evutil_socket_t fd, int howmuch);

listener.h

evconnlistener 机制为您提供了一种侦听和接受传入 TCP 连接的方法。

创建、释放 evconnlistener

  1. struct evconnlistener *evconnlistener_new(
  2. struct event_base *base,
  3. evconnlistener_cb cb,
  4. void *ptr,
  5. unsigned flags,
  6. int backlog,
  7. evutil_socket_t fd);
  8. struct evconnlistener *evconnlistener_new_bind(
  9. struct event_base *base,
  10. evconnlistener_cb cb,
  11. void *ptr,
  12. unsigned flags,
  13. int backlog,
  14. const struct sockaddr *sa,
  15. int socklen);
  16. void evconnlistener_free(struct evconnlistener *lev);

支持设置的 flags 标志

  1. LEV_OPT_LEAVE_SOCKETS_BLOCKING
  2. /* By default, when the connection listener accepts a new incoming socket, it sets it up to be nonblocking so that you can use it with the rest of Libevent. Set this flag if you do not want this behavior. */
  3. LEV_OPT_CLOSE_ON_FREE
  4. /* If this option is set, the connection listener closes its underlying socket when you free it. */
  5. LEV_OPT_CLOSE_ON_EXEC
  6. /* If this option is set, the connection listener sets the close-on-exec flag on the underlying listener socket. See your platform documentation for fcntl and FD_CLOEXEC for more information. */
  7. LEV_OPT_REUSEABLE
  8. /* By default on some platforms, once a listener socket is closed, no other socket can bind to the same port until a while has passed. Setting this option makes Libevent mark the socket as reusable, so that once it is closed, another socket can be opened to listen on the same port. */
  9. LEV_OPT_THREADSAFE
  10. /* Allocate locks for the listener, so that it’s safe to use it from multiple threads. New in Libevent 2.0.8-rc. */
  11. LEV_OPT_DISABLED
  12. /* Initialize the listener to be disabled, not enabled. You can turn it on manually with evconnlistener_enable(). New in Libevent 2.1.1-alpha. */
  13. LEV_OPT_DEFERRED_ACCEPT
  14. /* If possible, tell the kernel to not announce sockets as having been accepted until some data has been received on them, and they are ready for reading. Do not use this option if your protocol doesn’t start out with the client transmitting data, since in that case this option will sometimes cause the kernel to never tell you about the connection. Not all operating systems support this option: on ones that don’t, this option has no effect. New in Libevent 2.1.1-alpha. */

设置 listener 的回调函数

  1. typedef void (*evconnlistener_cb)(
  2. struct evconnlistener *listener,
  3. evutil_socket_t sock,
  4. struct sockaddr *addr,
  5. int len,
  6. void *ptr
  7. );
  8. int evconnlistener_enable(struct evconnlistener *lev);
  9. int evconnlistener_disable(struct evconnlistener *lev);
  10. void evconnlistener_set_cb(
  11. struct evconnlistener *lev,
  12. evconnlistener_cb cb, void *arg);

设置 listener 错误时的回调

  1. typedef void (*evconnlistener_errorcb)(struct evconnlistener *lis, void *ptr);
  2. void evconnlistener_set_error_cb(struct evconnlistener *lev,
  3. evconnlistener_errorcb errorcb);

获取 listener 管理的信息

  1. evutil_socket_t evconnlistener_get_fd(struct evconnlistener *lev);
  2. struct event_base *evconnlistener_get_base(struct evconnlistener *lev);

libevent 例子

echo 服务器(单线程)

gcc -levent -o echo-single-thread echo-single-thread.c

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <errno.h>
  5. #include <signal.h>
  6. #include <arpa/inet.h>
  7. #include <sys/types.h>
  8. #include <sys/socket.h>
  9. #include <event2/event.h>
  10. #include <event2/listener.h>
  11. #include <event2/bufferevent.h>
  12. #include <event2/buffer.h>
  13. #include <event2/util.h>
  14. /* define listen address */
  15. #define LISTEN_ADDR "0.0.0.0"
  16. #define LISTEN_PORT 8080
  17. /* define static variable */
  18. static struct event_base *base = NULL;
  19. static struct evconnlistener *listener = NULL;
  20. /* signal handler function */
  21. void signal_handler(int signum) {
  22. if (listener != NULL) evconnlistener_free(listener);
  23. if (base != NULL) event_base_free(base);
  24. exit(0);
  25. }
  26. /* callback function for accept */
  27. void accept_conn_cb(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *addr, int addrlen, void *arg);
  28. void accept_error_cb(struct evconnlistener *listener, void *arg);
  29. /* callback function for read/event */
  30. void read_conn_cb(struct bufferevent *bev, void *arg);
  31. void event_conn_cb(struct bufferevent *bev, short events, void *arg);
  32. int main() {
  33. /* register signal handler */
  34. signal(SIGHUP, signal_handler);
  35. signal(SIGINT, signal_handler);
  36. signal(SIGQUIT, signal_handler);
  37. signal(SIGTERM, signal_handler);
  38. /* setting listen address */
  39. struct sockaddr_in servaddr;
  40. memset(&servaddr, 0, sizeof(servaddr));
  41. servaddr.sin_family = AF_INET;
  42. servaddr.sin_addr.s_addr = inet_addr(LISTEN_ADDR);
  43. servaddr.sin_port = htons(LISTEN_PORT);
  44. /* create evconnlistener */
  45. base = event_base_new();
  46. listener = evconnlistener_new_bind(
  47. base, accept_conn_cb, NULL,
  48. LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
  49. -1, (struct sockaddr *)&servaddr, sizeof(servaddr)
  50. );
  51. if (listener == NULL) {
  52. perror("listen socket");
  53. return errno;
  54. }
  55. evconnlistener_set_error_cb(listener, accept_error_cb);
  56. /* start event loop */
  57. printf("listening: %s:%d\n", LISTEN_ADDR, LISTEN_PORT);
  58. event_base_dispatch(base);
  59. /* free listener, event_base */
  60. evconnlistener_free(listener);
  61. event_base_free(base);
  62. return 0;
  63. }
  64. /* implement accept_conn_cb function */
  65. void accept_conn_cb(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *addr, int addrlen, void *arg) {
  66. /* welcome new connection */
  67. struct sockaddr_in *connaddr = (struct sockaddr_in *)addr; // client sockaddr
  68. printf("welcome: %s:%d\n", inet_ntoa(connaddr->sin_addr), ntohs(connaddr->sin_port));
  69. /* setting bufferevent for client socket */
  70. struct bufferevent *bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);
  71. bufferevent_setcb(bev, read_conn_cb, NULL, event_conn_cb, NULL);
  72. bufferevent_enable(bev, EV_READ | EV_WRITE);
  73. }
  74. /* implement accept_error_cb function */
  75. void accept_error_cb(struct evconnlistener *listener, void *arg) {
  76. char *error_string = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
  77. fprintf(stderr, "accept socket: %s\n", error_string);
  78. event_base_loopexit(base, NULL);
  79. }
  80. /* implement read_conn_cb function */
  81. void read_conn_cb(struct bufferevent *bev, void *arg) {
  82. /* get input/output evbuffers */
  83. struct evbuffer *input = bufferevent_get_input(bev);
  84. struct evbuffer *output = bufferevent_get_output(bev);
  85. /* get client socket address */
  86. struct sockaddr_in connaddr; socklen_t addrlen = sizeof(connaddr);
  87. getpeername(bufferevent_getfd(bev), (struct sockaddr *)&connaddr, &addrlen);
  88. /* print data and send to client */
  89. int len = evbuffer_get_length(input);
  90. char *data = (char *)malloc(len + 1);
  91. evbuffer_copyout(input, data, len);
  92. data[len] = 0;
  93. printf("recv(%s:%d): %s", inet_ntoa(connaddr.sin_addr), ntohs(connaddr.sin_port), data);
  94. evbuffer_add_buffer(output, input);
  95. free(data);
  96. }
  97. /* implement event_conn_cb function */
  98. void event_conn_cb(struct bufferevent *bev, short events, void *arg) {
  99. if (events & BEV_EVENT_ERROR) // occur socket error
  100. perror("recv socket");
  101. if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { // close client socket
  102. struct sockaddr_in connaddr; socklen_t addrlen = sizeof(connaddr);
  103. getpeername(bufferevent_getfd(bev), (struct sockaddr *)&connaddr, &addrlen);
  104. printf("goodbye: %s:%d\n", inet_ntoa(connaddr.sin_addr), ntohs(connaddr.sin_port));
  105. bufferevent_free(bev);
  106. }
  107. }

echo 服务器(多线程)

注意:实际上这个程序有问题,event_base 默认不是线程安全的。
gcc -levent -lpthread -o echo-multi-thread echo-multi-thread.c

  1. #define _GNU_SOURCE
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <errno.h>
  6. #include <signal.h>
  7. #include <unistd.h>
  8. #include <pthread.h>
  9. #include <arpa/inet.h>
  10. #include <sys/types.h>
  11. #include <sys/socket.h>
  12. #include <sys/syscall.h>
  13. #include <event2/event.h>
  14. #include <event2/listener.h>
  15. #include <event2/bufferevent.h>
  16. #include <event2/buffer.h>
  17. #include <event2/util.h>
  18. /* macro for get thread id */
  19. #define gettid() syscall(__NR_gettid)
  20. /* define listen address */
  21. #define LISTEN_ADDR "0.0.0.0"
  22. #define LISTEN_PORT 8080
  23. // number of new connection
  24. static size_t accept_conn_count = 0;
  25. // event_base for master thread
  26. static struct event_base *base_0 = NULL;
  27. // event_base for worker thread - 1
  28. static struct event_base *base_1 = NULL;
  29. // event_base for worker thread - 2
  30. static struct event_base *base_2 = NULL;
  31. // event_base for worker thread - 3
  32. static struct event_base *base_3 = NULL;
  33. // event_base for worker thread - 4
  34. static struct event_base *base_4 = NULL;
  35. // evconnlistener static variable */
  36. static struct evconnlistener *listener = NULL;
  37. /* define signal handler function */
  38. void signal_handler(int signum) {
  39. if (listener != NULL) evconnlistener_free(listener);
  40. if (base_0 != NULL) event_base_free(base_0);
  41. if (base_1 != NULL) event_base_free(base_1);
  42. if (base_2 != NULL) event_base_free(base_2);
  43. if (base_3 != NULL) event_base_free(base_3);
  44. if (base_4 != NULL) event_base_free(base_4);
  45. exit(0);
  46. }
  47. /* define worker thread id */
  48. static pthread_t worker_tid_1;
  49. static pthread_t worker_tid_2;
  50. static pthread_t worker_tid_3;
  51. static pthread_t worker_tid_4;
  52. /* worker thread function */
  53. void *worker_thread_func(void *arg) {
  54. // event_base assigned from the main function
  55. struct event_base *base = (struct event_base *)arg;
  56. event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY);
  57. event_base_free(base);
  58. return NULL;
  59. }
  60. /* callback function for accept */
  61. void accept_conn_cb(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *addr, int addrlen, void *arg);
  62. void accept_error_cb(struct evconnlistener *listener, void *arg);
  63. /* callback function for read/event */
  64. void read_conn_cb(struct bufferevent *bev, void *arg);
  65. void event_conn_cb(struct bufferevent *bev, short events, void *arg);
  66. /* master thread function */
  67. int main() {
  68. /* register signal handler */
  69. signal(SIGHUP, signal_handler);
  70. signal(SIGINT, signal_handler);
  71. signal(SIGQUIT, signal_handler);
  72. signal(SIGTERM, signal_handler);
  73. /* create event base struct */
  74. base_0 = event_base_new(); // for master thread
  75. base_1 = event_base_new(); // for worker thread 1
  76. base_2 = event_base_new(); // for worker thread 2
  77. base_3 = event_base_new(); // for worker thread 3
  78. base_4 = event_base_new(); // for worker thread 4
  79. /* create and start worker threads */
  80. // worker thread 1
  81. if (pthread_create(&worker_tid_1, NULL, worker_thread_func, base_1) != 0) {
  82. perror("create thread");
  83. return errno;
  84. }
  85. // worker thread 2
  86. if (pthread_create(&worker_tid_2, NULL, worker_thread_func, base_2) != 0) {
  87. perror("create thread");
  88. return errno;
  89. }
  90. // worker thread 3
  91. if (pthread_create(&worker_tid_3, NULL, worker_thread_func, base_3) != 0) {
  92. perror("create thread");
  93. return errno;
  94. }
  95. // worker thread 4
  96. if (pthread_create(&worker_tid_4, NULL, worker_thread_func, base_4) != 0) {
  97. perror("create thread");
  98. return errno;
  99. }
  100. /* setting listen sockaddr */
  101. struct sockaddr_in servaddr;
  102. memset(&servaddr, 0, sizeof(servaddr));
  103. servaddr.sin_family = AF_INET;
  104. servaddr.sin_addr.s_addr = inet_addr(LISTEN_ADDR);
  105. servaddr.sin_port = htons(LISTEN_PORT);
  106. /* create evconnlistener */
  107. listener = evconnlistener_new_bind(
  108. base_0, accept_conn_cb, NULL,
  109. LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
  110. -1, (struct sockaddr *)&servaddr, sizeof(servaddr)
  111. );
  112. if (listener == NULL) {
  113. perror("listen socket");
  114. return errno;
  115. }
  116. evconnlistener_set_error_cb(listener, accept_error_cb);
  117. /* start event loop (master) */
  118. printf("listening: %s:%d\n", LISTEN_ADDR, LISTEN_PORT);
  119. event_base_dispatch(base_0); // blocking here
  120. /* cancel worker threads when master thread's event loop exited */
  121. pthread_cancel(worker_tid_1);
  122. pthread_cancel(worker_tid_2);
  123. pthread_cancel(worker_tid_3);
  124. pthread_cancel(worker_tid_4);
  125. /* free listener, event_base */
  126. evconnlistener_free(listener);
  127. event_base_free(base_0);
  128. event_base_free(base_1);
  129. event_base_free(base_2);
  130. event_base_free(base_3);
  131. event_base_free(base_4);
  132. return 0;
  133. }
  134. /* implement accept_conn_cb function */
  135. void accept_conn_cb(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *addr, int addrlen, void *arg) {
  136. /* welcome new connection */
  137. struct sockaddr_in *connaddr = (struct sockaddr_in *)addr;
  138. printf("welcome: %s:%d\n", inet_ntoa(connaddr->sin_addr), ntohs(connaddr->sin_port));
  139. /* averagely distribute new connection to each worker thread */
  140. accept_conn_count++;
  141. struct event_base *base = NULL;
  142. if (accept_conn_count % 4 == 1) base = base_1;
  143. else if (accept_conn_count % 4 == 2) base = base_2;
  144. else if (accept_conn_count % 4 == 3) base = base_3;
  145. else if (accept_conn_count % 4 == 0) base = base_4;
  146. /* setting bufferevent for new connection */
  147. struct bufferevent *bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);
  148. bufferevent_setcb(bev, read_conn_cb, NULL, event_conn_cb, NULL);
  149. bufferevent_enable(bev, EV_READ | EV_WRITE);
  150. }
  151. /* implement accept_error_cb function */
  152. void accept_error_cb(struct evconnlistener *listener, void *arg) {
  153. char *error_string = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
  154. fprintf(stderr, "accept socket: %s\n", error_string);
  155. event_base_loopexit(base_0, NULL);
  156. }
  157. /* implement read_conn_cb function */
  158. void read_conn_cb(struct bufferevent *bev, void *arg) {
  159. /* get input/output evbuffers */
  160. struct evbuffer *input = bufferevent_get_input(bev);
  161. struct evbuffer *output = bufferevent_get_output(bev);
  162. /* get client socket address */
  163. struct sockaddr_in connaddr; socklen_t addrlen = sizeof(struct sockaddr_in);
  164. getpeername(bufferevent_getfd(bev), (struct sockaddr *)&connaddr, &addrlen);
  165. /* print data and send to client */
  166. int len = evbuffer_get_length(input);
  167. char *str = (char *)malloc(len + 1);
  168. evbuffer_copyout(input, str, len);
  169. str[len] = 0;
  170. printf("recv(tid: %ld | addr: %s:%d): %s", gettid(), inet_ntoa(connaddr.sin_addr), ntohs(connaddr.sin_port), str);
  171. evbuffer_add_buffer(output, input);
  172. free(str);
  173. }
  174. /* implement event_conn_cb function */
  175. void event_conn_cb(struct bufferevent *bev, short events, void *arg) {
  176. if (events & BEV_EVENT_ERROR)
  177. perror("recv socket");
  178. if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
  179. struct sockaddr_in connaddr; socklen_t addrlen = sizeof(struct sockaddr_in);
  180. getpeername(bufferevent_getfd(bev), (struct sockaddr *)&connaddr, &addrlen);
  181. printf("goodbye: %s:%d\n", inet_ntoa(connaddr.sin_addr), ntohs(connaddr.sin_port));
  182. bufferevent_free(bev);
  183. }
  184. }

echo 服务器(多线程)

gcc -lpthread -levent -levent_pthreads -o echo-multi-thread echo-multi-thread.c

  1. #define _GNU_SOURCE
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <errno.h>
  6. #include <signal.h>
  7. #include <unistd.h>
  8. #include <pthread.h>
  9. #include <arpa/inet.h>
  10. #include <sys/types.h>
  11. #include <sys/socket.h>
  12. #include <sys/syscall.h>
  13. #include <event2/event.h>
  14. #include <event2/listener.h>
  15. #include <event2/bufferevent.h>
  16. #include <event2/buffer.h>
  17. #include <event2/util.h>
  18. #include <event2/thread.h>
  19. /* macro for get thread id */
  20. #define gettid() syscall(__NR_gettid)
  21. /* define listen address */
  22. #define LISTEN_ADDR "0.0.0.0"
  23. #define LISTEN_PORT 8080
  24. // number of new connection
  25. static size_t accept_conn_count = 0;
  26. // event_base for master thread
  27. static struct event_base *base_0 = NULL;
  28. // event_base for worker thread - 1
  29. static struct event_base *base_1 = NULL;
  30. // event_base for worker thread - 2
  31. static struct event_base *base_2 = NULL;
  32. // event_base for worker thread - 3
  33. static struct event_base *base_3 = NULL;
  34. // event_base for worker thread - 4
  35. static struct event_base *base_4 = NULL;
  36. // evconnlistener static variable */
  37. static struct evconnlistener *listener = NULL;
  38. /* define signal handler function */
  39. void signal_handler(int signum) {
  40. if (listener != NULL) evconnlistener_free(listener);
  41. if (base_0 != NULL) event_base_free(base_0);
  42. if (base_1 != NULL) event_base_free(base_1);
  43. if (base_2 != NULL) event_base_free(base_2);
  44. if (base_3 != NULL) event_base_free(base_3);
  45. if (base_4 != NULL) event_base_free(base_4);
  46. exit(0);
  47. }
  48. /* define worker thread id */
  49. static pthread_t worker_tid_1;
  50. static pthread_t worker_tid_2;
  51. static pthread_t worker_tid_3;
  52. static pthread_t worker_tid_4;
  53. /* worker thread function */
  54. void *worker_thread_func(void *arg) {
  55. // event_base assigned from the main function
  56. struct event_base *base = (struct event_base *)arg;
  57. event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY);
  58. event_base_free(base);
  59. return NULL;
  60. }
  61. /* callback function for accept */
  62. void accept_conn_cb(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *addr, int addrlen, void *arg);
  63. void accept_error_cb(struct evconnlistener *listener, void *arg);
  64. /* callback function for read/event */
  65. void read_conn_cb(struct bufferevent *bev, void *arg);
  66. void event_conn_cb(struct bufferevent *bev, short events, void *arg);
  67. /* master thread function */
  68. int main() {
  69. /* register signal handler */
  70. signal(SIGHUP, signal_handler);
  71. signal(SIGINT, signal_handler);
  72. signal(SIGQUIT, signal_handler);
  73. signal(SIGTERM, signal_handler);
  74. /* create event base for master */
  75. base_0 = event_base_new(); // for master thread
  76. /* create event base for worker */
  77. // evthread_use_pthreads() 作用(此函数需要链接到的库:`-levent_pthreads`):
  78. // 如果在多个线程中访问 event_base,则必须调用此函数加锁,否则不是线程安全的!
  79. evthread_use_pthreads();
  80. base_1 = event_base_new(); // for worker thread 1
  81. base_2 = event_base_new(); // for worker thread 2
  82. base_3 = event_base_new(); // for worker thread 3
  83. base_4 = event_base_new(); // for worker thread 4
  84. /* create and start worker threads */
  85. // worker thread 1
  86. if (pthread_create(&worker_tid_1, NULL, worker_thread_func, base_1) != 0) {
  87. perror("create thread");
  88. return errno;
  89. }
  90. // worker thread 2
  91. if (pthread_create(&worker_tid_2, NULL, worker_thread_func, base_2) != 0) {
  92. perror("create thread");
  93. return errno;
  94. }
  95. // worker thread 3
  96. if (pthread_create(&worker_tid_3, NULL, worker_thread_func, base_3) != 0) {
  97. perror("create thread");
  98. return errno;
  99. }
  100. // worker thread 4
  101. if (pthread_create(&worker_tid_4, NULL, worker_thread_func, base_4) != 0) {
  102. perror("create thread");
  103. return errno;
  104. }
  105. /* setting listen sockaddr */
  106. struct sockaddr_in servaddr;
  107. memset(&servaddr, 0, sizeof(servaddr));
  108. servaddr.sin_family = AF_INET;
  109. servaddr.sin_addr.s_addr = inet_addr(LISTEN_ADDR);
  110. servaddr.sin_port = htons(LISTEN_PORT);
  111. /* create evconnlistener */
  112. listener = evconnlistener_new_bind(
  113. base_0, accept_conn_cb, NULL,
  114. LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
  115. -1, (struct sockaddr *)&servaddr, sizeof(servaddr)
  116. );
  117. if (listener == NULL) {
  118. perror("listen socket");
  119. return errno;
  120. }
  121. evconnlistener_set_error_cb(listener, accept_error_cb);
  122. /* start event loop (master) */
  123. printf("listening: %s:%d\n", LISTEN_ADDR, LISTEN_PORT);
  124. event_base_dispatch(base_0); // blocking here
  125. /* cancel worker threads when master thread's event loop exited */
  126. pthread_cancel(worker_tid_1);
  127. pthread_cancel(worker_tid_2);
  128. pthread_cancel(worker_tid_3);
  129. pthread_cancel(worker_tid_4);
  130. /* free listener, event_base */
  131. evconnlistener_free(listener);
  132. event_base_free(base_0);
  133. event_base_free(base_1);
  134. event_base_free(base_2);
  135. event_base_free(base_3);
  136. event_base_free(base_4);
  137. return 0;
  138. }
  139. /* implement accept_conn_cb function */
  140. void accept_conn_cb(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *addr, int addrlen, void *arg) {
  141. /* welcome new connection */
  142. struct sockaddr_in *connaddr = (struct sockaddr_in *)addr;
  143. printf("welcome: %s:%d\n", inet_ntoa(connaddr->sin_addr), ntohs(connaddr->sin_port));
  144. /* averagely distribute new connection to each worker thread */
  145. accept_conn_count++;
  146. struct event_base *base = NULL;
  147. if (accept_conn_count % 4 == 1) base = base_1;
  148. else if (accept_conn_count % 4 == 2) base = base_2;
  149. else if (accept_conn_count % 4 == 3) base = base_3;
  150. else if (accept_conn_count % 4 == 0) base = base_4;
  151. /* setting bufferevent for new connection */
  152. struct bufferevent *bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);
  153. bufferevent_setcb(bev, read_conn_cb, NULL, event_conn_cb, NULL);
  154. bufferevent_enable(bev, EV_READ | EV_WRITE);
  155. }
  156. /* implement accept_error_cb function */
  157. void accept_error_cb(struct evconnlistener *listener, void *arg) {
  158. char *error_string = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
  159. fprintf(stderr, "accept socket: %s\n", error_string);
  160. event_base_loopexit(base_0, NULL);
  161. }
  162. /* implement read_conn_cb function */
  163. void read_conn_cb(struct bufferevent *bev, void *arg) {
  164. /* get input/output evbuffers */
  165. struct evbuffer *input = bufferevent_get_input(bev);
  166. struct evbuffer *output = bufferevent_get_output(bev);
  167. /* get client socket address */
  168. struct sockaddr_in connaddr; socklen_t addrlen = sizeof(struct sockaddr_in);
  169. getpeername(bufferevent_getfd(bev), (struct sockaddr *)&connaddr, &addrlen);
  170. /* print data and send to client */
  171. int len = evbuffer_get_length(input);
  172. char *str = (char *)malloc(len + 1);
  173. evbuffer_copyout(input, str, len);
  174. str[len] = 0;
  175. printf("recv(tid: %ld | addr: %s:%d): %s", gettid(), inet_ntoa(connaddr.sin_addr), ntohs(connaddr.sin_port), str);
  176. evbuffer_add_buffer(output, input);
  177. free(str);
  178. }
  179. /* implement event_conn_cb function */
  180. void event_conn_cb(struct bufferevent *bev, short events, void *arg) {
  181. if (events & BEV_EVENT_ERROR)
  182. perror("recv socket");
  183. if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
  184. struct sockaddr_in connaddr; socklen_t addrlen = sizeof(struct sockaddr_in);
  185. getpeername(bufferevent_getfd(bev), (struct sockaddr *)&connaddr, &addrlen);
  186. printf("goodbye: %s:%d\n", inet_ntoa(connaddr.sin_addr), ntohs(connaddr.sin_port));
  187. bufferevent_free(bev);
  188. }
  189. }

libevent 其它说明

  1. 在 libevent 运行完成之后(也就是程序退出之前),可以调用 void libevent_global_shutdown() 方法,来释放 libevent 申请的所有全局数据结构。注意,此函数是幂等的,即无论是调用一次还是调用多次,都是一样的结果,不会导致问题。还有,注意此函数必须位于所有 libevent 函数之后调用,否则其他 libevent 函数是无法感知到的,可能导致意想不到的错误。

  2. 创建 event_base 时不要使用默认的 event_base_new() 函数了,因为默认情况下 event_base 是有锁的,而我现在的应用场景完全不需要锁,所以需要使用 config 来定制 event_base。如下:

  1. struct event_config *event_config_new();
  2. void event_config_free(struct event_config *cfg);
  3. enum event_base_config_flag {
  4. EVENT_BASE_FLAG_NOLOCK = 0x01, // 建议
  5. EVENT_BASE_FLAG_IGNORE_ENV = 0x02, // 建议
  6. EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST = 0x10,
  7. };
  8. int event_config_set_flag(
  9. struct event_config *cfg,
  10. enum event_base_config_flag flag
  11. ); // 成功返回 0,失败返回 -1
  12. struct event_base *event_base_new_with_config(struct event_config *cfg);
  13. void event_base_free(struct event_base * base); // 注意它不会释放相关的 event
  1. 关于 event_base_loop。默认情况下,它会阻塞当前线程,直到处理完该 base 上所有注册的 event 为止(也就是说如果此时 base 上没有任何已添加的 event,那么该函数就会返回)。强调一点,所谓注册事件也就是 event_add()。默认情况下,struct event 是临时事件,也就是说,在 add 之后,当事件触发时,它只会运行一次,事件触发后,该 event 会自动被取消注册,即 event_del,但是不要忘记了 event_free() 啊,否则就内存泄漏了!!!当然,libevent 允许你创建持久性事件,所谓持久事件就说除非你显式 event_del 取消注册,否则不会自动 event_del。

  2. 当然,如果 event_base_loop 前,这个 base 里面没有任何已注册的 event,那么该调用会直接返回,不会阻塞。因为没事做啊,如果想让他不返回,也就是说我稍后会注册事件给你,则可以传递 EVLOOP_NO_EXIT_ON_EMPTY 这个 flag,告诉他即使没有 event 可循环,也不要返回!

  3. 事件的生命周期:新建状态(event_new)、挂起状态(event_add)、活动状态(事件触发时)

    • 如果是临时事件:则事件触发后会变成为新建状态,即 活动状态 -> 新建状态。默认
    • 如果是持久事件:则事件触发后继续保持挂起状态,即 活动状态 -> 挂起状态。循环
    • 你可以在 event 的挂起、活动状态下调用 event_del、event_free,它们是安全的
    • event_new、event_add、event_del、event_free,记住它们的对应关系,不要搞混了
    • 常用的 EVENT 事件类型:EV_READEV_TIMEOUTEV_PERSIST,多个类型使用 | 连接
  4. 在创建 event 时,如果需要将当前 event 指针传给回调函数,可以使用 void *event_self_cbarg() 方法,它返回的指针其实就是待会 event_new() 返回的指针。具体的原理我猜测是这样的:当你调用 event_self_cbarg 时,event 内部会 malloc 一个 event 的指针,然后返回给调用者。紧接着,当调用 event_new 函数时,event 内部检测到我们调用了 event_self_cbarg 函数,于是不再分配新的内存,而是直接将创建的 event 数据结构写入到这个指针当中,然后返回。于是就达到了传递 event 指针的目的。注意中间不要进行其他操作,否则可能出现不可预知的问题。。猜测错误,经过实际测试得知,event_self_cbarg 返回的是一个固定的地址,也就是某个静态指针,具体的原理只能通过查看源码得知了。(暂时留个坑吧,主要是怕这个函数不是线程安全的)。我猜错了,原来它的工作原理是这么简单粗暴,event_self_cbarg 返回的应该是个 flag 指针,它存储的内容没有实际意义,他只不过是用来标识 event_new 时,如果传入的 arg 的值与他相等,则表示它想接受 this 指针,实际上传递给 cbfunc 的不是传入的这个指针,而是内部会重新传入正确的指针给它。也就是说 event_self_cbarg() 函数的用途仅限:struct event *ev = event_new(base, sock, events, cbfunc, event_self_cbarg()),其他方式(比如尝试将这个指针放在结构体中,是不现实的),但是我们可以这样做:

  1. struct cbarg *arg = malloc(sizeof(struct cbarg));
  2. struct event *ev = event_new(base, sock, events, cbfunc, cbarg);
  3. cbarg -> event_ptr = ev;
  4. event_add(base, NULL);
  5. event_base_disptch(base);

因为我还没运行 event,所以此时完全来得及将 ev 指针传递给结构体,待会直接用就行了。

  1. evutil_socket_t 在 unix 中其实就是 int 类型。如果发生套接字 IO 错误,直接使用 errnostrerror(errno) 即可,不需要使用所谓跨平台宏,没必要。注意,errno 是线程安全的(thread local 存储),但是 strerror() 返回的是静态指针,不是线程安全的,需使用 strerror_r() 线程安全版本(和 time 的那个函数类似,貌似后缀 _r 的都是线程安全的,即自己提供 buffer,为了线程安全,最好的方式就是,这个 buffer 是栈上的变量,如字符数组)。
  1. int evutil_make_socket_nonblocking(int sock); // 设置为非阻塞套接字
  2. int evutil_make_listen_socket_reuseable(int sock); // 设置 SO_REUSEADDR
  3. int evutil_sockaddr_cmp( // 比较两个 sockaddr 地址,ipv4/ipv6
  4. struct sockaddr *sa1, // 地址 A
  5. struct sockaddr *sa2, // 地址 B
  6. int include_port // 是否包含端口 1/0 aka true/false
  7. ); // 相等则返回 0,和 strcmp 是一样的返回值
  1. 分配内存时建议使用 calloc,而不是 malloc() + memset(),前者更快!除非你不担心垃圾值。

  2. 每个 bufferevent 都有 4 个水位线,读水位线 2 个,写水位线 2 个,我们主要关心读水位线,因为写的话只要负责 write 就行,libevent 会自动发送出去,通常情况下 write 都不会失败。读水位线有两个,一高一低。先说低水位线,只有当 bufferevent 的 input 缓冲区的数据大小超过(或达到)低水位线时,设置的 read 回调函数才会被调用,读低水位线默认为 0,即只要有数据可读就会调用读回调,一般我们不需要改动这个,默认就行。我们主要关心一下读高水位线,高水位线是说,如果 bufferevent 的 input 缓冲区的数据大小达到此值时,bufferevent 会停止从 socket 中读取数据(TCP 滑动窗口变小),直到 read 回调处理了其中的数据,让数据大小低于高水位线后才会继续从 socket 中读取。这个高水位线默认是无限制的,这可能会导致一些问题,比如代理软件,他要管理两个连接,一个是与 client 的,一个是与 server 的,如果 client 这边的网速很慢,而 server 这边的网速很快,那么 server 这边的 bufferevent 的 input 缓冲区将会很快膨胀,即内存占用很大,直到这些数据都转发给了 client 那边。这时我们必须设置一个高水位线,防止 server 这边的 input 缓冲区无限膨胀,导致内存吃得过多,而被系统 kill 掉。比如最大设置为 3~5M 就行了。(但仔细想想好像没这么简单,稍后再说吧)

  3. bufferevent 的回调函数中会接收到 events 参数,我们可以通过 BEV_EVENT_READING 来判断是不是读数据期间发生了事件,同样的,可以通过 BEV_EVENT_WRITING 来判断是不是写数据期间发生了事件。

  4. 再次重申,libevent 中的所有 socket 套接字都必须设置为 non-blocking 的,以免出现问题。

  5. bufferevent 的 add 事件是通过 bufferevent_enable 启用的,对应的是 disable。事件只有两个,EV_READ、EV_WRITE。如果没有启用对应事件,bev 将不会尝试读取或写入数据。默认情况下,新创建的 bev 已经启用了 EV_WRITE 事件,你也可以通过 disable 来取消注册 EV_WRITE 事件。有必要再声明几点:启用 WRITE 事件是指,当 output 缓冲区中有数据时,libevent 会自动调用 send 将数据发送出去(写到 socket 缓冲区);而 READ 事件是指:当 socket 缓冲区中有数据时,libevent 会自动从 socket 缓冲区将数据剪切到 bufferevent 的 input 缓冲区,供 read 回调读取。

  6. 设置水位线的函数:

  1. void bufferevent_setwatermark(
  2. struct bufferevent *bufev,
  3. short events, // EV_READ 或 EV_WRITE 或 EV_READ | EV_WRITE
  4. size_t lowmark, // 低水位线
  5. size_t highmark // 高水位线(0 表示无限制)
  6. );

网络编程个人总结

OSI 七层网络模型 从下到上分别为:

  1. 物理层:光纤、以太网、同轴电缆 等
  2. 链路层:以太网、WiFi、PPPoE、GPRS 等
  3. 网络层:IPv4、IPv6、ICMP、BGP、OSPF 等
  4. 传输层:TCP、UDP、DCCP、SCTP、PPTP、RSVP 等
  5. 会话层:已弃用,应用层可替代
  6. 表示层:已弃用,应用层可替代
  7. 应用层:HTTP、FTP、DNS、NTP、DHCP、SMTP、POP3、SSH 等

其实我们现在应该将其称为 OSI 五层网络模型,因为会话层和表示层没有存在的意义和必要。作为网络开发者,需要了解的是 网络层(IP)、传输层(TCP、UDP)、应用层(HTTP、FTP 等)。但是日常使用中,接触的最多的只是 应用层,网络层用不着你操心,传输层有 TCP、UDP 两个就够了。而应用层说到底还是数据格式而已,HTTP、FTP、DNS 还是其他的什么协议,其实都是规定的数据格式而已。我们完全可以自己创建一个应用层协议,格式随便定义。

TCP 和 UDP 的区别

一句话总结:TCP 就是打电话,UDP 则是发短信

先说 UDP 吧,发短信我们知道,一条短信有 2 个元信息:发信人收信人。而数据就是我们所发的短信内容了。发短信是没有所谓的状态的(或者叫做连接吧,更好理解一点),我只管发送出去,而对方是否收到了,是否查看了短信,我们是无从得知的。UDP 也是如此,一个 UDP 包就像一条短信,它也有两个元信息:发送方 IP:Port接收方 IP:Port,然后就是里面的数据。发送方将一个 UDP 包发送出去后,就不会再管它了,对方接收到了也好,没接收到也好,都不关发送方的事。而接收方在接收到一个 UDP 包之后,可以从中读取到发送者的 IP:Port(也就是 recvfrom 后面的两个参数),然后接下来处理其中的数据即可(相当于短信接收方读取短信内容一样)。有必要强调一点,如果 UDP 包在发送的过程中(网络传输过程中)丢失了(如 IP 层分片,某个分片不见了,导致 UDP 包是坏的),那么接收方是不会收到这个 UDP 包的,所以接收方收到的 UDP 包一定是完整的 UDP 包。而且 UDP 包也没有所谓的粘包问题,因为你收到的只能是一个完整的 UDP 包,其他的丢失的、坏的包你是收不到的(底层网络栈会收到这些坏包,但是它不会交由应用程序处理)。总之,对 UDP 编程有疑问的时候,多联想一下发送短信、接收短信的过程就清楚了。因为这个特点,UDP 只适用于一应一答的场景,如 DNS,客户端发送一个 DNS 请求,服务器收到之后,发送一个 DNS 响应,这个任务就算完成了。总之和发短信一样。

再说 TCP,TCP 就是打电话,所以每次与对方交流之前,必须先拨通对方的电话,这个拨号的过程叫做 TCP 握手,拨号成功后,双方都进入已连接状态,也就是电话打通了,可以交流了(双向交流,即你可以发送数据,他也可以发送数据),如果聊完了,就需要挂掉电话,这个挂电话也就是 TCP 挥手。所以 TCP 是流式的传输、面向连接的、可靠的传输层协议。UDP 则是用户数据报协议,"数据报"="短信"。TCP 有所谓的粘包问题,也就是如何分清界线,比如我让对方发送一个文件给我,我该如何知道这个文件是发完了还是没发完,这就是应用层协议自己该考虑的问题了,比如 HTTP 协议使用 Content-Length 头部来解决这个问题,这个头部用来指示携带的数据有多少字节,这样接收方就能判断是否收完了数据。

最后聊一下 REDIRECT 和 TPROXY 两种不同的透明代理方式,REDIRECT 只适用于 TCP,而 TPROXY 支持 TCP 和 UDP。REDIRECT 其实就是 DNAT 的一种,只不过它的目的地址始终是 127.0.0.1 而已。

REDIRECT 实现 TCP 透明代理很简单,proxy 在收到新连接之后,调用 getsockopt 就可以获取这个连接的原目的地址和端口信息了,然后 proxy 再与目的地址建立连接,之后无脑转发数据就行,简单。

而 TPROXY 则稍微麻烦一点,首先 TPROXY 在必要的前提下会改变数据包的 dst_port。proxy 监听一个端口,假设为 60053,并且假设为 UDP 端口。proxy 的监听地址为 0.0.0.0(任意 IP 地址),所以,我们只需要设置好路由(local 类型的路由),将这些需要代理的 UDP 包路由到 proxy 上即可。比如客户端往 8.8.8.8:53/udp 发送 DNS 请求,那么在经过网关时,TPROXY 规则会将这个 UDP 包的目的端口改为 60053(原端口会写入到 socket msg 之中,稍后通过 recvmsg 可读取到),然后进过路由,被那条 local 路由命中,于是发往本机的 proxy 进程,proxy 收到后(调用 recvmsg 而不是 recvfrom,因为需要读取原目的地址和目的端口信息),获取到目的地址和目的端口,然后创建一个新的 UDP socket,发送给目的服务器。当这个 UDP socket 接收到服务器返回的响应之后,proxy 又创建过一个新的 UDP socket 用于响应客户端,这个 UDP socket 绑定的 IP 和 Port 就是之前读取到的 dst_addr、dst_port(需要给 socket 设置特殊套接字选项才能绑定成功),然后使用这个 socket 发送响应回客户端即可。在客户端开来,这的确是我刚才请求的 dst_addr:dst_port 返回的数据,所以没有问题,也即透明代理。对于 TPROXY 如何代理 TCP,我目前不太清楚 proxy 如何写。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注