[关闭]
@coder-pig 2018-04-02T11:52:36.000000Z 字数 6121 阅读 1497

小猪的Python学习之旅 —— 11.Python并发之threading模块(2)

Python


一句话概括本文

本节继续把Python里threading线程模块剩下的ConditionSemaphore
EventTimerBarrier讲解完毕,文档是枯燥无味的,希望通过简单
有趣的例子,可以帮你快速掌握这几个东东的用法~

啃文档是比较乏味的,先来个小姐姐提提神吧~

别问高清原图,程序猿自己动手,丰(营)衣(养)足(跟)食(不上),
脚本自取:https://github.com/coder-pig/ReptileSomething


引言

如果你忘记了threading上一部分内容,可以移步至:
小猪的Python学习之旅 —— 7.Python并发之threading模块(1)
温故知新,官方文档依旧是:
https://docs.python.org/3/library/threading.html


1.条件变量(Condition)

上节学习了Python为我们提供的第一个用于线程同步的东东——互斥锁
又分Lock(指令锁)RLock(可重入锁),但是互斥锁只是最简单的同步机制,
Python为我们提供了Condition(条件变量),以便于处理复杂线程同步问题,
比如最经典的生产者与消费者问题。

Condition除了提供与Lock类似的acquire()release()函数外,还提供了
wait()notify()函数。用法如下:

写个简单的生产者与消费者例子体验下:

  1. import threading
  2. import time
  3. condition = threading.Condition()
  4. products = 0 # 商品数量
  5. # 定义生产者线程类
  6. class Producer(threading.Thread):
  7. def run(self):
  8. global products
  9. while True:
  10. if condition.acquire():
  11. if products >= 99:
  12. condition.wait()
  13. else:
  14. products += 2
  15. print(self.name + "生产了2个产品,当前剩余产品数为:" + str(products))
  16. condition.notify()
  17. condition.release()
  18. time.sleep(2)
  19. # 定义消费者线程类
  20. class Consumer(threading.Thread):
  21. def run(self):
  22. global products
  23. while True:
  24. if condition.acquire():
  25. if products < 3:
  26. condition.wait()
  27. else:
  28. products -= 3
  29. print(self.name + "消耗了3个产品,当前剩余产品数为:" + str(products))
  30. condition.notify()
  31. condition.release()
  32. time.sleep(2)
  33. if __name__ == '__main__':
  34. # 创建五个生产者线程
  35. for i in range(5):
  36. p = Producer()
  37. p.start()
  38. # 创建两个消费者线程
  39. for j in range(2):
  40. c = Consumer()
  41. c.start()

运行结果

Condition维护着一个互斥锁对象(默认是RLock),也可以自己实例化一个
在Condition实例化的时候通过构造函数传入,SO,调用的Condition的
acquire与release函数,其实调用就是这个锁对象的acquire与release函数

下面详解下除了acquire与release函数外Condition提供的相关函数吧:
(注:下述方法只有在acquire之后才能调用,不然会报RuntimeError异常)


2.信号量(Semaphore)

信号量,也是一个很容易懂的东西,举个简单的例子:

假如厕所里有五个蹲坑,有人来开大,就会占用一个坑位,
所剩余的坑位-1,当五个坑都被人占满的时候,新来的人
就只能在外面等,直到有人出来为止。

这里的五个粪坑就是信号量蹲坑的人就是线程
初始值为5,来人-1,走人+1;超过初始值,新来的处于堵塞状态;

原理很简单,试试看下源码:

看下__init__方法

传入参数value,默认值为1,不能传入负数,否则抛ValueError异常;
创建了一个Condition条件变量,传入一个Lock实例;

接着看下acquire函数:

再接着是release函数,更简单

信号量+1,然后调用Condition变量的notify唤醒一个线程~

剩下的__enter____exit__就不用说了,重写这两个方法就能直接用with关键字了

就是那么简单,把我们蹲坑的那个例子写成代码吧:

  1. import threading
  2. import time
  3. import random
  4. s = threading.Semaphore(5) # 粪坑
  5. class Human(threading.Thread):
  6. def run(self):
  7. s.acquire() # 占坑
  8. print("拉屎拉屎 - " + self.name + " - " + str(time.ctime()))
  9. time.sleep(random.randrange(1, 3))
  10. print("拉完走人 - " + self.name + " - " + str(time.ctime()))
  11. s.release() # 走人
  12. if __name__ == '__main__':
  13. for i in range(10):
  14. human = Human()
  15. human.start()

输出结果


3.通用的条件变量(Event)

Python提供的用于线程间通信的信号标志,一个线程标识了一个事件,
其他线程处于等待状态,直到事件发生后,所有线程都会被激活。

Event对象属性实现了简单的线程通信机制,提供了设置信号,清楚信号,
等待等用于实现线程间的通信。提供以下四个可供调用的方法:

感觉有点蒙圈,看一波源码吧~

先是__init__函数

又是用到Condition条件变量,还有设置了一个_flag = False,这个就是标记吧!

is_set函数比较简单,返回_flag,

然后是set()函数

加锁,然后设置_flag为true,然后notify_all唤醒所有线程,最后释放锁,
简单,接着clear函数呢?

注释的意思是:重置内部标记为false,随后,调用wait()的线程将被堵塞,
直到调用set()将内部标记再次设置为true。也很简单,最后是wait方法:

判断标志是否为False,False的话进入堵塞状态,(⊙v⊙)嗯
源码就那么简单,感觉看完还是蒙圈不知道怎么用,写个简单的例子?
汽车过红绿灯的例子:

  1. import threading
  2. import time
  3. import random
  4. class CarThread(threading.Thread):
  5. def __init__(self, event):
  6. threading.Thread.__init__(self)
  7. self.threadEvent = event
  8. def run(self):
  9. # 休眠模拟汽车先后到达路口时间
  10. time.sleep(random.randrange(1, 10))
  11. print("汽车 - " + self.name + " - 到达路口...")
  12. self.threadEvent.wait()
  13. print("汽车 - " + self.name + " - 通过路口...")
  14. if __name__ == '__main__':
  15. light_event = threading.Event()
  16. # 假设有20台车子
  17. for i in range(20):
  18. car = CarThread(event=light_event)
  19. car.start()
  20. while threading.active_count() > 1:
  21. light_event.clear()
  22. print("红灯等待...")
  23. time.sleep(3)
  24. print("绿灯通行...")
  25. light_event.set()
  26. time.sleep(2)

输出结果


4.定时器(Timer)

与Thread类似,只是要等待一段时间后才会开始运行,单位秒,用法也很简单:

  1. import threading
  2. import time
  3. def skill_ready():
  4. print("!!!!!!大招已经准备好了!!!!!!")
  5. if __name__ == '__main__':
  6. t = threading.Timer(5, skill_ready)
  7. t.start()
  8. while threading.active_count() > 1:
  9. print("======大招蓄力中======")
  10. time.sleep(1)

输出结果


5.栅栏(Barrier)

Barrier直译栅栏,感觉不是很好理解,网上有个形象化的例子,把他比喻
成赛马用的栅栏,然后马(线程)依次来到栅栏前等待(wait),直到所有的马
都停在栅栏面前了,然后所有马开始同时出发(start)

简单点说就是,多个线程间的相互等待,调用了wait()方法的线程进入堵塞,
直到所有的线程都调用了wait()方法,然后所有线程同时进行就绪状态,
等待调度运行。

构造函数

Barrier(parties,action=None,timeout=None)

相关函数

BrokenBarrierErrorRuntimeError的子类,当栅栏被reset()或broken时引发;

(感觉都不知所云,写个简单的例子来熟悉下用法吧~)

例子公司一起去旅游

  1. import threading
  2. import time
  3. import random
  4. class Staff(threading.Thread):
  5. def __init__(self, barriers):
  6. threading.Thread.__init__(self)
  7. self.barriers = barriers
  8. def run(self):
  9. print("员工 【" + self.name + "】" + "出门")
  10. time.sleep(random.randrange(1, 10))
  11. print("员工 【" + self.name + "】" + "已签到")
  12. self.barriers.wait()
  13. def ready():
  14. print(threading.current_thread().name + ":人齐,出发,出发~~~")
  15. if __name__ == '__main__':
  16. print("要出去旅游啦,大家快集合~")
  17. b = threading.Barrier(10, action=ready, timeout=20)
  18. for i in range(10):
  19. staff = Staff(b)
  20. staff.start()

运行结果

PS:这里可以试下设置超时,还有修改ready方法,故意引起异常,
然后会抛出BrokenBarrierError异常。


6.小结

加了下班,终于把threading模块啃完了,不然爬小姐姐好玩,有成就感,
当然Python线程肯定不止那么简单,后面还有队列这些东西~慢慢来,不急。
下一节开始抠multiprocessing这个进程模块,又是块大骨头,敬请期待~


本节参考文献


来啊,Py交易啊

想加群一起学习Py的可以加下,智障机器人小Pig,验证信息里包含:
PythonpythonpyPy加群交易屁眼 中的一个关键词即可通过;

验证通过后回复 加群 即可获得加群链接(不要把机器人玩坏了!!!)~~~
欢迎各种像我一样的Py初学者,Py大神加入,一起愉快地交流学♂习,van♂转py。


添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注