@hainingwyx 2018-09-16T12:57:58.000000Z

Python multiprocessing

Python


Process

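Constructor:
Process([group [, target [, name [, args [, kwargs]]]]])
group: process group; not implemented, and the library reference says it must be None;
target: the callable to run;
name: the process name;
args/kwargs: positional and keyword arguments passed to target.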

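Instance methods:
is_alive(): returns whether the process is running.
join([timeout]): blocks the calling process until the process whose join() was called terminates or the optional timeout expires.
start(): starts the process; it is then ready and waits for the CPU scheduler.
run(): the method that start() invokes in the child. The default run() calls target; if no target was passed when the process was instantiated, it does nothing, so subclasses usually override it.
terminate(): stops the worker process immediately, whether or not its task has finished.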

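Attributes:
authkey: the process's authentication key (a byte string).
daemon: works like setDaemon for threads. It marks the child process as a daemon, so the child is terminated when the parent process exits. It must be set before start().
exitcode: None while the process has not yet terminated; a value of -N means it was terminated by signal N.
name: the process name.
pid: the process ID.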
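The methods and attributes above can be tried together in a minimal sketch like the one below. The names `sleeper` and `lifecycle_demo` are made up for this illustration, and the durations are arbitrary. It starts a process that would sleep for a minute, lets join() time out, and then calls terminate().

```python
import multiprocessing
import time


def sleeper(seconds):
    """Hypothetical worker that just sleeps."""
    time.sleep(seconds)


def lifecycle_demo():
    p = multiprocessing.Process(target=sleeper, args=(60,), name="sleeper-1")
    before_start = p.exitcode           # None: the process has not run yet
    p.start()
    p.join(timeout=0.2)                 # returns after 0.2 s, child still running
    alive_after_timeout = p.is_alive()
    p.terminate()                       # stop the child without waiting for its task
    p.join()                            # reap the child so exitcode is filled in
    return p.name, before_start, alive_after_timeout, p.is_alive(), p.exitcode


if __name__ == "__main__":
    print(lifecycle_demo())
```

After terminate() and the final join(), exitcode is negative because the child was ended by a signal rather than by returning normally.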

Creating a function and running it as a process

    import multiprocessing
    import time

    def worker(interval):
        # Print the current time five times, pausing `interval` seconds each time.
        n = 5
        while n > 0:
            print("The time is {0}".format(time.ctime()))
            time.sleep(interval)
            n -= 1

    if __name__ == "__main__":
        p = multiprocessing.Process(target=worker, args=(3,))
        p.start()
        print("p.pid:", p.pid)
        print("p.name:", p.name)
        print("p.is_alive:", p.is_alive())

Output

    p.pid: 8736
    p.name: Process-1
    p.is_alive: True
    The time is Tue Apr 21 20:55:12 2015
    The time is Tue Apr 21 20:55:15 2015
    The time is Tue Apr 21 20:55:18 2015
    The time is Tue Apr 21 20:55:21 2015
    The time is Tue Apr 21 20:55:24 2015

Defining a process as a class

    import multiprocessing
    import time

    class ClockProcess(multiprocessing.Process):
        def __init__(self, interval):
            super().__init__()
            self.interval = interval

        def run(self):
            # start() arranges for run() to execute in the child process.
            n = 5
            while n > 0:
                print("the time is {0}".format(time.ctime()))
                time.sleep(self.interval)
                n -= 1

    if __name__ == "__main__":
        p = ClockProcess(3)
        p.start()

Output

    the time is Tue Apr 21 20:31:30 2015
    the time is Tue Apr 21 20:31:33 2015
    the time is Tue Apr 21 20:31:36 2015
    the time is Tue Apr 21 20:31:39 2015
    the time is Tue Apr 21 20:31:42 2015

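The daemon attribute

By default a child process is independent of its parent: the parent finishing its own work does not end the child, and the parent waits for non-daemonic children before it exits.
If the child's daemon attribute is set to True (before start()), the child is terminated as soon as the parent process exits.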
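A minimal sketch of a daemonic child, assuming a made-up worker `heartbeat` that never returns on its own. The parent neither joins nor terminates it; the child is expected to be cleaned up when the main program exits.

```python
import multiprocessing
import time


def heartbeat(interval):
    """Hypothetical background task that never finishes on its own."""
    while True:
        time.sleep(interval)


def start_daemon():
    # daemon has to be set before start(); here it is passed to the constructor.
    p = multiprocessing.Process(target=heartbeat, args=(0.05,), daemon=True)
    p.start()
    time.sleep(0.2)
    # No join() and no terminate(): when the main program exits,
    # multiprocessing terminates its daemonic children.
    return p.daemon, p.is_alive()


if __name__ == "__main__":
    print(start_daemon())
```

With daemon left at its default of False, the same program would never exit, because the parent would wait for the endless loop in the child.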

Lock

When several processes need to access a shared resource, a Lock can be used to prevent conflicting access.

    import multiprocessing

    def worker_with(lock, f):
        # The with statement acquires the lock and releases it on exit.
        with lock:
            with open(f, "a+") as fs:
                n = 10
                while n > 1:
                    fs.write("Lock acquired via with\n")
                    n -= 1

    def worker_no_with(lock, f):
        # Explicit acquire/release; finally guarantees the release.
        lock.acquire()
        try:
            with open(f, "a+") as fs:
                n = 10
                while n > 1:
                    fs.write("Lock acquired directly\n")
                    n -= 1
        finally:
            lock.release()

    if __name__ == "__main__":
        lock = multiprocessing.Lock()
        f = "file.txt"
        w = multiprocessing.Process(target=worker_with, args=(lock, f))
        nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
        w.start()
        nw.start()
        print("end")

Output (contents of file.txt; the two blocks can appear in either order, but they never interleave)

    Lock acquired via with
    Lock acquired via with
    Lock acquired via with
    Lock acquired via with
    Lock acquired via with
    Lock acquired via with
    Lock acquired via with
    Lock acquired via with
    Lock acquired via with
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
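The same idea applies to shared memory as well as files. The sketch below is an illustration only: it uses `multiprocessing.Value`, which the text above does not cover, and the helper names `add_many` and `run_counter` are made up. Several processes increment one shared integer, and the Lock makes each read-modify-write step atomic.

```python
import multiprocessing


def add_many(counter, lock, times):
    """Increment a shared integer `times` times, holding the lock for each update."""
    for _ in range(times):
        with lock:
            counter.value += 1    # read-modify-write, so it must be protected


def run_counter(workers=4, times=1000):
    lock = multiprocessing.Lock()
    # lock=False yields a raw shared int without a built-in lock,
    # so the explicit Lock above is the only synchronization.
    counter = multiprocessing.Value("i", 0, lock=False)
    procs = [
        multiprocessing.Process(target=add_many, args=(counter, lock, times))
        for _ in range(workers)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(run_counter())
```

Without the `with lock:` line, updates from different processes could overwrite one another and the final count could come out lower than workers * times.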

Semaphore

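A Semaphore limits how many processes may access a shared resource at the same time, for example the maximum number of connections in a pool.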

    import multiprocessing
    import time

    def worker(s, i):
        s.acquire()  # blocks while two other workers already hold the semaphore
        print(multiprocessing.current_process().name + "acquire")
        time.sleep(i)
        print(multiprocessing.current_process().name + "release\n")
        s.release()

    if __name__ == "__main__":
        s = multiprocessing.Semaphore(2)
        for i in range(5):
            p = multiprocessing.Process(target=worker, args=(s, i * 2))
            p.start()

Output

    Process-1acquire
    Process-1release
    Process-2acquire
    Process-3acquire
    Process-2release
    Process-5acquire
    Process-3release
    Process-4acquire
    Process-5release
    Process-4release
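To check that a semaphore really caps concurrency, one option is to count how many processes hold it at once. This is a sketch under assumptions: the shared counters use `multiprocessing.Value`, and the names `limited_task` and `peak_concurrency` are invented for the example.

```python
import multiprocessing
import time


def limited_task(sem, active, peak, guard):
    """Hold one semaphore slot briefly and record how many holders exist at once."""
    with sem:
        with guard:
            active.value += 1
            peak.value = max(peak.value, active.value)
        time.sleep(0.1)               # simulated work while holding a slot
        with guard:
            active.value -= 1


def peak_concurrency(slots=2, workers=5):
    sem = multiprocessing.Semaphore(slots)
    guard = multiprocessing.Lock()    # protects the two counters below
    active = multiprocessing.Value("i", 0, lock=False)
    peak = multiprocessing.Value("i", 0, lock=False)
    procs = [
        multiprocessing.Process(target=limited_task, args=(sem, active, peak, guard))
        for _ in range(workers)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return peak.value


if __name__ == "__main__":
    print(peak_concurrency())
```

The recorded peak can never exceed `slots`, no matter how many workers are started.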

Event

An Event is used for synchronization and signalling between processes.

    import multiprocessing
    import time

    def wait_for_event(e):
        print("wait_for_event: starting")
        e.wait()  # blocks until the event is set
        print("wait_for_event: e.is_set()->" + str(e.is_set()))

    def wait_for_event_timeout(e, t):
        print("wait_for_event_timeout:starting")
        e.wait(t)  # gives up after t seconds
        print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

    if __name__ == "__main__":
        e = multiprocessing.Event()
        w1 = multiprocessing.Process(name="block",
                                     target=wait_for_event,
                                     args=(e,))
        w2 = multiprocessing.Process(name="non-block",
                                     target=wait_for_event_timeout,
                                     args=(e, 2))
        w1.start()
        w2.start()
        time.sleep(3)
        e.set()
        print("main: event is set")

Output:

    wait_for_event: starting
    wait_for_event_timeout:starting
    wait_for_event_timeout:e.is_set->False
    main: event is set
    wait_for_event: e.is_set()->True
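Instead of calling is_set() afterwards, the return value of wait() can be used directly: it is False when the timeout expires and True once the event is set. The sketch below makes this deterministic by having the child report its first, timed-out wait before the parent sets the event. The names `waiter` and `event_demo` are made up, and a Queue is used only to carry the results back.

```python
import multiprocessing


def waiter(go, results):
    results.put(go.wait(0.05))   # times out: the event has not been set yet
    results.put(go.wait(5))      # returns as soon as the event is set


def event_demo():
    go = multiprocessing.Event()
    results = multiprocessing.Queue()
    p = multiprocessing.Process(target=waiter, args=(go, results))
    p.start()
    first = results.get(timeout=10)    # the child's short wait has timed out
    go.set()                           # only now release the child
    second = results.get(timeout=10)
    p.join()
    return first, second


if __name__ == "__main__":
    print(event_demo())
```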

Queue

Queue is a process-safe queue that can be used to pass data between processes.

    import multiprocessing
    import queue

    def writer_proc(q):
        try:
            q.put(1, block=False)
        except queue.Full:
            pass

    def reader_proc(q):
        try:
            # Wait briefly for the item: a non-blocking get() can run before the
            # writer's item has arrived and then raises queue.Empty.
            print(q.get(timeout=5))
        except queue.Empty:
            pass

    if __name__ == "__main__":
        q = multiprocessing.Queue()
        writer = multiprocessing.Process(target=writer_proc, args=(q,))
        writer.start()
        reader = multiprocessing.Process(target=reader_proc, args=(q,))
        reader.start()
        reader.join()
        writer.join()

Output:

    1
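For more than a single item, a common pattern is a producer that ends its stream with a sentinel value. The following is a minimal sketch; the names `producer` and `consume_all` and the choice of None as the sentinel are assumptions made for the example.

```python
import multiprocessing


def producer(q, count):
    for i in range(count):
        q.put(i * i)
    q.put(None)                   # sentinel: tells the consumer to stop


def consume_all(count=5):
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q, count))
    p.start()
    received = []
    while True:
        item = q.get(timeout=10)  # blocks until an item arrives
        if item is None:
            break
        received.append(item)
    p.join()                      # join only after the queue has been drained
    return received


if __name__ == "__main__":
    print(consume_all())
```

Draining the queue before calling join() matters: a child that still has buffered items to flush may not exit until they are consumed.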

Pipe

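Pipe() returns a pair (conn1, conn2) representing the two ends of a pipe. It takes a duplex parameter: if duplex is True (the default), the pipe is full-duplex, meaning both conn1 and conn2 can send and receive. If duplex is False, conn1 can only receive messages and conn2 can only send them.

send and recv are the methods for sending and receiving messages. In full-duplex mode, for example, you can call conn1.send to send a message and conn1.recv to receive one. If there is nothing to receive, recv blocks. If there is nothing left to receive and the other end has been closed, recv raises EOFError.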

    import multiprocessing
    import time

    def proc1(pipe):
        # Sends 0..9999 over and over, one value per second, until interrupted.
        while True:
            for i in range(10000):
                print("send: %s" % i)
                pipe.send(i)
                time.sleep(1)

    def proc2(pipe):
        while True:
            print("proc2 rev:", pipe.recv())
            time.sleep(1)

    def proc3(pipe):
        while True:
            print("PROC3 rev:", pipe.recv())
            time.sleep(1)

    if __name__ == "__main__":
        pipe = multiprocessing.Pipe()
        p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
        p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
        # p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))
        p1.start()
        p2.start()
        # p3.start()
        p1.join()
        p2.join()
        # p3.join()
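The EOFError behaviour can be used to end a receive loop cleanly. Below is a small sketch with a one-way pipe (duplex=False); the function names `sender` and `receive_until_closed` are made up for this illustration.

```python
import multiprocessing


def sender(conn, items):
    for item in items:
        conn.send(item)
    conn.close()                      # closing lets the receiver see EOFError


def receive_until_closed(items):
    # With duplex=False the first connection only receives, the second only sends.
    recv_end, send_end = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=sender, args=(send_end, items))
    p.start()
    send_end.close()                  # the parent drops its own copy of the sending end
    received = []
    while True:
        try:
            received.append(recv_end.recv())
        except EOFError:              # nothing left and every sending end is closed
            break
    p.join()
    return received


if __name__ == "__main__":
    print(receive_until_closed(["a", 1, {"k": 2}]))
```

The parent closes its copy of the sending end right after start(). If it kept that copy open, recv() would block forever instead of raising EOFError.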

Pool

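When using Python for system administration, especially when operating on many files and directories at once or controlling many remote hosts, running tasks in parallel can save a great deal of time. When the number of targets is small, you can create processes directly with multiprocessing.Process; a dozen or so is fine. With hundreds or thousands of targets, limiting the number of processes by hand becomes tedious, and this is where a process pool helps.

Pool provides a fixed number of worker processes for the caller to use. When a new task is submitted, it is handed to an idle worker if there is one; if every worker is busy, the task waits until a worker finishes its current task and becomes free.

To share state with pool workers, use a multiprocessing.Manager (for example Manager().Queue()); a plain multiprocessing.Queue or Array cannot be passed to a pool task as an argument.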
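A minimal sketch of sharing state with pool workers through a Manager. The names `record_square` and `squares_via_manager` are invented for the example, and a managed dict is just one of the proxy types a Manager offers.

```python
import multiprocessing


def record_square(shared, n):
    shared[n] = n * n             # the proxy forwards this write to the manager process


def squares_via_manager(numbers):
    with multiprocessing.Manager() as manager:
        shared = manager.dict()   # proxy object that can be passed to pool tasks
        pool = multiprocessing.Pool(processes=2)
        pending = [pool.apply_async(record_square, (shared, n)) for n in numbers]
        pool.close()
        pool.join()
        for r in pending:
            r.get()               # re-raises any exception from the worker
        return dict(shared)       # copy the data out before the manager shuts down


if __name__ == "__main__":
    print(squares_via_manager([1, 2, 3]))
```

Every access goes through the manager process, so this is convenient rather than fast; it suits small amounts of shared bookkeeping.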

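Constructor
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes: the number of worker processes to use; if processes is None, the number returned by os.cpu_count() is used.
initializer: if initializer is not None, each worker process calls initializer(*initargs) when it starts.
maxtasksperchild: the number of tasks a worker process can complete before it exits and is replaced by a fresh worker, so that unused resources are released. The default is None, which means workers live as long as the Pool.
context: specifies the context used to start the worker processes. A pool is usually created with multiprocessing.Pool() or with the Pool() method of a context object; in both cases context is set appropriately.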
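The sketch below shows initializer, initargs and maxtasksperchild working together. All helper names (`init_worker`, `tag`, `run_tagged`) and the module-level variable `_prefix` are assumptions for this illustration. Each worker stores a prefix once at startup, and workers are replaced after two tasks.

```python
import multiprocessing
import os

_prefix = None                    # per-worker global, filled in by init_worker


def init_worker(prefix):
    """Runs once in every worker process, including replacement workers."""
    global _prefix
    _prefix = prefix


def tag(n):
    return "{0}-{1}".format(_prefix, n), os.getpid()


def run_tagged(count=6):
    pool = multiprocessing.Pool(processes=2, initializer=init_worker,
                                initargs=("job",), maxtasksperchild=2)
    pending = [pool.apply_async(tag, (i,)) for i in range(count)]
    results = [r.get(timeout=60) for r in pending]
    pool.close()
    pool.join()
    labels = [label for label, _ in results]
    worker_pids = {pid for _, pid in results}
    return labels, len(worker_pids)


if __name__ == "__main__":
    print(run_tagged())
```

Because each worker handles at most two tasks, six tasks need at least three different worker processes, which shows up in the number of distinct PIDs.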

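Instance methods
apply_async(func[, args[, kwds[, callback]]]): non-blocking.
apply(func[, args[, kwds]]): blocking.
close(): closes the pool so that it accepts no new tasks.
terminate(): closes the pool and stops the worker processes without processing outstanding tasks.
join(): blocks the main process until the worker processes exit; join() must be called after close() or terminate().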

    import multiprocessing
    import time

    def func(msg):
        print("msg:", msg)
        time.sleep(3)
        print("end")

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=3)
        for i in range(4):
            msg = "hello %d" % i
            # At most `processes` tasks run at once; when a worker finishes
            # one task it picks up the next queued task.
            pool.apply_async(func, (msg,))
        print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
        pool.close()
        # Call close() before join(), otherwise join() raises an error. After
        # close() the pool accepts no new tasks; join() waits for the workers to exit.
        pool.join()
        print("Sub-process(es) done.")

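Output (in the first line, the main process's "Mark~" line and a worker's "msg: hello 0" line are interleaved because both were written at the same time)

    mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
    msg: hello 1
    msg: hello 2
    end
    msg: hello 3
    end
    end
    end
    Sub-process(es) done.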

Note: apply_async(func[, args[, kwds[, callback]]]) is non-blocking, whereas apply(func[, args[, kwds]]) blocks. The same program using apply:

    import multiprocessing
    import time

    def func(msg):
        print("msg:", msg)
        time.sleep(3)
        print("end")

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=3)
        for i in range(4):
            msg = "hello %d" % i
            # apply() returns only after the task has finished,
            # so the tasks run one after another.
            pool.apply(func, (msg,))
        print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
        pool.close()
        # Call close() before join(), otherwise join() raises an error. After
        # close() the pool accepts no new tasks; join() waits for the workers to exit.
        pool.join()
        print("Sub-process(es) done.")

Note: each apply() call blocks the main process until the task finishes, so the pool provides no parallelism here.

    msg: hello 0
    end
    msg: hello 1
    end
    msg: hello 2
    end
    msg: hello 3
    end
    Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
    Sub-process(es) done.

A way to avoid blocking the main process:

    import multiprocessing
    import time

    def func(msg):
        print("msg:", msg)
        time.sleep(3)
        print("end")
        return "done" + msg

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=4)
        result = []
        for i in range(3):
            msg = "hello %d" % i
            # apply_async() returns a result object immediately.
            result.append(pool.apply_async(func, (msg,)))
        pool.close()
        pool.join()
        for res in result:
            print(":::", res.get())  # get() returns the task's return value
        print("Sub-process(es) done.")

Output:

    msg: hello 0
    msg: hello 1
    msg: hello 2
    end
    end
    end
    ::: donehello 0
    ::: donehello 1
    ::: donehello 2
    Sub-process(es) done.
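The callback parameter of apply_async offers another way to collect results without calling get() on each result object. The following is a sketch with made-up names (`slow_square`, `collect_with_callback`); the callback here is simply list.append.

```python
import multiprocessing


def slow_square(n):
    return n * n


def collect_with_callback(numbers):
    collected = []
    pool = multiprocessing.Pool(processes=2)
    for n in numbers:
        # The callback runs in the parent process, on a pool helper thread,
        # once the task's result is ready.
        pool.apply_async(slow_square, (n,), callback=collected.append)
    pool.close()
    pool.join()                   # all callbacks have run by the time join() returns
    return sorted(collected)      # completion order is not guaranteed, so sort


if __name__ == "__main__":
    print(collect_with_callback([3, 1, 2]))
```

Callbacks should return quickly, since the thread that runs them also handles the other results coming back from the workers.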

multiprocessing.dummy: multithreading

multiprocessing.dummy is used in essentially the same way as multiprocessing, except that it creates threads instead of processes.
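As a small illustration, the Pool from multiprocessing.dummy can be used with the same apply_async / close / join calls, but its workers are threads inside the current process. The function names below are made up for this sketch.

```python
import os
from multiprocessing.dummy import Pool as ThreadPool


def square_with_pid(n):
    return n * n, os.getpid()


def squares_in_threads(numbers):
    pool = ThreadPool(processes=3)    # three worker threads, not processes
    pending = [pool.apply_async(square_with_pid, (n,)) for n in numbers]
    pool.close()
    pool.join()
    results = [r.get() for r in pending]
    squares = [value for value, _ in results]
    # Every task ran in this very process, so all reported PIDs match ours.
    same_process = all(pid == os.getpid() for _, pid in results)
    return squares, same_process


if __name__ == "__main__":
    print(squares_in_threads(range(4)))
```

Because the workers are threads, arguments and results are not pickled, and the tasks share the interpreter with the caller.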


