@xuchongfeng 2017-12-03T12:49:09Z

A Summary of Server Service Models

Linux Socket IO


Basic Concepts

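A network service provides resources: the server's storage, computing power, and so on. An efficient service model aims to serve more client requests, and to serve them faster.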


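To serve faster we need to cut waiting time; a server that handles requests strictly one after another can keep clients waiting for a long time. For IO-intensive requests the bottleneck is network IO, and for CPU-intensive requests it is the CPU. Serving requests with a large number of processes or threads causes frequent context switches that waste CPU time, so to avoid them a single process has to manage many requests at once.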


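At the operating-system level, execution is divided into user mode and kernel mode; the CPU enters kernel mode through system calls, software interrupts, and hardware interrupts. TCP and UDP, the transport-layer protocols, are implemented in the kernel protocol stack: received data is first buffered in a kernel buffer, and the user program then copies it from the kernel buffer into its own user-space memory.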
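A small illustration of this buffering, assuming a local `socket.socketpair()` as a stand-in for a real TCP connection so the example is self-contained: the bytes sent before anyone calls `recv()` are held by the kernel, and `recv()` later copies them into a user-space `bytes` object.

```python
import socket

# A connected pair of local sockets stands in for a client/server connection
a, b = socket.socketpair()

# sendall() returns once the data has been copied into the kernel buffer;
# nobody is reading on the other end yet
a.sendall(b"hello")

# recv() copies the buffered bytes from kernel space into a new bytes object
received = b.recv(1024)
print(received)

a.close()
b.close()
```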


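As the figure shows, the different ways of handling this data give rise to the concepts of synchronous/asynchronous and blocking/non-blocking IO. A synchronous call does not return until the operation has completed, so the caller blocks; an asynchronous call does not block the caller, and the system notifies the caller once the IO operation has completed. Blocking IO does not give up until it gets what it wants: a read waits until data has arrived, a write waits until all the data has been copied into the kernel send buffer, and the caller stays blocked until the IO condition is met. With non-blocking IO, the operation is carried out if the condition is met; if it is not (a read with no data available, or a write while the kernel TCP send buffer is full), the call returns immediately with an error (EAGAIN/EWOULDBLOCK), and the user process has to poll the state again later.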
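A minimal sketch of the non-blocking behavior, again assuming a local `socketpair()` for convenience: the same `recv()` call fails immediately when no data is buffered, and succeeds once data is in the kernel buffer. In Python the EAGAIN/EWOULDBLOCK error surfaces as `BlockingIOError`.

```python
import errno
import socket

a, b = socket.socketpair()
b.setblocking(False)   # non-blocking mode: calls never wait

result, code = None, None
try:
    b.recv(1024)        # nothing has been sent yet
    result = "data"
except BlockingIOError as e:
    # Raised when the kernel reports EAGAIN / EWOULDBLOCK
    result = "would block"
    code = e.errno
print(result, code in (errno.EAGAIN, errno.EWOULDBLOCK))

# Once data sits in the kernel buffer, the same non-blocking call succeeds
a.sendall(b"ping")
data = b.recv(1024)
print(data)

a.close()
b.close()
```

A blocking socket (the default) would simply wait inside the first `recv()` instead of raising.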



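IO multiplexing lets a single process manage many requests at the same time. Functions such as select, poll, epoll (Linux), and kqueue (BSD/macOS) watch the descriptors for events: readable, writable, exceptional conditions, and so on. In workloads with large numbers of short connections this makes for an efficient server.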
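As a sketch of the idea, Python's `selectors` module wraps these calls (`DefaultSelector` picks the most efficient mechanism available, e.g. epoll on Linux or kqueue on BSD/macOS). Here one `select()` call watches three connections, assumed to be local socketpairs, and reports only the one that has data; the `conn-N` tags are arbitrary labels.

```python
import selectors
import socket

sel = selectors.DefaultSelector()

pairs = [socket.socketpair() for _ in range(3)]
for i, (_, server_side) in enumerate(pairs):
    server_side.setblocking(False)
    # Watch each "server side" socket for readability; data= is an arbitrary tag
    sel.register(server_side, selectors.EVENT_READ, data="conn-%d" % i)

# Only the second peer sends anything
pairs[1][0].sendall(b"request")

# One call reports every ready socket (timeout in seconds)
ready = [key.data for key, mask in sel.select(timeout=1)
         if mask & selectors.EVENT_READ]
print(ready)

for client_side, server_side in pairs:
    sel.unregister(server_side)
    client_side.close()
    server_side.close()
sel.close()
```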

The following sections use examples written in Python, mainly because Python is simple and makes the models easy to understand.

Single process (thread) + blocking IO

Server

```python
#!/usr/bin/env python3
# Server: single process, single thread, blocking IO (echo)
import socket

address = ('localhost', 1234)

def init():
    listenfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    # Allow restarting the server without waiting for TIME_WAIT to expire
    listenfd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listenfd.bind(address)
    listenfd.listen(1)
    return listenfd

def logic(listenfd):
    print("start accept connection")
    while True:
        # Blocks until a client connects; only one client is served at a time
        conn, addr = listenfd.accept()
        with conn:
            while True:
                # Blocks until data arrives; b"" means the peer closed the connection
                data = conn.recv(1024)
                if not data:
                    break
                print("recv data len is %d" % len(data))
                conn.sendall(data)

def loop():
    listenfd = init()
    while True:
        try:
            logic(listenfd)
        except OSError as e:
            print(e)

if __name__ == "__main__":
    print("listening: %s : %s" % address)
    loop()
```

Client

```python
#!/usr/bin/env python3
# Client: echo, send data to the server and receive it back
import socket
import sys
import time

address = ("localhost", 1234)

def run(chunk_size):
    connfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    connfd.connect(address)
    # Use chunk_size zero bytes as the payload
    with open("/dev/zero", "rb") as content:
        chunk = content.read(chunk_size)
    connfd.sendall(chunk)
    left = chunk_size
    # Keep reading until the whole echoed payload has come back
    while left > 0:
        data = connfd.recv(1024)
        if not data:  # server closed the connection early
            break
        left -= len(data)
        print("recv data len is %d" % len(data))
    # time.sleep(10)  # uncomment to keep the connection open longer
    connfd.close()

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("usage: python client.py <chunk_size>")
    else:
        run(int(sys.argv[1]))
```

Multiple threads + blocking IO

Server

```python
#!/usr/bin/env python3
# Server: one thread per connection + blocking IO (echo)
import socket
import threading

address = ('localhost', 1234)

def init():
    listenfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    listenfd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listenfd.bind(address)
    listenfd.listen(5)
    return listenfd

def worker(connfd):
    # Read until the client shuts down its write side, then echo everything back
    data = b""
    with connfd:
        while True:
            tmp_data = connfd.recv(1024)
            if not tmp_data:
                break
            data += tmp_data
        connfd.sendall(data)

def logic(listenfd):
    print("start accept connection")
    thread_num = 0
    while True:
        conn, addr = listenfd.accept()
        worker_thread = threading.Thread(target=worker, args=(conn,),
                                         name="thread-%d" % thread_num,
                                         daemon=True)
        # Do not join() here: waiting for the worker would serialize clients again
        worker_thread.start()
        thread_num += 1

def loop():
    listenfd = init()
    while True:
        try:
            logic(listenfd)
        except OSError as e:
            print(e)

if __name__ == "__main__":
    print("listening: %s : %s" % address)
    loop()
```

Client

```python
#!/usr/bin/env python3
# Client: start <thread_num> concurrent echo clients and time them
import datetime
import socket
import sys
import threading

address = ("localhost", 1234)

def run(chunk_size):
    connfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    connfd.connect(address)
    with open("/dev/zero", "rb") as content:
        chunk = content.read(chunk_size)
    connfd.sendall(chunk)
    # Half-close: tell the server we are done sending, so its recv() returns b""
    connfd.shutdown(socket.SHUT_WR)
    left = chunk_size
    while left > 0:
        data = connfd.recv(1024)
        if not data:
            break
        left -= len(data)
        print("%s: recv data len is %d" % (threading.current_thread().name, len(data)))
    connfd.close()

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("usage: python client.py <thread_num> <chunk_size>")
    else:
        time_start = datetime.datetime.now()
        jobs = []
        for i in range(int(sys.argv[1])):
            job = threading.Thread(target=run, args=(int(sys.argv[2]),),
                                   name="thread-%d" % i)
            job.start()
            jobs.append(job)
        # Join only after all threads have started, so the clients run concurrently
        for job in jobs:
            job.join()
        time_end = datetime.datetime.now()
        print("start at: %s\nend at: %s\nelapsed time: %s"
              % (time_start, time_end, time_end - time_start))
```

IO multiplexing + non-blocking IO

```python
#!/usr/bin/env python3
# Server: IO multiplexing with select.select() + non-blocking sockets (echo)
import select
import socket

ADDRESS = ("localhost", 1234)

def init():
    listenfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    listenfd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listenfd.bind(ADDRESS)
    listenfd.listen(5)
    listenfd.setblocking(False)
    return listenfd

def eventloop():
    listenfd = init()
    readset = [listenfd]   # sockets we want to read from
    writeset = []          # sockets with output waiting to be sent
    pending = {}           # socket -> bytes not yet echoed back
    closing = set()        # sockets whose peer has closed its side

    def close_conn(fd):
        for fdset in (readset, writeset):
            if fd in fdset:
                fdset.remove(fd)
        pending.pop(fd, None)
        closing.discard(fd)
        fd.close()

    while True:
        rset, wset, _ = select.select(readset, writeset, [], 20)
        for fd in rset:
            if fd is listenfd:
                # A readable listening socket means a connection is waiting
                try:
                    connfd, addr = listenfd.accept()
                except BlockingIOError:
                    continue
                print(addr)
                connfd.setblocking(False)
                readset.append(connfd)
                pending[connfd] = b""
                continue
            # Drain what is currently in the kernel receive buffer
            while True:
                try:
                    tmp_data = fd.recv(1024)
                except BlockingIOError:   # EAGAIN / EWOULDBLOCK: drained for now
                    break
                except OSError:           # e.g. connection reset by peer
                    tmp_data = b""
                if not tmp_data:          # b"" means the peer closed its side
                    readset.remove(fd)
                    closing.add(fd)
                    break
                pending[fd] += tmp_data
            if pending[fd]:
                if fd not in writeset:
                    writeset.append(fd)
            elif fd in closing:
                close_conn(fd)
        for fd in wset:
            if fd not in pending:         # already closed in this round
                continue
            try:
                # On a non-blocking socket send() may accept only part of the data
                sent = fd.send(pending[fd])
            except BlockingIOError:
                continue
            except OSError:
                close_conn(fd)
                continue
            pending[fd] = pending[fd][sent:]
            if not pending[fd]:
                writeset.remove(fd)
                if fd in closing:
                    close_conn(fd)

if __name__ == "__main__":
    print("listening %s:%s" % ADDRESS)
    eventloop()
```
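The listing above drives its loop with `select.select()`, which takes the full lists of sockets on every call. For comparison, a minimal sketch of the `select.poll()` interface (Unix only; a local socketpair is assumed in place of real clients): descriptors are registered once with an event mask, and each `poll()` returns `(fd, events)` pairs for the ready ones.

```python
import select
import socket

a, b = socket.socketpair()
fd = b.fileno()

poller = select.poll()
# Ask for readable events (POLLIN) on b; registration persists across poll() calls
poller.register(fd, select.POLLIN)

before = poller.poll(0)      # timeout in milliseconds; 0 = just check
a.sendall(b"x")
after = poller.poll(100)

print(before)                # nothing ready yet
print(after)                 # [(fd, event mask)] for the readable socket

poller.unregister(fd)
a.close()
b.close()
```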
```python
#!/usr/bin/env python3
# Server: IO multiplexing with epoll (Linux only) + non-blocking sockets (echo)
import select
import socket

ADDRESS = ("localhost", 1234)

def init():
    listenfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    listenfd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listenfd.bind(ADDRESS)
    listenfd.listen(5)
    listenfd.setblocking(False)
    return listenfd

def eventloop():
    listenfd = init()
    epoll = select.epoll()
    epoll.register(listenfd.fileno(), select.EPOLLIN)
    connections = {}   # fd -> socket object
    pending = {}       # fd -> bytes waiting to be echoed back
    closing = set()    # fds whose peer has closed its write side

    def close_conn(fd):
        epoll.unregister(fd)
        connections.pop(fd).close()
        pending.pop(fd, None)
        closing.discard(fd)

    try:
        while True:
            # Level-triggered mode (no EPOLLET): an fd is reported while it stays ready
            for fd, event in epoll.poll(1):
                if fd == listenfd.fileno():
                    try:
                        connfd, address = listenfd.accept()
                    except BlockingIOError:
                        continue
                    connfd.setblocking(False)
                    epoll.register(connfd.fileno(), select.EPOLLIN)
                    connections[connfd.fileno()] = connfd
                    pending[connfd.fileno()] = b""
                    continue
                if fd not in connections:           # closed earlier in this round
                    continue
                conn = connections[fd]
                if event & (select.EPOLLHUP | select.EPOLLERR):
                    close_conn(fd)
                    continue
                if event & select.EPOLLIN:
                    try:
                        data = conn.recv(1024)
                    except BlockingIOError:         # EAGAIN: nothing to read after all
                        data = None
                    except ConnectionError:
                        data = b""
                    if data == b"":                 # peer closed its side
                        if not pending[fd]:
                            close_conn(fd)
                            continue
                        closing.add(fd)             # flush what is left, then close
                        epoll.modify(fd, select.EPOLLOUT)
                    elif data:
                        pending[fd] += data
                        # Ask for EPOLLOUT only while there is output queued
                        epoll.modify(fd, select.EPOLLIN | select.EPOLLOUT)
                if event & select.EPOLLOUT and pending[fd]:
                    try:
                        sent = conn.send(pending[fd])
                    except BlockingIOError:
                        sent = 0
                    except ConnectionError:
                        close_conn(fd)
                        continue
                    pending[fd] = pending[fd][sent:]
                    if not pending[fd]:
                        if fd in closing:
                            close_conn(fd)
                        else:
                            epoll.modify(fd, select.EPOLLIN)
    finally:
        epoll.unregister(listenfd.fileno())
        epoll.close()
        listenfd.close()

if __name__ == "__main__":
    print("listening %s:%s" % ADDRESS)
    eventloop()
```
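epoll supports two notification modes: level-triggered (the default) and edge-triggered (`EPOLLET`). A small Linux-only sketch of the difference, assuming a local socketpair with unread data: polling twice without reading reports the socket both times in level-triggered mode, but only once in edge-triggered mode. This is why an edge-triggered handler has to keep reading until it gets EAGAIN; otherwise leftover data may never be reported again. The helper `ready_twice` is hypothetical, written just for this comparison.

```python
import select
import socket

def ready_twice(flags):
    """Register a socket with unread data using `flags`, then poll twice without reading."""
    a, b = socket.socketpair()
    ep = select.epoll()
    a.sendall(b"data")                      # b now has unread data
    ep.register(b.fileno(), flags)
    first = ep.poll(0)                      # timeout in seconds; 0 = just check
    second = ep.poll(0)                     # still no recv() in between
    ep.close()
    a.close()
    b.close()
    return len(first), len(second)

lt = ready_twice(select.EPOLLIN)                  # level-triggered
et = ready_twice(select.EPOLLIN | select.EPOLLET) # edge-triggered
print("level-triggered:", lt)
print("edge-triggered:", et)
```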

Summary

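Thread pools: combining non-blocking IO, IO multiplexing, and a thread pool makes for an efficient server. Nginx 1.7 added thread pools to handle time-consuming operations. In an IO event loop all operations run in a single process, so one slow operation holds up the whole service; handing slow requests to a thread pool solves this problem.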
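The same idea in Python terms, as a sketch rather than how Nginx implements it: an asyncio event loop hands a blocking function to a thread pool with `loop.run_in_executor()`, so the loop keeps serving other tasks meanwhile. `slow_operation`, the heartbeat task, and the timings are hypothetical stand-ins for a real slow request and the rest of the server's work.

```python
import asyncio
import time

def slow_operation(n):
    # Hypothetical blocking call (disk IO, heavy computation, ...)
    time.sleep(0.3)
    return n * 2

async def heartbeat(stop, ticks):
    # Keeps ticking only if the event loop thread is not blocked
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)

async def main():
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    ticks = []
    hb = asyncio.create_task(heartbeat(stop, ticks))
    # None = use the loop's default ThreadPoolExecutor
    result = await loop.run_in_executor(None, slow_operation, 21)
    stop.set()
    await hb
    return result, len(ticks)

result, tick_count = asyncio.run(main())
print(result, tick_count)
```

Calling `slow_operation` directly inside a coroutine would freeze the heartbeat for the full 0.3 s; with the executor, it keeps ticking.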


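Coroutines: a coroutine is a smaller unit than a thread. It is not scheduled by the operating system; its execution context is saved in user space, which avoids switches between user mode and kernel mode, and it lets programmers write programs that execute asynchronously while thinking synchronously.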

A coroutine example: a producer and a consumer written in a synchronous style.

```python
#!/usr/bin/env python3
def producer(c):
    c.send(None)          # prime the generator: run consumer() up to its first yield
    n = 0
    while n < 5:
        n = n + 1
        print("[producer] producing %s..." % n)
        r = c.send(n)     # resume the consumer with n; get back the value it yields
        print("[producer] consumer return: %s" % r)
    c.close()             # shut the consumer down

def consumer():
    r = ""
    while True:
        n = yield r
        if not n:
            return
        print("[consumer] consuming %s..." % n)
        r = 'consumed'

if __name__ == "__main__":
    c = consumer()
    producer(c)
```
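Applied to network IO, the "synchronous style" looks like the sketch below, which uses asyncio's `async`/`await` coroutines (Python 3.7+; a later mechanism than the generator-based coroutines above). Each handler reads like blocking code, but every `await` yields to the event loop, so one thread serves several connections concurrently. The handler name, the local address, and the messages are illustrative assumptions.

```python
import asyncio

async def handle_echo(reader, writer):
    # Reads like blocking code, but each await yields to the event loop
    data = await reader.read(1024)
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    # Port 0 lets the OS pick a free port
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    async def client(msg):
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(msg)
        await writer.drain()
        reply = await reader.read(1024)
        writer.close()
        await writer.wait_closed()
        return reply

    # Three clients served concurrently by a single thread
    replies = await asyncio.gather(*(client(b"msg-%d" % i) for i in range(3)))
    server.close()
    await server.wait_closed()
    return replies

replies = asyncio.run(main())
print(replies)
```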