@coder-pig
2018-04-28T17:07:00.000000Z
字数 9069
阅读 3844
Python
一句话概括本文:
引言:
概念:
全局解释器锁,用于同步线程的一种机制,使得任何时候仅有一个线程在执行。
GIL 并不是Python的特性,只是在实现Python解析器(CPython)时引入的
一个概念。换句话说,Python完全可以不依赖于GIL。
Python解释器进程内的多线程是以协作多任务方式执行的,当一个线程遇到
I/O操作时会释放GIL。而依赖CPU计算的线程则是执行代码量到一定的阀值,
才会释放GIL。而在Python 3.2开始使用新的GIL,使用固定的超时时间来指示
当前线程放弃全局锁,就是:当前线程持有这个锁,且其他线程请求这个锁时,
当前线程就会再5毫秒后被强制释放掉该锁。
多线程在处理CPU密集型操作因为各种循环处理计数等,会很快达到阀值,
而多个线程来回切换是会消耗资源的,所以多线程的效率往往可能还比不上
单线程!而在多核CPU上效率会更低,因为多核环境下,持有锁的CPU释放锁后,
其他CPU上的线程都会进行竞争,但GIL可能马上又会被之前的CPU拿到拿到,
导致其他几个CPU上被唤醒后的线程会醒着等待到切换时间后又进入待调度
状态,从而造成线程颠簸(thrashing),导致效率更低。
问题:因为GIL锁的原因,对于CPU密集型操作,Python多线程就是鸡肋了?
答:是的!尽管多线程开销小,但却无法利用多核优势!可以使用
多进程来规避这个问题,Python提供了multiprocessing
这个跨平台的模块来帮助我们实现多进程代码的编写。
每个线程都有自己独立的GIL,因此不会出现进程间GIL
锁抢夺的问题,但是也增加程序实现线程间数据通讯和同步
是的成本,这个需要自行进行权衡。
另外还是得强调下,GIL只会影响到那些严重依赖CPU的程序,
对于网络交互这种涉及到IO的,使用多线程就很合适,写爬虫用到
多进程,一般是在部署分布式爬虫的时候才会用到。
- active_count():获取当前活跃(alive)线程的个数;
- current_thread():获取当前的线程对象;
- get_ident():返回当前线程的索引,一个非零的整数;(3.3新增)
- enumerate():获取当前所有活跃线程的列表;
- main_thread():返回主线程对象,(3.4新增);
- settrace(func):设置一个回调函数,在run()执行之前被调用;
- setprofile(func):设置一个回调函数,在run()执行完毕之后调用;
- stack_size():返回创建新线程时使用的线程堆栈大小;
- threading.TIMEOUT_MAX:堵塞线程时间最大值,超过这个值会栈溢出!
threading.local()实例化一个全局对象,不同线程可以往里面保存数据,
互不干扰。实现原理是该对象内部用一个大字典,保存键值为两个
弱引用对象,{线程对象,字典对象},通过current_thread()获得当前
的线程对象,作为key以此拿到对应的字典对象。
# 线程局部变量使用示例
import threading
import random
data = threading.local()
def show(d):
try:
num = d.num
except AttributeError:
print("线程 %s 还未设置该属性!" % threading.current_thread().getName())
else:
print("线程 %s 中该属性的值为 = %s" % (threading.current_thread().getName(), num))
def thread_call(d):
show(d)
d.num = random.randint(1, 100)
show(d)
if __name__ == '__main__':
show(data)
data.num = 666
show(data)
for i in range(2):
t = threading.Thread(target=thread_call, args=(data,), name='Thread ' + str(i))
t.start()
运行结果:
使用threading.Thread创建线程,可以通过下面两种方法创建新线程:
- 1.直接创建threading.Thread对象,并把调用对象作为参数传入;
- 2.继承threading.Thread类,重写run() 方法;
Thread类构造函数
参数依次是:
- group:线程组
- target:要执行的函数
- name:线程名字
- args/kwargs:要传入的函数的参数
- daemon:是否为守护线程
相关的属性与函数:
- start():启动线程,只能调用一次;
- run():线程执行的操作,可继承Thread重写,参数可从args和kwargs获取;
- join([timeout]):堵塞调用线程,直到被调用线程运行结束或超时;如果
没设置超时时间会一直堵塞到被调用线程结束。- name/getName():获得线程名;
- setName():设置线程名;
- ident:线程是已经启动,未启动会返回一个非零整数;
- is_alive():判断是否在运行,启动后,终止前;
- daemon/isDaemon():线程是否为守护线程;
- setDaemon():设置线程为守护线程;
threading模块中提供了两个类来确保多线程共享资源的访问:Lock 和 RLock
用法一样,后者和前者的区别是可重入,即:RLock可被同一个线程请求多次。
锁分为两种状态(锁定与非锁定),两个常用函数:acquire()加锁,release()解锁,
acquire()函数有两个可选参数,blocking=True[是否堵塞当前当前线程等待],
timeout=None[堵塞等待时间],如果成功获得锁acquire返回True,否则返回False,
超时也是返回False。
锁使用起来很简单,在访问共享资源的地方acquire一下,用完release下就好。
要注意,acquire与release需要成对出现,有多少个acquire,就要有多少个
release,才能真正释放锁!锁处于unlocked状态,调用release函数是会抛
RuntimeError异常的。
使用示例如下:
# Lock指令锁的使用示例
import threading
import time
import config as c
out_file_name = c.outputs_logs_path + 'lockTest.txt'
lock = threading.Lock()
class MyThread(threading.Thread):
def __init__(self, string):
super().__init__()
self.string = string
def run(self):
write_to_file(self.name + '~' + self.string)
time.sleep(1)
def write_to_file(string):
if lock.acquire():
try:
with open(out_file_name, "a+", encoding='utf-8') as f:
f.write(string + '\n')
except OSError as reason:
print(str(reason))
finally:
lock.release()
if __name__ == '__main__':
for i in range(1, 100):
t = MyThread(str(i)).start()
运行结果
写入文件结果不会出现这种乱序的情况:
用于处理复杂线程同步问题, 比如最经典的生产者与消费者问题
Condition除了提供与Lock类似的 acquire()
与 release()
函数外,
还提供了 wait()
与 notify()
函数,使用流程如下:
- 1.调用threading.Condition获得一个条件变量对象;
- 2.线程调用acquire获得Condition对象
- 3.条件判断,不满足条件调用wait函数;满足条件,进行一些处理改变条件后,
调用notify函数通知处于wait 状态的线程,重新进行条件判断。
Condition维护着一个互斥锁对象(默认是RLock),也可以自己实例化一个
在Condition实例化的时候通过构造函数传入,so,调用的Condition的
acquire与release函数,其实调用就是这个锁对象的acquire与release函数。
除了这四个函数外还有其他函数,不过下述函数需要在acquire后才能
调用,否则会抛RuntimeError异常!!!
- wait(timeout=None):释放锁,同时线程被挂起,直到收到通知被唤醒
或超时(如果设置了timeout),当线程被唤醒并重新占有锁时,程序才继续执行;- wait_for(predicate, timeout=None):等待知道条件为True,predicate应该是
一个回调函数,返回布尔值,timeout用于指定超时时间,返回值为回调函数
返回的布尔值,或者超时,返回False(3.2新增);- notify(n=1):默认唤醒一个正在的等待线程,notify并不释放锁!!!
- notify_all():唤醒所有等待线程,进入就绪状态,等待获得锁,notify_all 同样不释放锁!!!
使用示例如下:
# Condition条件变量使用示例(简单的生产者与消费者)
import threading
import time
condition = threading.Condition()
products = 0 # 商品数量
# 定义生产者线程类
class Producer(threading.Thread):
def run(self):
global products
while True:
if condition.acquire():
if products >= 99:
condition.wait()
else:
products += 2
print(self.name + "生产了2个产品,当前剩余产品数为:" + str(products))
condition.notify()
condition.release()
time.sleep(2)
# 定义消费者线程类
class Consumer(threading.Thread):
def run(self):
global products
while True:
if condition.acquire():
if products < 3:
condition.wait()
else:
products -= 3
print(self.name + "消耗了3个产品,当前剩余产品数为:" + str(products))
condition.notify()
condition.release()
time.sleep(2)
if __name__ == '__main__':
# 创建五个生产者线程
for i in range(5):
p = Producer()
p.start()
# 创建两个消费者线程
for j in range(2):
c = Consumer()
c.start()
运行结果:
定义一个值,即允许多少个线程同时访问,超过堵塞等待,
使用示例如下:
# 信号量Semaphore的使用示例
import threading
import time
import random
s = threading.Semaphore(5) # 粪坑
class Human(threading.Thread):
def run(self):
s.acquire() # 占坑
print("拉屎拉屎 - " + self.name + " - " + str(time.ctime()))
time.sleep(random.randrange(1, 3))
print("拉完走人 - " + self.name + " - " + str(time.ctime()))
s.release() # 走人
if __name__ == '__main__':
for i in range(10):
human = Human()
human.start()
运行结果:
Python提供的用于线程间通信的信号标志,一个线程标识了一个事件,
其他线程处于等待状态,直到事件发生后,所有线程都会被激活。
Event对象实现了简单的线程通信机制,提供了设置信号,清除信号,
等待等用于线程间通信,有下述四个可供调用的方法:
- is_set():判断内部标志是否为真
- set():设置信号标志为真
- clear():清除Event对象内部的信号标志(设置为false)
- wait(timeout=None):使线程一直处于堵塞,知道标识符变为True
使用示例如下:
# 通用的条件变量Event 使用示例
import threading
import time
import random
class CarThread(threading.Thread):
def __init__(self, event):
threading.Thread.__init__(self)
self.threadEvent = event
def run(self):
# 休眠模拟汽车先后到达路口时间
time.sleep(random.randrange(1, 10))
print("汽车 - " + self.name + " - 到达路口...")
self.threadEvent.wait()
print("汽车 - " + self.name + " - 通过路口...")
if __name__ == '__main__':
light_event = threading.Event()
# 假设有20台车子
for i in range(20):
car = CarThread(event=light_event)
car.start()
while threading.active_count() > 1:
light_event.clear()
print("红灯等待...")
time.sleep(3)
print("绿灯通行...")
light_event.set()
time.sleep(2)
运行结果:
与Thread类似,只是要等待一段时间后才会开始运行,单位秒。
使用示例如下:
# 定时器Timer使用示例
import threading
import time
def skill_ready():
print("!!!!!!大招已经准备好了!!!!!!")
if __name__ == '__main__':
t = threading.Timer(5, skill_ready)
t.start()
while threading.active_count() > 1:
print("======大招蓄力中======")
time.sleep(1)
运行结果:
多个线程间相互等待,调用了wait()方法的线程进入堵塞,
直到所有线程都调用了wait()方法,然后所有线程同时
进入就绪状态,等待调度运行。
构造函数:Barrier(parties,action=None,timeout=None)
- parties:创建一个可容纳parties条线程的栅栏;
- action:全部线程被释放时可被其中一条线程调用的可调用对象;
- timeout:线程调用wait()方法时没有显式设定timeout,就用的这个作为默认值;
相关函数:
- wait(timeout=None):表示线程就位,返回值是一个0到parties-1之间的整数,
每条线程都不一样,这个值可以用作挑选一条线程做些清扫工作,另外如果你在
构造函数里设置了action的话,其中一个线程在释放之前将会调用它。如果调用
出错的话,会让栅栏进入broken状态,超时同样也会进入broken状态,如果栅栏
在处于broke状态的时候调用reset函数,会抛出一个BrokenBarrierError异常。- reset():本方法将栅栏置为初始状态,即empty状态。所有已经在等待的线程
都会接收到BrokenBarrierError异常,注意当有其他处于unknown状态的线程时,
调用此方法将可能获取到额外的访问。因此如果一个栅栏进入了broken状态,
最好是放弃他并新建一个栅栏,而不是调用reset方法。- abort():将栅栏置为broken状态。本方法将使所有正在等待或将要调用
wait()方法的线程收到BrokenBarrierError异常。本方法的使用情景为,比如:
有一条线程需要abort(),又不想给其他线程造成死锁的状态,或许设定
timeout参数要比使用本方法更可靠。- parites:将要使用本 barrier 的线程的数量
- n_waiting:正在等待本 barrier 的线程的数量
- broken:栅栏是否为broken状态,返回一个布尔值
BrokenBarrierError:RuntimeError的子类,当栅栏被reset()或broken时引发;
使用示例如下:
# 栅栏Barrier使用示例
import random
import threading
import time
class Staff(threading.Thread):
def __init__(self, barriers):
threading.Thread.__init__(self)
self.barriers = barriers
def run(self):
print("员工 【" + self.name + "】" + "出门")
time.sleep(random.randrange(1, 10))
print("员工 【" + self.name + "】" + "已签到")
self.barriers.wait()
def ready():
print(threading.current_thread().name + ":人齐,出发,出发~~~")
if __name__ == '__main__':
print("要出去旅游啦,大家快集合~")
b = threading.Barrier(10, action=ready, timeout=20)
for i in range(10):
staff = Staff(b)
staff.start()
运行结果:
Python提供的一个线程安全的多生产者,多消费者队列,自带锁,
多线程并发数据交换必备。
1.内置三种类型的队列
Queue
:FIFO(先进先出);LifoQueue
:LIFO(后进先出);PriorityQueue
:优先级最小的先出;构造函数一样,都是只有一个maxsize=0,用于设置队列的容量,
如果设置的maxsize小于1,则表示队列的长度无限长。
2.两个异常:
3.相关函数
代码示例如下:
# 队列queue使用示例
import threading
import queue
import time
class Worker(threading.Thread):
def __init__(self, t_name):
threading.Thread.__init__(self, name=t_name)
def run(self):
global m_queue
while not m_queue.empty():
d = m_queue.get()
print("处理任务%d" % d)
time.sleep(2)
m_queue.task_done()
if __name__ == '__main__':
m_queue = queue.Queue()
threads = []
data_list = [i for i in range(0, 100)]
for data in data_list:
m_queue.put(data)
for i in range(0, len(data_list)):
t = Worker(t_name='线程' + str(i))
t.daemon = True
t.start()
threads.append(t)
m_queue.join()
for t in threads:
t.join()
print("所有任务完成")
运行结果:(每个线程休眠2s模拟做网络操作,100个任务多线程并发一会儿就完成了~)