[关闭]
@coder-pig 2018-04-28T17:07:00.000000Z 字数 9069 阅读 3782

Python多线程与多进程

Python


一句话概括本文


引言


关于Python中的GIL锁

概念

全局解释器锁,用于同步线程的一种机制,使得任何时候仅有一个线程在执行
GIL 并不是Python的特性,只是在实现Python解析器(CPython)时引入的
一个概念。换句话说,Python完全可以不依赖于GIL。

Python解释器进程内多线程是以协作多任务方式执行的,当一个线程遇到
I/O操作时会释放GIL。而依赖CPU计算的线程则是执行代码量到一定的阀值
才会释放GIL。而在Python 3.2开始使用新的GIL,使用固定的超时时间来指示
当前线程放弃全局锁,就是:当前线程持有这个锁,且其他线程请求这个锁时,
当前线程就会再5毫秒后被强制释放掉该锁。

多线程在处理CPU密集型操作因为各种循环处理计数等,会很快达到阀值,
而多个线程来回切换是会消耗资源的,所以多线程的效率往往可能还比不上
单线程!而在多核CPU上效率会更低,因为多核环境下,持有锁的CPU释放锁后,
其他CPU上的线程都会进行竞争,但GIL可能马上又会被之前的CPU拿到拿到,
导致其他几个CPU上被唤醒后的线程会醒着等待到切换时间后又进入待调度
状态,从而造成线程颠簸(thrashing),导致效率更低。

问题:因为GIL锁的原因,对于CPU密集型操作,Python多线程就是鸡肋了?

答:是的!尽管多线程开销小,但却无法利用多核优势!可以使用
多进程来规避这个问题,Python提供了multiprocessing
这个跨平台的模块来帮助我们实现多进程代码的编写。
每个线程都有自己独立的GIL,因此不会出现进程间GIL
锁抢夺的问题,但是也增加程序实现线程间数据通讯和同步
是的成本,这个需要自行进行权衡。
另外还是得强调下,GIL只会影响到那些严重依赖CPU的程序,
对于网络交互这种涉及到IO的,使用多线程就很合适,写爬虫用到
多进程,一般是在部署分布式爬虫的时候才会用到。


threading模块


1.threaing模块提供的可直接调用函数

  • active_count():获取当前活跃(alive)线程的个数;
  • current_thread():获取当前的线程对象;
  • get_ident():返回当前线程的索引,一个非零的整数;(3.3新增)
  • enumerate():获取当前所有活跃线程的列表;
  • main_thread():返回主线程对象,(3.4新增);
  • settrace(func):设置一个回调函数,在run()执行之前被调用;
  • setprofile(func):设置一个回调函数,在run()执行完毕之后调用;
  • stack_size():返回创建新线程时使用的线程堆栈大小;
  • threading.TIMEOUT_MAX:堵塞线程时间最大值,超过这个值会栈溢出!

2.线程局部变量(Thread-Local Data)

threading.local()实例化一个全局对象,不同线程可以往里面保存数据,
互不干扰。实现原理是该对象内部用一个大字典,保存键值为两个
弱引用对象,{线程对象,字典对象},通过current_thread()获得当前
的线程对象,作为key以此拿到对应的字典对象

  1. # 线程局部变量使用示例
  2. import threading
  3. import random
  4. data = threading.local()
  5. def show(d):
  6. try:
  7. num = d.num
  8. except AttributeError:
  9. print("线程 %s 还未设置该属性!" % threading.current_thread().getName())
  10. else:
  11. print("线程 %s 中该属性的值为 = %s" % (threading.current_thread().getName(), num))
  12. def thread_call(d):
  13. show(d)
  14. d.num = random.randint(1, 100)
  15. show(d)
  16. if __name__ == '__main__':
  17. show(data)
  18. data.num = 666
  19. show(data)
  20. for i in range(2):
  21. t = threading.Thread(target=thread_call, args=(data,), name='Thread ' + str(i))
  22. t.start()

运行结果


3.线程对象(threading.Thread)

使用threading.Thread创建线程,可以通过下面两种方法创建新线程:

  • 1.直接创建threading.Thread对象,并把调用对象作为参数传入;
  • 2.继承threading.Thread类重写run() 方法;

Thread类构造函数

参数依次是

  • group:线程组
  • target:要执行的函数
  • name:线程名字
  • args/kwargs:要传入的函数的参数
  • daemon:是否为守护线程

相关的属性与函数

  • start():启动线程,只能调用一次;
  • run():线程执行的操作,可继承Thread重写,参数可从args和kwargs获取;
  • join([timeout]):堵塞调用线程,直到被调用线程运行结束或超时;如果
    没设置超时时间会一直堵塞到被调用线程结束。
  • name/getName():获得线程名;
  • setName():设置线程名;
  • ident:线程是已经启动,未启动会返回一个非零整数;
  • is_alive():判断是否在运行,启动后,终止前;
  • daemon/isDaemon():线程是否为守护线程;
  • setDaemon():设置线程为守护线程;

4.Lock(指令锁)与RLock(可重入锁)

threading模块中提供了两个类来确保多线程共享资源的访问:LockRLock
用法一样,后者和前者的区别是可重入,即:RLock可被同一个线程请求多次

锁分为两种状态(锁定与非锁定),两个常用函数:acquire()加锁,release()解锁,
acquire()函数有两个可选参数,blocking=True[是否堵塞当前当前线程等待],
timeout=None[堵塞等待时间],如果成功获得锁acquire返回True,否则返回False,
超时也是返回False。

锁使用起来很简单,在访问共享资源的地方acquire一下,用完release下就好。
要注意,acquire与release需要成对出现,有多少个acquire,就要有多少个
release,才能真正释放锁!锁处于unlocked状态,调用release函数是会抛
RuntimeError异常的

使用示例如下

  1. # Lock指令锁的使用示例
  2. import threading
  3. import time
  4. import config as c
  5. out_file_name = c.outputs_logs_path + 'lockTest.txt'
  6. lock = threading.Lock()
  7. class MyThread(threading.Thread):
  8. def __init__(self, string):
  9. super().__init__()
  10. self.string = string
  11. def run(self):
  12. write_to_file(self.name + '~' + self.string)
  13. time.sleep(1)
  14. def write_to_file(string):
  15. if lock.acquire():
  16. try:
  17. with open(out_file_name, "a+", encoding='utf-8') as f:
  18. f.write(string + '\n')
  19. except OSError as reason:
  20. print(str(reason))
  21. finally:
  22. lock.release()
  23. if __name__ == '__main__':
  24. for i in range(1, 100):
  25. t = MyThread(str(i)).start()

运行结果

写入文件结果不会出现这种乱序的情况:


5.条件变量(Condition)

用于处理复杂线程同步问题, 比如最经典的生产者与消费者问题
Condition除了提供与Lock类似的 acquire()release()函数外,
还提供了 wait()notify() 函数,使用流程如下:

  • 1.调用threading.Condition获得一个条件变量对象;
  • 2.线程调用acquire获得Condition对象
  • 3.条件判断,不满足条件调用wait函数;满足条件,进行一些处理改变条件后,
    调用notify函数通知处于wait 状态的线程,重新进行条件判断。

Condition维护着一个互斥锁对象(默认是RLock),也可以自己实例化一个
在Condition实例化的时候通过构造函数传入,so,调用的Condition的
acquire与release函数,其实调用就是这个锁对象的acquire与release函数。

除了这四个函数外还有其他函数,不过下述函数需要在acquire后才能
调用,否则会抛RuntimeError异常!!!

  • wait(timeout=None):释放锁,同时线程被挂起,直到收到通知被唤醒
    或超时(如果设置了timeout),当线程被唤醒并重新占有锁时,程序才继续执行;
  • wait_for(predicate, timeout=None):等待知道条件为True,predicate应该是
    一个回调函数,返回布尔值,timeout用于指定超时时间,返回值为回调函数
    返回的布尔值,或者超时,返回False(3.2新增);
  • notify(n=1):默认唤醒一个正在的等待线程,notify并不释放锁!!!
  • notify_all():唤醒所有等待线程,进入就绪状态,等待获得锁,notify_all 同样不释放锁!!!

使用示例如下

  1. # Condition条件变量使用示例(简单的生产者与消费者)
  2. import threading
  3. import time
  4. condition = threading.Condition()
  5. products = 0 # 商品数量
  6. # 定义生产者线程类
  7. class Producer(threading.Thread):
  8. def run(self):
  9. global products
  10. while True:
  11. if condition.acquire():
  12. if products >= 99:
  13. condition.wait()
  14. else:
  15. products += 2
  16. print(self.name + "生产了2个产品,当前剩余产品数为:" + str(products))
  17. condition.notify()
  18. condition.release()
  19. time.sleep(2)
  20. # 定义消费者线程类
  21. class Consumer(threading.Thread):
  22. def run(self):
  23. global products
  24. while True:
  25. if condition.acquire():
  26. if products < 3:
  27. condition.wait()
  28. else:
  29. products -= 3
  30. print(self.name + "消耗了3个产品,当前剩余产品数为:" + str(products))
  31. condition.notify()
  32. condition.release()
  33. time.sleep(2)
  34. if __name__ == '__main__':
  35. # 创建五个生产者线程
  36. for i in range(5):
  37. p = Producer()
  38. p.start()
  39. # 创建两个消费者线程
  40. for j in range(2):
  41. c = Consumer()
  42. c.start()

运行结果


6.信号量(Semaphore)

定义一个值,即允许多少个线程同时访问,超过堵塞等待,

使用示例如下

  1. # 信号量Semaphore的使用示例
  2. import threading
  3. import time
  4. import random
  5. s = threading.Semaphore(5) # 粪坑
  6. class Human(threading.Thread):
  7. def run(self):
  8. s.acquire() # 占坑
  9. print("拉屎拉屎 - " + self.name + " - " + str(time.ctime()))
  10. time.sleep(random.randrange(1, 3))
  11. print("拉完走人 - " + self.name + " - " + str(time.ctime()))
  12. s.release() # 走人
  13. if __name__ == '__main__':
  14. for i in range(10):
  15. human = Human()
  16. human.start()

运行结果


7.通用的条件变量(Event)

Python提供的用于线程间通信的信号标志,一个线程标识了一个事件,
其他线程处于等待状态,直到事件发生后,所有线程都会被激活。

Event对象实现了简单的线程通信机制,提供了设置信号,清除信号,
等待等用于线程间通信,有下述四个可供调用的方法:

  • is_set():判断内部标志是否为真
  • set():设置信号标志为真
  • clear():清除Event对象内部的信号标志(设置为false)
  • wait(timeout=None):使线程一直处于堵塞,知道标识符变为True

使用示例如下

  1. # 通用的条件变量Event 使用示例
  2. import threading
  3. import time
  4. import random
  5. class CarThread(threading.Thread):
  6. def __init__(self, event):
  7. threading.Thread.__init__(self)
  8. self.threadEvent = event
  9. def run(self):
  10. # 休眠模拟汽车先后到达路口时间
  11. time.sleep(random.randrange(1, 10))
  12. print("汽车 - " + self.name + " - 到达路口...")
  13. self.threadEvent.wait()
  14. print("汽车 - " + self.name + " - 通过路口...")
  15. if __name__ == '__main__':
  16. light_event = threading.Event()
  17. # 假设有20台车子
  18. for i in range(20):
  19. car = CarThread(event=light_event)
  20. car.start()
  21. while threading.active_count() > 1:
  22. light_event.clear()
  23. print("红灯等待...")
  24. time.sleep(3)
  25. print("绿灯通行...")
  26. light_event.set()
  27. time.sleep(2)

运行结果


8.定时器Timer

与Thread类似,只是要等待一段时间后才会开始运行,单位秒。

使用示例如下

  1. # 定时器Timer使用示例
  2. import threading
  3. import time
  4. def skill_ready():
  5. print("!!!!!!大招已经准备好了!!!!!!")
  6. if __name__ == '__main__':
  7. t = threading.Timer(5, skill_ready)
  8. t.start()
  9. while threading.active_count() > 1:
  10. print("======大招蓄力中======")
  11. time.sleep(1)

运行结果:


9.栅栏(Barrier)

多个线程间相互等待,调用了wait()方法的线程进入堵塞,
直到所有线程都调用了wait()方法,然后所有线程同时
进入就绪状态,等待调度运行。

构造函数Barrier(parties,action=None,timeout=None)

  • parties:创建一个可容纳parties条线程的栅栏;
  • action:全部线程被释放时可被其中一条线程调用的可调用对象;
  • timeout:线程调用wait()方法时没有显式设定timeout,就用的这个作为默认值;

相关函数

  • wait(timeout=None):表示线程就位,返回值是一个0到parties-1之间的整数,
    每条线程都不一样,这个值可以用作挑选一条线程做些清扫工作,另外如果你在
    构造函数里设置了action的话,其中一个线程在释放之前将会调用它。如果调用
    出错的话,会让栅栏进入broken状态,超时同样也会进入broken状态,如果栅栏
    在处于broke状态的时候调用reset函数,会抛出一个BrokenBarrierError异常。
  • reset():本方法将栅栏置为初始状态,即empty状态。所有已经在等待的线程
    都会接收到BrokenBarrierError异常,注意当有其他处于unknown状态的线程时,
    调用此方法将可能获取到额外的访问。因此如果一个栅栏进入了broken状态,
    最好是放弃他并新建一个栅栏,而不是调用reset方法。
  • abort():将栅栏置为broken状态。本方法将使所有正在等待或将要调用
    wait()方法的线程收到BrokenBarrierError异常。本方法的使用情景为,比如:
    有一条线程需要abort(),又不想给其他线程造成死锁的状态,或许设定
    timeout参数要比使用本方法更可靠。
  • parites:将要使用本 barrier 的线程的数量
  • n_waiting:正在等待本 barrier 的线程的数量
  • broken:栅栏是否为broken状态,返回一个布尔值

BrokenBarrierError:RuntimeError的子类,当栅栏被reset()或broken时引发;

使用示例如下

  1. # 栅栏Barrier使用示例
  2. import random
  3. import threading
  4. import time
  5. class Staff(threading.Thread):
  6. def __init__(self, barriers):
  7. threading.Thread.__init__(self)
  8. self.barriers = barriers
  9. def run(self):
  10. print("员工 【" + self.name + "】" + "出门")
  11. time.sleep(random.randrange(1, 10))
  12. print("员工 【" + self.name + "】" + "已签到")
  13. self.barriers.wait()
  14. def ready():
  15. print(threading.current_thread().name + ":人齐,出发,出发~~~")
  16. if __name__ == '__main__':
  17. print("要出去旅游啦,大家快集合~")
  18. b = threading.Barrier(10, action=ready, timeout=20)
  19. for i in range(10):
  20. staff = Staff(b)
  21. staff.start()

运行结果


queue模块

Python提供的一个线程安全多生产者多消费者队列自带锁
多线程并发数据交换必备。

1.内置三种类型的队列

构造函数一样,都是只有一个maxsize=0,用于设置队列的容量,
如果设置的maxsize小于1,则表示队列的长度无限长。

2.两个异常

3.相关函数

代码示例如下

  1. # 队列queue使用示例
  2. import threading
  3. import queue
  4. import time
  5. class Worker(threading.Thread):
  6. def __init__(self, t_name):
  7. threading.Thread.__init__(self, name=t_name)
  8. def run(self):
  9. global m_queue
  10. while not m_queue.empty():
  11. d = m_queue.get()
  12. print("处理任务%d" % d)
  13. time.sleep(2)
  14. m_queue.task_done()
  15. if __name__ == '__main__':
  16. m_queue = queue.Queue()
  17. threads = []
  18. data_list = [i for i in range(0, 100)]
  19. for data in data_list:
  20. m_queue.put(data)
  21. for i in range(0, len(data_list)):
  22. t = Worker(t_name='线程' + str(i))
  23. t.daemon = True
  24. t.start()
  25. threads.append(t)
  26. m_queue.join()
  27. for t in threads:
  28. t.join()
  29. print("所有任务完成")

运行结果:(每个线程休眠2s模拟做网络操作,100个任务多线程并发一会儿就完成了~)


添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注