[关闭]
@lishuhuakai 2016-11-04T08:46:28.000000Z 字数 10786 阅读 1636

一起来写web server 06 -- 单线程非阻塞IO版本


阻塞IO的效率是在是低下,你如果要写高性能的web server的话,你必须使用非阻塞IO.

非阻塞IO的读写

在谈到非阻塞IO之前,必须先谈一谈阻塞IO,在网络编程中,我们假设有一个监听套接字的sockfd,这个sockfd是你调用了socket,listen, bind这一大票的函数得到的一个文件描述符.其实它默认就是阻塞的,具体的表现是:

  1. 使用accept函数监听sockfd时,如果没有连接到来,这个函数会一直阻塞在那里.

  2. sockfd调用recv函数的时候,如果对方还没有发送数据过来,这个函数也会一直阻塞.

  3. sockfd执行write操作的时候,如果tcp缓冲区已经满了,那么write函数也会阻塞在那里,一直到数据写完了才返回.

  4. ......

非阻塞的IO确实没有什么不好的,编程简单,逻辑清晰,但是在今天的话,有一个致命的缺点,那就是资源的利用率不高,具体来说,就是通过阻塞IO编写的服务端程序只有很少的时间在工作,大部分时间都处在阻塞状态.

为了提高程序的性能,我们应该充分利用线程阻塞的那段时间,毕竟那么长的时间里,我们可以干很多的事情,所以聪明的程序员弄出了个非阻塞IO的玩意,具体而言,就是一旦对一个文件描述符设置了非阻塞,比如说前面的sockfd,我们调用accept, write, recv等函数的时候,如果碰到数据没到,缓冲区已满这类事情,不会像之前那样阻塞在那里,这些函数会立即返回,并且设置一些标志让调用者知道.

单纯的非阻塞IO其实并没有多么大的用处,假如你要写向sockfd写入数据,如果sockfd是非阻塞的话,你大概会这样编程:

  1. while (true) {
  2. ret = write(sockfd, buf, sizeof(buf));
  3. if 数据写完了
  4. break;
  5. if 资源不可用 or 还有剩余数据没写完
  6. continue;
  7. }

如果我们用上面的方法来编写代码的话,还不如用直接用阻塞IO呢,轮询可比阻塞傻逼多了.

非阻塞IO加上epoll, select, poll这些IO多路复用机制,我们才可以高效地利用以前被阻塞的时光.高效确实非常高效,不过我想说的一点是 --

非阻塞IO加上这些IO复用机制,会使得代码的复杂度急剧上升.如果你用这些机制来编写网络程序的话,更加要小心,因为这些东西调试起来并不是那么方便,代码很可能死在一个微小而不易察觉的点上.

我顺带扯一下这些东西复杂在哪里:

  1. 首先,由于监听套接字的文件描述符listenfd变得非阻塞,所以你要监听这个描述符上的可读事件,当然,这很简单.

  2. 如果连接到来了,新建连接,你要设置新来的连接的文件描述符的监听,非阻塞IO就是这么个事情,拉弓没有回头箭,一旦用了,几乎所有的套接字的描述符都要设置为非阻塞.这也不难.

  3. 一旦对方发送了数据,你要读取,像在epoll里面,为了提升效率,我们一般都会设置ET模式,这意味着你要一次性读完fd上的数据.不能说你想读多少就读多少,边读边处理,这样的话,你或许不得不用一个buf来缓存读到的数据,并且记录你已经处理了的数据的数目.

  4. 系统的tcp缓冲区很容易就填满了,因为你的IO是非阻塞的,所以一旦发生了缓冲区满这种事情,你不大可能等待到缓冲区可用,所以你也要监听这个套接字的可写事件,并且为了保存数据,你要自己弄一个buf,要记录下来已经写了多少,还有多少数据要写,下次写的时候可以从上次中断的地方开始.

  5. 如果epoll,非阻塞IO碰上了多线程,复杂度还会上升.因为我们还必须处理一些竞争条件.

非阻塞IO的核心思想是避免阻塞在readwrite或其他IO系统调用上,这样可以最大限度地复用thread-of-control,让一个线程能够服务于多个socket连接,IO线程只能阻塞在IO multiplexing函数上,如select/poll/epoll_wait.这样一来,应用层的缓冲是必需的,每个TCP socket都要有statefulinput bufferoutput buffer. -- linux多线程服务端编程

从前面的分析可以看得到,为了提高效率,我们真的牺牲了很多.要做的工作也增加了很多.当然,我可不想讲什么epoll,其实我更想谈的是非阻塞IOwrite, read这些函数里的一些表现.

write函数

首先是write函数,write函数如果返回的值是大于等于0的话,那表示的是已经写入的字节数目.返回-1的话,一般会设置errno,通过判别errno我们可以知道到底是因为什么出错.

read函数

read也没有什么好说的.

writev函数

好吧,我写这一节,其实只是想吐槽一下writev函数的,因为我被它坑到了.如果你用这个函数来处理非阻塞的文件描述符,应该会感觉这个玩意简直和鸡肋一毛一样.

man手册里说,它的行为和write差不多:

The writev() system call works just like write(2) except that multiple buffers are written out.

不过writev是分散写,也就是你的数据可以这里一块,那里一块,然后只要将这些数据的首地址,长度什么的写到一个iovc的结构体数组里,传递给writev,writev就帮你来写这些数据,在你看来,分散的数据就像是连成了一体.

对于阻塞IO,这个函数应该是很好用的,对于非阻塞IO,你如果想用的话,要做的工作估计还很多,我们假想一下,如果writev返回一个大于0的值num,这个值又小于所有要传递的文件块的总长度,这意味着什么,意味着数据还没有写完啊.如果你还想写的话,你下一次调用writev的时候要重新整理iovc数组,这坑爹呢.

首先,你要一块一块比对大小,确定已经写了多少块数据,然后对于那个写了一点的块,要将iovc[0].iov_base指向下一个开始的字节...,好吧,听起来就烦,好吧,你看到了,其实还不如直接用write呢.

我的代码

变化颇多的代码

为了使用IO多路复用机制,这次的代码在之前的代码上做了很多的修改,昨晚之后,为了使代码效率更高,我使用了Cache技术,具体而言,就是说,每次加载了文件,不是用完了马上就卸载掉,而是暂时保存起来,这样可以大大加快程序的处理速度.我并没有一上来就搞起多线程,如果单线程都跑出错的话,多线程就更不要谈了.

主函数

主函数是一个典型的epoll形式的写法,如果连接到来,立马处理连接,有数据可读立马去读,如果缓冲区可用,则立马去写.值得一提的是,这里使用的是epollLT模式,这个模式有一个特点就是,如果对方发送的数据你没有读完的话,它会一直触发,或者说,如果系统的TCP缓冲区可用了,而你没有理会的话,它也会一直触发.

  1. int main(int argc, char *argv[])
  2. {
  3. int listenfd = Open_listenfd(8080); /* 8080号端口监听 */
  4. epoll_event events[MAXEVENTNUM];
  5. sockaddr clnaddr;
  6. socklen_t clnlen = sizeof(clnaddr);
  7. addsig(SIGPIPE, SIG_IGN);
  8. int epollfd = Epoll_create(80); /* 10基本上没有什么用处 */
  9. addfd(epollfd, listenfd, false); /* epollfd要监听listenfd上的可读事件 */
  10. HttpHandle handle[256];
  11. int acnt = 0;
  12. for ( ; ;) {
  13. int eventnum = Epoll_wait(epollfd, events, MAXEVENTNUM, -1);
  14. for (int i = 0; i < eventnum; ++i) {
  15. int sockfd = events[i].data.fd;
  16. if (sockfd == listenfd) { /* 有连接到来 */
  17. printf("%d\n", ++acnt);
  18. int connfd = Accept(listenfd, &clnaddr, &clnlen);
  19. handle[connfd].init(connfd); /* 初始化 */
  20. addfd(epollfd, connfd, false); /* 加入监听 */
  21. }
  22. else if (events[i].events & EPOLLIN) { /* 有数据可读 */
  23. int res = handle[sockfd].processRead(); /* 处理读事件 */
  24. if (res == STATUS_WRITE) /* 我们需要监听写事件 */
  25. modfd(epollfd, sockfd, EPOLLOUT);
  26. else
  27. removefd(epollfd, sockfd);
  28. }
  29. else if (events[i].events & EPOLLOUT) { /* 如果可写了 */
  30. printf("Could write!\n");
  31. int res = handle[sockfd].processWrite(); /* 处理写事件 */
  32. if (res == STATUS_READ) /* 对方发送了keepalive */
  33. modfd(epollfd, sockfd, EPOLLIN);
  34. else
  35. removefd(epollfd, sockfd);
  36. }
  37. }
  38. }
  39. return 0;
  40. }

将http处理代码封装起来

为了管理起来更加方便,我将http处理的代码封装到了一个HttpHandle的类之中.

我们一起来看一下这个类:

  1. class HttpHandle : public noncopyable /* 不可以被拷贝,不可以被复制 */
  2. {
  3. public:
  4. static const int READ_BUFFER_SIZE = 1024; /* 读缓冲区的大小 */
  5. static const int WRITE_BUFFER_SIZE = 1024; /* 写缓冲区的大小 */
  6. private:
  7. static Cache& cache_; /* 全局只需要一个cache_*/
  8. /* 该HTTP连接的socket和对方的socket地址 */
  9. int sockfd_;
  10. boost::shared_ptr<FileInfo> fileInfo_;
  11. /* 读缓冲区 */
  12. char readBuf_[READ_BUFFER_SIZE];
  13. /* 标志读缓冲区中已经读入的客户数据的最后一个字节的下一个位置 */
  14. int nRead_;
  15. /* 当前正在分析的字符在读缓冲区中的位置 */
  16. int nChecked_;
  17. bool keepAlive_; /* 是否保持连接 */
  18. bool sendFile_; /* 是否发送文件 */
  19. /* 写缓冲区 */
  20. char writeBuf_[WRITE_BUFFER_SIZE];
  21. /* 写缓冲区中待发送的字节数 */
  22. int nStored_;
  23. /* 已经写了多少字节 */
  24. int written_;
  25. }

正如我前面所说的,一旦使用非阻塞IO,代码的复杂度就上来了,我们必须考虑配备读写缓冲区.sockfd_用于记录与客户端连接的socket描述符.fileInfo_记录了要发送的文件的一些信息.
当然,我们还要记录读到哪里啦,写到哪里啦,所以就有了nRead_,nChecked_,nStored_,written_在这些变量.

好吧,我们来看一些代码:

  1. bool HttpHandle::read()
  2. {
  3. /* 我们尽量一次性将数据全部读尽 */
  4. nRead_ = 0; /* 首先要清零 */
  5. nChecked_ = 0;
  6. if (nRead_ >= READ_BUFFER_SIZE) {
  7. return false;
  8. }
  9. int byte_read = 0;
  10. while (true) {
  11. byte_read = recv(sockfd_, readBuf_ + nRead_, READ_BUFFER_SIZE - nRead_, 0);
  12. if (byte_read == -1) { /* 代表出错了 */
  13. break;
  14. }
  15. else if (byte_read == 0) { /* 对方已经关闭了连接 */
  16. return false;
  17. }
  18. nRead_ += byte_read; /* 已经读取的字节 */
  19. }
  20. return true;
  21. }

这是read函数,需要一次性将对方发来的数据读尽.当然,你也可以不读完,那么epoll机制仍然会提醒你可读.不过这样编码起来,代码会复杂很多.

  1. int HttpHandle::processRead()
  2. {
  3. int is_static;
  4. struct stat sbuf;
  5. char buf[MAXLINE], method[MAXLINE], uri[MAXLINE], version[MAXLINE];
  6. char filename[MAXLINE], cgiargs[MAXLINE];
  7. char line[MAXLINE];
  8. if (false == read()) { /* 对方已经关闭了连接 */
  9. return STATUS_CLOSE;
  10. }
  11. /* 接下来开始解析读入的数据 */
  12. getLine(line, MAXLINE); /* 读取一行数据 */
  13. /* 使用sscanf函数确实是一个非常棒的办法! */
  14. sscanf(line, "%s %s %s", method, uri, version);
  15. if (strcasecmp(method, "GET")) {
  16. clienterror(method, "501", "Not Implemented",
  17. "Tiny does not implement this method");
  18. goto end;
  19. }
  20. readRequestHdrs(); /* 处理剩余的请求头部 */
  21. /* Parse URI from GET request */
  22. is_static = parseUri(uri, filename, cgiargs);
  23. if (stat(filename, &sbuf) < 0) {
  24. clienterror(filename, "404", "Not found",
  25. "Tiny couldn't find this file"); /* 没有找到文件 */
  26. goto end;
  27. }
  28. if (is_static) { /* Serve static content */
  29. if (!(S_ISREG(sbuf.st_mode)) || !(S_IRUSR & sbuf.st_mode)) {
  30. clienterror(filename, "403", "Forbidden",
  31. "Tiny couldn't read the file"); /* 权限不够 */
  32. goto end;
  33. }
  34. serveStatic(filename, sbuf.st_size);
  35. }
  36. else { /* Serve dynamic content */
  37. clienterror(method, "501", "Not Implemented",
  38. "Tiny does not implement this method");
  39. goto end;
  40. }
  41. end:
  42. return processWrite();
  43. }

processRead函数和之前版本的doit基本上是类似的,这里就不再详细展开.
与之前最大的不同是,下面的这几个函数:

  1. void HttpHandle::clienterror(char *cause, char *errnum, char *shortmsg, char *longmsg)
  2. {
  3. char buf[MAXLINE], body[MAXBUF];
  4. /* Build the HTTP response body */
  5. sprintf(body, "<html><title>Tiny Error</title>");
  6. sprintf(body, "%s<body bgcolor=""ffffff"">\r\n", body);
  7. sprintf(body, "%s%s: %s\r\n", body, errnum, shortmsg);
  8. sprintf(body, "%s<p>%s: %s\r\n", body, longmsg, cause);
  9. sprintf(body, "%s<hr><em>The Tiny Web server</em>\r\n", body);
  10. /* Print the HTTP response */
  11. addResponse("HTTP/1.0 %s %s\r\n", errnum, shortmsg);
  12. addResponse("Content-type: text/html\r\n");
  13. addResponse("Content-length: %d\r\n\r\n", (int)strlen(body));
  14. addResponse("%s", body);
  15. }

我们继续来查看addResponse这个函数:

  1. bool HttpHandle::addResponse(const char* format, ...)
  2. {
  3. if (nStored_ >= WRITE_BUFFER_SIZE) {
  4. return false;
  5. }
  6. va_list arg_list;
  7. va_start(arg_list, format);
  8. int len = vsnprintf(writeBuf_ + nStored_, WRITE_BUFFER_SIZE - 1 - nStored_, format, arg_list); /* 将数据输入到writeBuf_中 */
  9. if (len >= (WRITE_BUFFER_SIZE - 1 - nStored_)) {
  10. return false;
  11. }
  12. nStored_ += len;
  13. va_end(arg_list);
  14. return true;
  15. }

我们写的时候并不是直接发送给对方,而是先写入到本地的写缓冲区里面,这也很好理解,因为我们使用了非阻塞IO,写的话,很有肯能一次性写不完,因为不会像以前的版本一样阻塞到写完为止,所以我们必须将数据保存下来,这次写不完,下次接着写.

在前面的processRead函数的最后调用了processWrite函数,它的实现如下:

  1. int HttpHandle::processWrite()
  2. {
  3. int res;
  4. /*-
  5. * 数据要作为两部分发送,第1步,要发送writeBuf_里面的数据.
  6. */
  7. int nRemain = strlen(writeBuf_) - written_; /* writeBuf_中还有多少字节要写 */
  8. if (nRemain > 0) {
  9. while (true) {
  10. nRemain = strlen(writeBuf_) - written_;
  11. res = write(sockfd_, writeBuf_ + written_, nRemain);
  12. if (res < 0) {
  13. if (errno == EAGAIN) { /* 资源暂时不可用 */
  14. return STATUS_WRITE;
  15. }
  16. return STATUS_ERROR;
  17. }
  18. written_ += res;
  19. if (written_ == strlen(writeBuf_))
  20. break;
  21. }
  22. }
  23. /*-
  24. * 第2步,要发送html网页数据.
  25. */
  26. if (sendFile_) {
  27. int bytesToSend = fileInfo_->size_ + strlen(writeBuf_); /* 总共需要发送的字节数目 */
  28. while (true) {
  29. int offset = written_ - strlen(writeBuf_);
  30. res = write(sockfd_, (char *)fileInfo_->addr_ + offset, fileInfo_->size_ - offset);
  31. if (res < 0) {
  32. if (errno == EAGAIN) { /* 资源暂时不可用 */
  33. return STATUS_WRITE;
  34. }
  35. return STATUS_ERROR;
  36. }
  37. written_ += res;
  38. if (written_ == bytesToSend)
  39. break;
  40. }
  41. }
  42. /* 数据发送完毕 */
  43. reset();
  44. if (keepAlive_) /* 如果需要保持连接的话 */
  45. return STATUS_READ;
  46. else
  47. return STATUS_SUCCESS;
  48. }

数据发送可不是一件容易的事情,我的实现是这样的,前面构造的头部信息保存在writeBuf_中,文件的信息保存在fileInfo_这个结构中,所以要发送两次,第一次发送头部信息,第二次再发送文件信息.

在发送的过程中要考虑各种情况,如果系统的TCP的缓冲区已经满了,不能接收我们的数据了,我们要退出这次发送.否则的话,就一直发送,等到全部数据发送完毕为止.

你可能会感到奇怪,为什么我要分两次发送,为什么不直接将文件的信息写入到writeBuf_之中?这是因为效率的考虑,我们要尽量消除不必要的拷贝,如果要实现高性能的服务器的话.我们可以想象得到,有一些页面会特别受到用户的追捧,访问量会特别大,如果我们每一次都将这个页面从服务器加载,发送完了之后再关闭,这样会浪费我们多少cpu资源,为了更快的访问速度,我们的这个web服务器设计了一个Cache机制,将每次发送的页面缓存起来,下一次要用到这个页面的时候,直接用就可以了,不用加载.这样服务器的效率就变得高得多了.

Cache机制

为了保存文件的信息,我们设计了一个FileInfo类.

  1. class FileInfo : noncopyable
  2. {
  3. public:
  4. FileInfo(std::string& fileName, int fileSize) {
  5. int srcfd = Open(fileName.c_str(), O_RDONLY, 0); /* 打开文件 */
  6. size_ = fileSize;
  7. addr_ = Mmap(0, fileSize, PROT_READ, MAP_PRIVATE, srcfd, 0);
  8. Close(srcfd); /* 关闭文件 */
  9. }
  10. ~FileInfo() {
  11. Munmap(addr_, size_); /* 解除映射 */
  12. }
  13. public:
  14. void *addr_; /* 地址信息 */
  15. int size_; /* 文件大小 */
  16. };

这个类非常简单,具体而言,就是在构建类的时候,自动映射文件,类析构的时候实现解除映射.

为了管理FileInfo,我们设计了一个Cache类.我们来看一看吧.

  1. class Cache : noncopyable
  2. {
  3. typedef std::map<std::string, boost::shared_ptr<FileInfo>>::iterator it;
  4. private:
  5. std::map<std::string, boost::shared_ptr<FileInfo>> cache_; /* 实现文件名称到地址的一个映射 */
  6. static const size_t MAX_CACHE_SIZE = 100; /* 最多缓存100个文件 */
  7. ...
  8. }

这个Cache类中其实就是记录了一个文件名到文件地址的映射关系.它有一个重要的查找函数getFileAddr:

  1. boost::shared_ptr<FileInfo> getFileAddr(std::string fileName, int fileSize) {
  2. if (cache_.end() != cache_.find(fileName)) { /* 如果在cache中找到了 */
  3. return cache_[fileName];
  4. }
  5. if (cache_.size() >= MAX_CACHE_SIZE) { /* 文件数目过多,需要删除一个元素 */
  6. cache_.erase(cache_.begin()); /* 直接移除掉最前一个元素 */
  7. }
  8. /* 没有找到的话,我们需要加载文件 */
  9. boost::shared_ptr<FileInfo> fileInfo(new FileInfo(fileName, fileSize));
  10. cache_[fileName] = fileInfo;
  11. return fileInfo;
  12. }

思想很简单,那就是如果找到了,立马返回这个地址,如果缓存的文件过多,那么要删除一个元素,我这里实现的是最简单的,直接删除最前的一个元素,你也可以实现更加高效的算法.

如果没有找到的话,要重新从磁盘上加载这个文件.我们使用shared_ptr来管理资源,这种类型的指针的好处在于,只要你持有这种指针,那么指针指向的对象便不会被析构掉,恰好符合我们的要求.

我们继续来看HttpHandle类的serveStatic函数:

  1. void HttpHandle::serveStatic(char *fileName, int fileSize)
  2. { /* 用于处理静态的网页 */
  3. int srcfd;
  4. char fileType[MAXLINE], buf[MAXBUF];
  5. /* 构造头部信息 */
  6. getFileType(fileName, fileType);
  7. sprintf(buf, "HTTP/1.0 200 OK\r\n");
  8. sprintf(buf, "%sServer: Tiny Web Server\r\n", buf);
  9. sprintf(buf, "%sContent-length: %d\r\n", buf, fileSize);
  10. sprintf(buf, "%sContent-type: %s\r\n\r\n", buf, fileType);
  11. addResponse(buf);
  12. fileInfo_ = cache_.getFileAddr(fileName, fileSize); /* 添加文件 */
  13. sendFile_ = true;
  14. }

这个函数在这个时候就应该变得很简单了,就是构造头部信息,获得文件信息而已.

状态机

非阻塞IO的引入使得代码变得复杂起来,我们不能用之前的阻塞IO的思想来解决现在的问题了,网页的处理其实已经变成了一个状态机了,首先是对方来了连接,我们处理这个连接的HttpHandle就要初始化,对方发来的数据,我们处理这个连接的HttpHandle立马转为读取状态,读取完成后立马转入发送状态,发送完成之后要清理资源.

下一次连接依旧如此.

缺陷

这个版本的web server并发度有所提高,但是还存在不少的缺陷,代码也不是很漂亮,接下来版本的迭代中,我们将逐步解决这个问题.

我来讲一讲缺陷在哪里:

  1. GET /

然后停顿了30秒,才发送接下来的:

  1. Http/1.1\r\n...

我上面的代码势必会出错,因为对于客户端发送的消息,我只读了一次,也就是说上面的read函数只会读到GET /信息,接下来的代码中就parse这些信息,发送回复.很显然,会出错.

类似的问题需要考量的东西还有很多,写一个完备的服务器可真不是一件容易的事情.

具体代码还是看这里吧!:https://github.com/lishuhuakai/Spweb

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注