@xuchongfeng
2017-12-03T12:49:09.000000Z
字数 6754
阅读 71
Linux Socket IO
网络服务是资源的提供,服务器的存储资源,计算资源等,高效的服务模型是为了更快地服务更多的客户请求;
为了能更快的提供服务,需要减少等待时间,对于串行提供服务的,等待时间可能很长;而对于IO密集型的服务请求,其瓶颈在网络IO,对于计算密集型的服务请求,瓶颈在CPU,如果此时由大量的进程或者线程提供服务,那么就会发生大量的上下文切换,浪费CPU,这种情况需要减少上下文的切换,也就要求在一个进程中需要管理多个服务请求;
计算机操作系统层面,分为用户态和内核态,用户可以通过系统API,软中断和硬件中断进入内核态。传输层的TCP/UDP协议是内核协议栈,意味着接收到的数据先缓存在内核缓冲区,用户程序再将数据从内核缓冲区拷贝到用户存储空间;
如图所示,根据处理数据方式的不同,引入同步/异步,阻塞/非阻塞的概念。同步是指操作不完成就不返回,该调用会阻塞;异步不会阻塞调用者,当IO条件满足时,系统通知调用者操作已经完成。阻塞IO有点不达目的,誓不罢休的意思,读操作,一定要等到数据到达才返回,写操作,一定要写完数据才返回,调用者一直阻塞,直到IO条件满足;非阻塞IO,当条件满足时,则执行操作,当条件不满足时,如读操作,但是没有数据到达,写操作,内核TCP缓冲区已满,则立即返回,再由用户进程轮询状态。

IO复用允许一个进程同时管理多个请求。如poll,select,epoll,kqueue函数,监视对应请求发生的事件:可读,可写,异常等。在存在大量短连接的业务场景下,能高效的提供服务。
之后的部分是使用Python编写的示例,主要是因为Python比较简单,容易理解模型。
服务器端
#!/usr/bin/python
# -*- coding: utf8 -*-
# Blocking echo server: a single process serves one client at a time.
import socket
import datetime

address = ('localhost', 1234)


def init():
    """Create and return the listening TCP socket bound to ``address``."""
    listenfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    # Let the server be restarted right away instead of failing with
    # "Address already in use" while old connections linger.
    listenfd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listenfd.bind(address)
    listenfd.listen(1)
    return listenfd


def logic(listenfd):
    """Accept clients forever and echo back whatever each one sends.

    Clients are served strictly one after another: the next accept()
    only happens once the current client has closed its sending side.
    Socket errors propagate to the caller.
    """
    print("start accept connection")
    while 1:
        conn, addr = listenfd.accept()
        try:
            while 1:
                data = conn.recv(1024)
                if not data:
                    # An empty read means the peer finished sending.
                    break
                print("recv data len is %d" % len(data))
                conn.sendall(data)
        finally:
            # Release the descriptor even when recv()/sendall() raises;
            # otherwise every served client leaks one socket.
            conn.close()


def loop():
    """Run the accept loop, printing socket errors instead of exiting."""
    listenfd = init()
    while 1:
        try:
            logic(listenfd)
        except socket.error as e:
            print(e)


if __name__ == "__main__":
    print("listening: %s : %s" % (address))
    loop()
客户端
#!/usr/bin/python
# -*- coding: utf8 -*-
import socket
import time
import sys

address = ("localhost", 1234)


# echo, send data to server and receive data from server
def run(chunk_size):
    """Send ``chunk_size`` zero bytes to the server and read the echo back.

    Prints the size of every piece received. Returns once the whole echo
    has arrived or the server closes the connection.
    """
    # Binary mode: sockets carry bytes, and the handle is closed right away.
    with open("/dev/zero", "rb") as content:
        chunk = content.read(chunk_size)

    connfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    try:
        connfd.connect(address)
        connfd.sendall(chunk)
        left = chunk_size
        while left > 0:
            data = connfd.recv(1024)
            if not data:
                # Server closed early; without this check ``left`` never
                # shrinks and the loop spins forever.
                break
            left -= len(data)
            print("recv data len is %d" % len(data))
    finally:
        connfd.close()


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("usage: python client.py <chunk_size>")
    else:
        run(int(sys.argv[1]))
#!/usr/bin/python
# -*- coding: utf8 -*-
# server: one thread per connection
import socket
import datetime
import threading

address = ('localhost', 1234)


def init():
    """Create and return the listening TCP socket bound to ``address``."""
    listenfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    listenfd.bind(address)
    listenfd.listen(1)
    return listenfd


def worker(connfd):
    """Read everything the client sends, echo it back, then close.

    The echo is only written after the client signals end-of-input
    (an empty read), so the client must shut down its sending side.
    """
    chunks = []
    try:
        while 1:
            tmp_data = connfd.recv(1024)
            if not tmp_data:
                break
            chunks.append(tmp_data)
        # Join once instead of growing a string chunk by chunk.
        connfd.sendall(b"".join(chunks))
    except socket.error as e:
        # This runs in its own thread, so loop()'s handler cannot see
        # the error; report it here.
        print(e)
    finally:
        connfd.close()


def logic(listenfd):
    """Accept clients forever, handing each connection to a new thread."""
    print("start accept connection")
    thread_num = 0
    while 1:
        conn, addr = listenfd.accept()
        worker_thread = threading.Thread(target=worker, args=(conn,),
                                         name="thread-%d" % thread_num)
        # Daemon threads do not keep the process alive on shutdown.
        worker_thread.daemon = True
        # No join() here: waiting for the worker would serve clients one
        # by one and defeat the purpose of using threads.
        worker_thread.start()
        thread_num += 1


def loop():
    """Run the accept loop, printing socket errors instead of exiting."""
    listenfd = init()
    while 1:
        try:
            logic(listenfd)
        except socket.error as e:
            print(e)


if __name__ == "__main__":
    print("listening: %s : %s" % (address))
    loop()
#!/usr/bin/python
# -*- coding: utf8 -*-
import socket
import time
import sys
import threading
import datetime

address = ("localhost", 1234)


# echo, send data to server and receive data from server
def run(chunk_size):
    """Send ``chunk_size`` zero bytes, half-close, and read the echo back.

    Prints the size of every piece received, tagged with the name of the
    calling thread.
    """
    # Binary mode: sockets carry bytes, and the handle is closed right away.
    with open("/dev/zero", "rb") as content:
        chunk = content.read(chunk_size)

    connfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    print(connfd)
    try:
        connfd.connect(address)
        connfd.sendall(chunk)
        # Half-close so the server sees end-of-input and starts echoing.
        connfd.shutdown(socket.SHUT_WR)
        left = chunk_size
        while left > 0:
            data = connfd.recv(1024)
            if len(data) == 0:
                break
            left -= len(data)
            print("%s: recv data len is %d" % (threading.current_thread().name, len(data)))
    finally:
        connfd.close()


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("usage: python client.py <thread_num> <chunk_size>")
    else:
        thread_num = int(sys.argv[1])
        chunk_size = int(sys.argv[2])
        time_start = datetime.datetime.now()
        jobs = [threading.Thread(target=run,
                                 args=(chunk_size,),
                                 name=("thread-%d" % i))
                for i in range(thread_num)]
        # Start every client before waiting for any of them; joining
        # inside the start loop would run the clients one at a time.
        for job in jobs:
            job.start()
        for job in jobs:
            job.join()
        time_end = datetime.datetime.now()
        time_delta = time_end - time_start
        print("start at: %s\nend at: %s\neclipse time: %s" % (time_start, time_end, time_delta))
#!/usr/bin/python
# -*- coding: utf8 -*-
# server, using select
import time
import socket
import select
import errno

ADDRESS = ("localhost", 1234)

# errno values meaning "try again later" rather than "connection broken".
_RETRY = (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR)


def init():
    """Create and return the listening TCP socket bound to ``ADDRESS``."""
    listenfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    listenfd.bind(ADDRESS)
    listenfd.listen(5)
    return listenfd


def _read_available(conn):
    """Read from a non-blocking socket until it would block.

    Returns ``(data, closed)``: the bytes received, and whether the peer
    finished sending (empty read) or the connection failed.
    """
    chunks = []
    closed = False
    while True:
        try:
            tmp_data = conn.recv(1024)
        except socket.error as e:
            if e.args[0] not in _RETRY:
                closed = True
            break
        if not tmp_data:
            closed = True
            break
        chunks.append(tmp_data)
    return b"".join(chunks), closed


def eventloop(listenfd=None):
    """Serve echo clients from a single thread, multiplexed with select().

    listenfd: optional listening socket to serve; when omitted, one is
        created with init().

    Runs forever. Each connection has an output buffer; a socket is only
    watched for writability while its buffer is non-empty, and it is
    closed once the peer has finished sending and the buffer is flushed.
    """
    if listenfd is None:
        listenfd = init()
    listenfd.setblocking(False)

    readset = [listenfd]
    writeset = []
    outbuf = {}       # connection -> bytearray still to be echoed
    finished = set()  # connections whose peer sent end-of-input

    def drop(conn):
        """Forget a connection and release its descriptor."""
        if conn in readset:
            readset.remove(conn)
        if conn in writeset:
            writeset.remove(conn)
        finished.discard(conn)
        del outbuf[conn]
        conn.close()

    while True:
        rset, wset, _ = select.select(readset, writeset, [], 20)

        for conn in rset:
            if conn is listenfd:
                try:
                    connfd, addr = listenfd.accept()
                except socket.error:
                    # The client went away between select() and accept().
                    continue
                print(addr)
                connfd.setblocking(False)
                readset.append(connfd)
                outbuf[connfd] = bytearray()
                continue

            data, closed = _read_available(conn)
            outbuf[conn] += data
            if closed:
                # Stop watching for input: a socket at end-of-input is
                # reported readable on every select() call.
                readset.remove(conn)
                finished.add(conn)
            # Register for writing once; the write pass below also closes
            # finished connections.
            if (data or closed) and conn not in writeset:
                writeset.append(conn)

        for conn in wset:
            pending = outbuf[conn]
            if pending:
                try:
                    # send() may accept only part of the buffer.
                    sent = conn.send(pending)
                except socket.error as e:
                    if e.args[0] not in _RETRY:
                        drop(conn)
                    continue
                del pending[:sent]
            if not pending:
                # Nothing left to write: stop polling for writability,
                # otherwise select() returns immediately every time.
                writeset.remove(conn)
                if conn in finished:
                    drop(conn)


if __name__ == "__main__":
    print("listening %s:%s" % ADDRESS)
    eventloop()
#!/usr/bin/python
# -*- coding: utf8 -*-
# server, using epoll
import time
import socket
import select
import errno

ADDRESS = ("localhost", 1234)

# errno values meaning "try again later" rather than "connection broken".
_RETRY = (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR)


def init():
    """Create and return the listening TCP socket bound to ``ADDRESS``."""
    listenfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    listenfd.bind(ADDRESS)
    listenfd.listen(5)
    return listenfd


def _read_available(conn):
    """Read from a non-blocking socket until it would block.

    Returns ``(data, closed)``: the bytes received, and whether the peer
    finished sending (empty read) or the connection failed.
    """
    chunks = []
    closed = False
    while True:
        try:
            tmp_data = conn.recv(1024)
        except socket.error as e:
            if e.args[0] not in _RETRY:
                closed = True
            break
        if not tmp_data:
            closed = True
            break
        chunks.append(tmp_data)
    return b"".join(chunks), closed


def eventloop(listenfd=None):
    """Serve echo clients from a single thread, multiplexed with epoll.

    listenfd: optional listening socket to serve; when omitted, one is
        created with init(). It is closed when the loop exits.

    Runs until an exception (for example KeyboardInterrupt) ends the
    loop. Registrations are level-triggered; EPOLLOUT is requested only
    while a connection has buffered output, and a connection is closed
    once the peer has finished sending and its buffer is flushed.
    """
    if listenfd is None:
        listenfd = init()
    listenfd.setblocking(False)
    listen_fileno = listenfd.fileno()

    epoll = select.epoll()
    epoll.register(listen_fileno, select.EPOLLIN)
    connections = {}  # file descriptor -> socket object
    outbuf = {}       # file descriptor -> bytearray still to be echoed
    finished = set()  # descriptors whose peer sent end-of-input

    def drop(fd):
        """Unregister a connection and release its descriptor."""
        epoll.unregister(fd)
        connections.pop(fd).close()
        del outbuf[fd]
        finished.discard(fd)

    try:
        while True:
            for fd, event in epoll.poll(1):
                if fd == listen_fileno:
                    try:
                        connfd, addr = listenfd.accept()
                    except socket.error:
                        # The client went away before accept() ran.
                        continue
                    connfd.setblocking(False)
                    epoll.register(connfd.fileno(), select.EPOLLIN)
                    connections[connfd.fileno()] = connfd
                    outbuf[connfd.fileno()] = bytearray()
                    continue

                conn = connections[fd]
                # The flags are tested independently: one event may carry
                # several of them at once.
                if event & select.EPOLLIN:
                    data, closed = _read_available(conn)
                    outbuf[fd] += data
                    if closed:
                        finished.add(fd)
                if event & (select.EPOLLHUP | select.EPOLLERR):
                    # Connection is gone; buffered output is undeliverable.
                    drop(fd)
                    continue
                if event & select.EPOLLOUT and outbuf[fd]:
                    try:
                        # send() may accept only part of the buffer.
                        sent = conn.send(outbuf[fd])
                    except socket.error as e:
                        if e.args[0] not in _RETRY:
                            drop(fd)
                            continue
                    else:
                        del outbuf[fd][:sent]

                if fd in finished and not outbuf[fd]:
                    drop(fd)
                    continue

                # Ask only for events that can make progress; a level-
                # triggered EPOLLIN at end-of-input or an EPOLLOUT with
                # nothing to send would fire on every poll().
                interest = 0
                if fd not in finished:
                    interest |= select.EPOLLIN
                if outbuf[fd]:
                    interest |= select.EPOLLOUT
                epoll.modify(fd, interest)
    finally:
        # Reached when the loop is interrupted; release every descriptor.
        for conn in connections.values():
            conn.close()
        epoll.unregister(listen_fileno)
        epoll.close()
        listenfd.close()


if __name__ == "__main__":
    print("listening %s:%s" % ADDRESS)
    eventloop()
使用线程池,使用非阻塞IO+IO复用+线程池的组合可以高效的提供服务,最近的Nginx1.7版本,使用线程池处理耗时的操作,对于IO循环,操作在一个进程中完成,当某个操作比较耗时时,就会影响整个服务的提供,使用线程池处理耗时的请求能解决这一问题;
协程,协程是比线程更小的单元,它不是系统级的调度单元,它的“作案现场”在用户态保存,减少了用户态和内核态的切换,同时使程序员可以用同步的思维写异步执行的程序。
协程示例,使用同步思维写消费者和生产者。
#!/usr/bin/python
# -*- coding: utf8 -*-
def producer(c):
    """Drive the consumer coroutine ``c`` with the values 1 to 5.

    Prints each value produced and the reply the consumer yields back,
    then closes the coroutine.
    """
    # Prime the generator: run it up to its first ``yield``.
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print("[producer] producting %s..." % n)
        # send() resumes the consumer with ``n`` and returns what it
        # yields next.
        r = c.send(n)
        print("[producer] consumer return: %s" % r)
    # Finish the coroutine instead of leaving it suspended at ``yield``.
    c.close()


def consumer():
    """Coroutine that receives items via send() and acknowledges each one.

    Yields '' when primed and 'consumed' after every item; stops when it
    is sent a falsy value.
    """
    r = ""
    while True:
        n = yield r
        if not n:
            return
        print("[consumer] consuming %s..." % n)
        r = 'consumed'


if __name__ == "__main__":
    c = consumer()
    producer(c)