[关闭]
@songhanshi 2021-01-14T11:26:28.000000Z 字数 97166 阅读 653

并发编程

Java学习


笔记内容来源:
1. jk:《Java并发编程实战》-王宝令(极客)
2. xn:《Java性能调优实战》-多线程性能调优-刘超(极客)

ListenableFuture

https://blog.csdn.net/PROGRAM_anywhere/article/details/83552126

tips

  1. 《操作系统原理》----“并发编程最早的应用领域就是操作系统的实现”
  2. github:https://github.com/CL0610/Java-concurrency

CPU缓存

  1. 是什么
    • 在金字塔式存储体系中它位于自顶向下的第二层,仅次于CPU寄存器。(然后是内存-存储器[ROM\RAM\Cache]、外存-硬盘)
      存储 硬盘<内存(<缓存Cache)
    • 高速缓冲存储器Cache是位于CPU与内存之间的临时存储器,它的容量比内存小但交换速度快。

一、并发的优缺点

  1. 为什么要用到并发

    • 充分利用多核CPU的计算能力;
      并发编程的形式可以将多核CPU的计算能力发挥到极致,性能得到提升。
    • 方便进行业务拆分,提升应用性能
      面对复杂业务模型,并行程序会比串行程序更适应业务需求,而并发编程更能吻合这种业务拆分 。
  2. 并发编程有哪些缺点

    • 时间片是CPU分配给各个线程的时间,因为时间非常短,所以CPU不断通过切换线程,让我们觉得多个线程是同时执行的,时间片一般是几十毫秒。而每次切换时,需要保存当前的状态起来,以便能够进行恢复先前状态,而这个切换时非常损耗性能,过于频繁反而无法发挥出多线程编程的优势。
  3. 减少上下文切换
    通常减少上下文切换可以采用无锁并发编程,CAS算法,使用最少的线程和使用协程。

    • 无锁并发编程:可以参照concurrentHashMap锁分段的思想,不同的线程处理不同段的数据,这样在多线程竞争的条件下,可以减少上下文切换的时间。
    • CAS算法,利用Atomic下使用CAS算法来更新数据,使用了乐观锁,可以有效的减少一部分不必要的锁竞争带来的上下文切换
    • 使用最少线程:避免创建不需要的线程,比如任务很少,但是创建了很多的线程,这样会造成大量的线程都处于等待状态
    • 协程:在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换

    • Lmbench3测量上下文切换的时长 vmstat测量上下文切换次数

问题

1. 线程切换的问题

  1. 问题:抢购活动-多线程(xn)
    • 分析问题:通过工具分析,发现 cs(上下文切换每秒次数)指标已经接近了 60w,平时的话最高 5w。再通过日志分析,我发现了大量带有 wait() 的 Exception。
    • 初步怀疑是大量线程处理不及时导致的,进一步锁定问题是连接池大小设置不合
      理。
    • 解决:模拟了生产环境配置,对连接数压测进行调节,降低最大线程数,最后系统的性能就上去了。
    • 经验:并发程序中,并不是启动更多的线程就能让程序最大限度地并发执行。线程数量设置太小,会导致程序不能充分地利用系统资源;线程数量设置太大,又可能带来资源的过度竞争,导致上下文切换带来额外的系统开销

--------------------分割线------------

1. 并发理论基础

1-预备知识

1-上下文切换

  1. 时间片?

    • CPU 时间片是 CPU 分配给每个线程执行的时间段,一般为几十毫秒
    • 处理器给每个线程分配 CPU 时间片(Time Slice),线程在分配获得的时间片内执行任务。
    • 时间片决定了一个线程可以连续占用处理器运行的时长。
  2. 上下文切换是什么?

    • 当一个线程的时间片用完了,或者因自身原因被迫暂停运行了,这个时候,另外一个线程(可以是同一个线程或者其它进程的线程)就会被操作系统选中,来占用处理器。这种一个线程被暂停剥夺使用权,另外一个线程被选中开始或者继续运行的过程就叫做上下文切换(Context Switch)。
    • 上下文:一个线程被剥夺处理器的使用权而被暂停运行,就是“切出”;一个线程被选中占用处理器开始或者继续运行,就是“切入”。在这种切出切入的过程中,操作系统需要保存和恢复相应的进度信息,这个进度信息就是“上下文”了。
    • 下文包含内容:寄存器的存储内容以及程序计数器存储的指令内容。
      -- CPU 寄存器负责存储已经、正在和将要执行的任务
      -- 程序计数器负责存储CPU 正在执行的指令位置以及即将执行的下一条指令的位置。
    • 分类:
      -- 进程间的上下文切换
      多线程编程中,主要为线程间的上下文切换导致的性能问题
      -- 线程间的上下文切换
  3. 多核下的上下文切换?

    • CPU 数量远远不止一个的情况:
      操作系统将 CPU轮流分配给线程任务,此时的上下文切换就变得更加频繁了,并且存在跨 CPU 上下文切换,比起单核上下文切换,跨核切换更加昂贵。
  4. 多线程上下文切换的原因?

    • 线程主要有“新建”(NEW)、“就绪”(RUNNABLE)、“运行”(RUNNING)、“阻
      塞”(BLOCKED)、“死亡”(DEAD)五种状态。
    • 线程由RUNNABLE转为非RUNNABLE 的过程就是线程上下文切换;
    • 一个线程的状态由 RUNNING 转为 BLOCKED ,再由BLOCKED 转为 RUNNABLE ,然后再被调度器选中执行,这就是一个上下文切换的过程。
    • 当一个线程从RUNNING状态转为BLOCKED状态时,我们称为一个线程的暂停,线程暂停被切出之后,操作系统会保存相应的上下文,以便这个线程稍后再次进入RUNNABLE 状态时能够在之前执行进度的基础上继续执行。
    • 当一个线程从BLOCKED状态进入到RUNNABLE状态时,我们称为一个线程的唤醒,此时线程将获取上次保存的上下文继续完成执行。
    • 总结:多线程的上下文切换实际上就是由多线程两个运行状态的互相切换导致的。
  5. 线程运行时,各状态之间转换的原因?

    • 如线程状态由RUNNING转为BLOCKED或者由BLOCKED转为RUNNABLE;
    • 2种原因:
      -- 程序本身触发的切换,称为自发性上下文切换
      -- 由系统或者虚拟机诱发的非自发性上下文切换。
  6. 自发性上下文切换?

    • 概念:线程由 Java 程序调用导致切出,在多线程编程中,执行调用以下方法或关键字,常常就会引发自发性上下文切换。
    • sleep()、wait()、yield()、join()、park()、synchronized、lock
  7. 非自发性上下文切换?

    • 概念:
      指线程由于调度器的原因被迫切出。
    • 常见:
      -- 线程被分配的时间片用完,
      -- 虚拟机垃圾回收导致
      -- 执行优先级的问题导致。
    • JVM垃圾回收,原因
      JVM中,对象的内存都是由虚拟机中的堆分配的,在程序运行过程中,新的对象将不断被创建,如果旧的对象使用后不进行回收,堆内存将很快被耗尽。JVM提供了一种回收机制,对创建后不再使用的对象进行回收,从而保证堆内存的可持续性分配。而这种垃圾回收机制的使用有可能会导致stop-the-world事件的发生,这其实就是一种线程暂停行为。
  8. 上下文切换性能问题?
    串行和并行代码的比较:

    • 串联的执行速度比并发的执行速度要快。因为线程的上下文切换导致了额外的开销,
    • 使用 Synchronized 锁关键字,会导致了资源竞争,从而引起了上下文切换
    • 即使不使用 Synchronized锁关键字,并发的执行速度也无法超越串联的执行速度,这是因为多线程同样存在着上下文切换。
    • Redis、NodeJS 的设计就很好地体现了单线程串行的优势。
  9. 如何监测到上下文切换?

    • Linux
      vmstat命令:监视Java程序运行过程中系统的上下文切换频率;
      pidstat命令:监控指定进程的Context Switch上下文切换
  10. 上下文切换到底开销在哪些环节?

    • 操作系统保存和恢复上下文;
    • 调度器进行线程调度;
    • 处理器高速缓存重新加载;
    • 上下文切换也可能导致整个高速缓存区被冲刷,从而带来时间开销
  11. 在并发量比较大的情况下,什么时候用单线程,什么时候用多线程呢?

    • 一般在单个逻辑比较简单,而且速度相对来非常快的情况下,我们可以使用单线程。
      例如,Redis,从内存中快速读取值,不用考虑 I/O 瓶颈带来的阻塞问题。
    • 在逻辑相对来说很复杂的场景,等待时间相对较长又或者是需要大量计算的场景,我建议使用多线程来提高系统的整体性能。
      例如,NIO时期的文件读写操作、图像处理以及大数据分析等。
  12. 在多线程中使用Synchronized还会发生进程间的上下文切换吗?具体又会发生在哪些环节呢?

    • 进程上下文切换,是指用户态和内核态的来回切换。我们知道,如果一旦Synchronized锁资源竞争激烈,线程将会被阻塞,阻塞的线程将会从用户态调用内核态,尝试获取mutex,这个过程就是进程上下文切换。
    • 使用Synchronized获得锁失败,进入等待队列会发生上下文切换。
    • Synchronized在轻量级锁之前,锁资源竞争产生的是线程上下文切换,一旦升级到重量级锁,就会产生进程上下文切换

2-线程的生命周期状态

jk
  1. 图示
    • 线程主要有“新建”(NEW)、“就绪”(RUNNABLE)、“运行”(RUNNING)、“阻塞”(BLOCKED)、“死亡”(DEAD)五种状态。
      在这里插入图片描述

3-管程

  1. 参考:
    本节说的可能并不好。该篇我看了三遍也没能完全看懂,于是自己搜索java管程相关的技术文章,才大致对管程有了个认知,总结如下:
    1.管程是一种概念,任何语言都可以通用。
    2.在java中,每个加锁的对象都绑定着一个管程(监视器)
    3.线程访问加锁对象,就是去拥有一个监视器的过程。如一个病人去门诊室看医生,医生是共享资源,门锁锁定医生,病人去看医生,就是访问医生这个共享资源,门诊室其实是监视器(管程)。
    4.所有线程访问共享资源,都需要先拥有监视器。就像所有病人看病都需要先拥有进入门诊室的资格。
    5.监视器至少有两个等待队列。一个是进入监视器的等待队列一个是条件变量对应的等待队列。后者可以有多个。就像一个病人进入门诊室诊断后,需要去验血,那么它需要去抽血室排队等待。另外一个病人心脏不舒服,需要去拍胸片,去拍摄室等待。
    6.监视器要求的条件满足后,位于条件变量下等待的线程需要重新在门诊室门外排队,等待进入监视器。就像抽血的那位,抽完后,拿到了化验单,然后,重新回到门诊室等待,然后进入看病,然后退出,医生通知下一位进入。
    总结起来就是,管程就是一个对象监视器。任何线程想要访问该资源,就要排队进入监控范围。进入之后,接受检查,不符合条件,则要继续等待,直到被通知,然后继续进入监视器。

  2. 管程的发展?

    • 管程是一把解决并发问题的万能钥匙;
    • Java1.5前,提供的唯一的并发原语就是管程;
    • Java1.5后,提供的SDK并发包,也是以管程技术为基础的。
    • 除此之外,C/C++、C# 等高级语言也都支持管程。
  3. 为什么Java在1.5之前仅仅提供了synchronized关键字及wait()、notify()、notifyAll()这三个看似从天而降的方法?
    在刚接触Java的时候,我以为它会提供信号量这种编程原语,因为操作系统原理课程告诉我,用信号量能解决所有并发问题,结果我发现不是。

    • 原因:
    • Java 采用的是管程技术,synchronized关键字及wait()、notify()、notifyAll()这三个方法都是管程的组成部分。
    • 管程和信号量是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程。但是管程更容易使用,所以 Java 选择了管程。
  4. 什么是管程?

    • 管程,英文是 Monitor,Java领域多直译监视器。操作系统领域翻译成管程。
    • 管程指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。
    • 翻译为Java领域的语言,就是管理类的成员变量和成员方法,让这个类是线程安全的。
  5. 管程如何管理?

    • 管程模型:MESA 模型
    • 发展:先后出现过三种不同的管程模型,分别是:Hasen模型、Hoare模型和MESA模型。
    • 现广泛应用的是MESA模型
    • Java管程的实现参考的也是MESA模型。
  6. MESA模型介绍?

    • 在并发编程领域,有两大核心问题:
      -- 互斥,即同一时刻只允许一个线程访问共享资源;
      -- 同步,即线程之间如何通信、协作。
    • 这两大问题,管程都是能够解决的。
    • 管程如何解决互斥问题?
      -- 将共享变量及其对共享变量的操作统一封装起来。
      -- 管程X将共享变量queue这个队列和相关的操作入队enq()、出队deq()都封装起来;线程A和B如果想访问共享变量queue,只能通过调用管程提供的enq()、deq()方法来实现;enq()、deq()保证互斥性,只允许一个线程进入管程。
      -- 管程模型和面向对象高度契合。估计这也是Java选择管程的原因吧。
      -- 后面的互斥锁背后的模型其实就是它。
      在这里插入图片描述
    • 管程如何解决线程间同步问题
      -- 如下,MESA管程模型示意图,详细描述MESA模型的主要组成部分。
      -- 在管程模型里,共享变量和对共享变量的操作是被封装起来的,最外层的框就代表封装。框的上面只有一个入口,并且在入口旁边还有一个入口等待队列。当多个线程同时试图进入管程内部时,只允许一个线程进入,其他线程则在入口等待队列中等待。
      -- 管程里还引入了条件变量的概念,而且每个条件变量都对应有一个等待队列,如图,条件变量A 和条件变量 B 分别都有自己的等待队列。
      在这里插入图片描述
    • 条件变量和等待队列的作用是什么呢?
      -- 解决线程同步问题。
      -> 假设有个线程T1执行出队操作,操作前提条件是队列不能是空的,而队列不空这个前提条件就是管程里的条件变量。
      -> 如果线程T1进入管程后恰好发现队列是空的,就去条件变量对应的等待队列里面等。此时线程T1就去“队列不空”这个条件变量的等待队列中等待。
      -> 线程T1进入条件变量的等待队列后,是允许其他线程进入管程的。
      -> 再假设之后另外一个线程T2执行入队操作,入队操作执行成功之后,“队列不空”这个条件对于线程 T1 来说已经满足了,此时线程T2要通知T1,告诉它需要的条件已经满足了。当线程 T1得到通知后,会从等待队列里面出来,但是出来之后不是马上执行,而是重新进入到入口等待队列里面。
    • 说一下wait()、notify()、notifyAll() 这三个操作?
      -- 前面线程T1发现“队列不空”这个条件不满足,需要进到对应的等待队列里等待。这个过程就是通过调用wait()来实现的。如果用对象A代表“队列不空”这个条件,那么线程T1需要调用A.wait()。同理当“队列不空”这个条件满足时,线程T2需要调用A.notify()来通知A等待队列中的一个线程,此时这个队列里面只有线程T1。至于notifyAll() 这个方法,它可以通知等待队列中的所有线程。
  7. 代码再次说明MESA模型?

    • 代码实现一个阻塞队列,阻塞队列有两个操作分别是入队和出队,这两个方法都是先获取互斥锁,类比管程模型中的入口。
      1) 对于入队操作,如果队列已满,就需要等待直到队列不满,所以这里用了notFull.await();
      2) 对于出队操作,如果队列为空,就需要等待直到队列不空,所以就用了notEmpty.await();
      3) 如果入队成功,那么队列就不空了,就需要通知条件变量:队列不空notEmpty对应的等待队列;
      4) 如果出队成功,那就队列就不满了,就需要通知条件变量:队列不满notFull对应的等待队列。

      1. public class BlockedQueue<T> {
      2. final Lock lock = new ReentrantLock();
      3. // 条件变量:队列不满
      4. final Condition notFull = lock.newCondition();
      5. // 条件变量:队列不空
      6. final Condition notEmpty = lock.newCondition();
      7. // 入队
      8. void enq(T x) {
      9. lock.lock();
      10. try {
      11. while (队列已满)
      12. // 等待队列不满
      13. notFull.await();
      14. // 省略入队操作...
      15. // 入队后, 通知可出队
      16. notEmpty.signal();
      17. } finally {
      18. lock.unlock();
      19. }
      20. }
      21. // 出队
      22. void deq() {
      23. lock.lock();
      24. try {
      25. while (队列已空)
      26. // 等待队列不空
      27. notEmpty.await();
      28. // 省略出队操作...
      29. // 出队后,通知可入队
      30. notFull.signal();
      31. } finally {
      32. lock.unlock();
      33. }
      34. }
      35. }
    • 示例中,用了Java并发包里面的Lock和Condition。
      注意:await()和前面我们提到的wait()语义是一样的;signal()和前面我们提到的notify()语义是一样的。

  8. wait() 的正确姿势?

    • 对于MESA管程来说,有一个编程范式,就是需要在一个while循环里面调用wait()。这个是MESA管程特有的。

      1. while(条件不满足) {
      2. wait();
      3. }
    • Hasen模型、Hoare模型和MESA模型的一个核心区别就是当条件满足后,如何通知相关线程。

    • 管程要求同一时刻只允许一个线程执行,那当线程T2的操作使线程T1等待的条件满足时,T1 和 T2 究竟谁可以执行呢?
      1)Hasen 模型里面,要求 notify() 放在代码的最后,这样 T2 通知完 T1 后,T2 就结束了,然后 T1 再执行,这样就能保证同一时刻只有一个线程执行。
      2)Hoare 模型里面,T2 通知完 T1 后,T2 阻塞,T1 马上执行;等 T1 执行完,再唤醒 T2,也能保证同一时刻只有一个线程执行。但是相比 Hasen 模型,T2 多了一次阻塞唤醒操作。
      3)MESA 管程里面,T2通知完T1后,T2还是会接着执行,T1并不立即执行,仅仅是从条件变量的等待队列进到入口等待队列里面。这样做的好处是notify() 不用放到代码的最后,T2也没有多余的阻塞唤醒操作。但是也有个副作用,就是当 T1 再次执行的时候,可能曾经满足的条件,现在已经不满足了,所以需要以循环方式检验条件变量。
  9. notify()何时可以使用?

    • 除非经过深思熟虑,否则尽量使用notifyAll()。
    • notify()的使用需要满足以下三个条件:
      1)所有等待线程拥有相同的等待条件;
      2)所有等待线程被唤醒后,执行相同的操作;
      3)只需要唤醒一个线程。
    • 比如上面阻塞队列的例子中,对于“队列不满”这个条件变量,其阻塞队列里的线程都是在等待“队列不满”这个条件,反映在代码里就是下面这3行代码。对所有等待线程来说,都是执行这3行代码,重点是while里面的等待条件是完全相同的。
      1. while (队列已满)
      2. // 等待队列不满
      3. notFull.await()

    所有等待线程被唤醒后执行的操作也是相同的,都是下面这几行:

    1. // 省略入队操作...
    2. // 入队后, 通知可出队
    3. notEmpty.signal();

    同时也满足第 3 条,只需要唤醒一个线程。所以上面阻塞队列的代码,使用 signal() 是可以的。

  10. Java内置的管程?

    • Java 参考了 MESA 模型,语言内置的管程(synchronized)对 MESA 模型进行了精简。
    • MESA模型中,条件变量可以有多个,Java语言内置的管程里只有一个条件变量。
      在这里插入图片描述
    • Java内置的管程方案(synchronized)使用简单,synchronized关键字修饰的代码块,在编译期会自动生成相关加锁和解锁的代码,但是仅支持一个条件变量;
    • 而Java SDK并发包实现的管程支持多个条件变量,不过并发包里的锁,需要开发人员自己进行加锁和解锁操作。

3. 数据一致性

1-一致性概念
  1. 主要内容?
    “并发编程中共享变量的一致性”。
    CPU缓存导致的数据不一致、重排序

  2. 结合Happens-before规则,可以将一致性分为以下几个级别:

    • 严格一致性(强一致性):所有的读写操作都按照全局时钟下的顺序执行,且任何时刻线程读取到的缓存数据都是一样的,Hashtable 就是严格一致性;
      在这里插入图片描述
    • 顺序一致性:多个线程的整体执行可能是无序的,但对于单个线程而言执行是有序的,要保证任何一次读都能读到最近一次写入的数据,volatile可以阻止指令重排序,所以修饰的变量的程序属于顺序一致性;
      在这里插入图片描述
    • 弱一致性:不能保证任何一次读都能读到最近一次写入的数据,但能保证最终可以读到写入的数据,单个写锁 + 无锁读,就是弱一致性的一种实现。
  3. Happens-before 规则?

    • 为了解决重排序问题,Java 提出了 Happens-before 规则来规范线程的执行顺序:
      ① 程序次序规则:在单线程中,代码的执行是有序的,虽然可能会存在运行指令的重排序,但最终执行的结果和顺序执行的结果是一致的;
      ② 锁定规则:一个锁处于被一个线程锁定占用状态,那么只有当这个线程释放锁之后,其它线程才能再次获取锁操作;
      ③ volatile 变量规则:如果一个线程正在写 volatile 变量,其它线程读取该变量会发生在写入之后;
      ④ 线程启动规则:Thread 对象的 start() 方法先行发生于此线程的其它每一个动作;
      ⑤ 线程终结规则:线程中的所有操作都先行发生于对此线程的终止检测;
      ⑥ 对象终结规则:一个对象的初始化完成先行发生于它的 finalize() 方法的开始;
      ⑦ 传递性:如果操作 A happens-before 操作 B,操作 B happens-before 操作 C,那么操作 A happens-before 操作 C;
      ⑧ 线程中断规则:对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生。
2-不一致原因
  1. 多线程操作共享变量可能出现的问题?

    • 假设有两个线程(线程 1 和线程 2)分别执行下面的方法,x 是共享变量:

      1. // 代码 1
      2. public class Example {
      3. int x = 0;
      4. public void count() {
      5. x++; //1
      6. System.out.println(x)//2
      7. }
      8. }
    • 两个线程同时运行,线程的变量的值可能的结果:

    线程1调用count 线程2调用count
    x++; x++;

    结果:

    结果1 结果2 结果3
    1,1 2,1 1,2
  2. CPU缓存-理解"1,1"的情况?

    • CPU缓存可以分为一级缓存(L1)、二级缓存(L2)和三级缓存(L3),每一级缓存中所储存的全部数据都是下一级缓存的一部分。当CPU要读取一个缓存数据时,首先会从一级缓存中查找;如果没有找到,再从二级缓存中查找;如果还是没有找到,就从三级缓存或内存中查找。
    • 单核CPU:
      如果是单核CPU运行多线程,多个线程同时访问进程中的共享数据,CPU 将共享变量加载到高速缓存后,不同线程在访问缓存数据的时候,都会映射到相同的缓存位置,这样即使发生线程的切换,缓存仍然不会失效。
    • 多核CPU:
      如果是多核CPU运行多线程,每个核都有一个L1缓存,如果多个线程运行在不同的内核上访问共享变量时,每个内核的 L1 缓存将会缓存一份共享变量。
      在这里插入图片描述
  3. CPU缓存-理解"1,1"的情况?
    1,1 的运行结果:

    • 如果是多核 CPU 运行多线程,每个核都有一个 L1 缓存,如果多个线程运行在不同的内核上访问共享变量时,每个内核的 L1缓存将会缓存一份共享变量。
    • 假设线程 A 操作 CPU 从堆内存中获取一个缓存数据,此时堆内存中的缓存数据值为 0,该缓存数据会被加载到 L1 缓存中,在操作后,缓存数据的值变为 1,然后刷新到堆内存
      中。
    • 在正好刷新到堆内存中之前,又有另外一个线程 B 将堆内存中为 0 的缓存数据加载到了另外一个内核的 L1 缓存中,此时线程 A 将堆内存中的数据刷新到了 1,而线程 B 实际拿到的缓存数据的值为 0。
    • 此时,内核缓存中的数据和堆内存中的数据就不一致了,且线程 B在刷新缓存到堆内存中的时候也将覆盖线程 A中修改的数据。这时就产生了数据不一致的问题。
      在这里插入图片描述
  4. 重排序问题?

    • 在不影响运算结果的前提下,编译器有可能会改变顺序代码的指令执行顺序,特别是在一些可以优化的场景。
    1. // 代码 1
    2. public class Example {
    3. int x = 0;
    4. boolean flag = false;
    5. public void writer() {
    6. x = 1; //1
    7. flag = true; //2
    8. }
    9. public void reader() {
    10. if (flag) { //3
    11. int r1 = x; //4
    12. System.out.println(r1==x)
    13. }
    14. }
    15. }

    在这里插入图片描述

    • r1=1 的运行结果如下:
      在这里插入图片描述
    • r1=0 的运行结果如下:
      在这里插入图片描述
  5. 重排序-问题解析?

    • 在JVM中,重排序是十分重要的一环,特别是在并发编程中。可JVM要是能对它们进行任意排序的话,也可能会给并发编程带来一系列的问题,其中就包括了一致性的问题。
    • 编译器为了尽可能地减少寄存器的读取、存储次数,会充分复用寄存器的存储值。如果没有进行
      重排序优化,正常的执行顺序是步骤1\2\3,而在编译期间进行了重排序优化之后,执行的步骤有可能就变成了步骤1/3/2 或者 2/1/3,这样就能减少一次寄存器的存取次数。
      1. int x = 1;//步骤1:加载x变量的内存地址到寄存器中,加载1到寄存器中,CPU通过mov指令把1写入到寄存器指定的内存中
      2. boolean flag = true; //步骤2 加载flag变量的内存地址到寄存器中,加载true到寄存器中,CPU通过mov指令把1写入到寄存器指定的内存中
      3. int y = x + 1;//步骤3 重新加载x变量的内存地址到寄存器中,加载1到寄存器中,CPU通过mov指令把1写入到寄存器指定的内存中

3. 其他基本概念

  1. 临界区?

    • 一段需要互斥执行的代码称为临界区。
  2. 管程?

    • 一种通用的同步原语,在Java中指的就是synchronized,synchronized是Java里对管程的实现。
    • 管程中的锁在Java里是隐式实现的
      例如下面的代码,在进入同步块之前,会自动加锁,而在代码块执行完会自动释放锁,加锁以及释放锁都是编译器帮我们实现的。
      1. synchronized (this)
      2. { // 此处自动加锁
      3. // x 是共享变量, 初始值 =10
      4. if (this.x < 12) {
      5. this.x = 12;
      6. }
      7. } // 此处自动解锁

2-并发基础理论

1. 并发编程中的问题

  1. 总结?
    • 并发编程微观上涉及到原子性问题、可见性问题和有序性问题,宏观则表现为安全性、活跃性以及性能问题。
    • 在设计并发程序的时候,主要是从宏观出发,也就是要重点关注它的安全性、活跃性以及性能。安全性方面要注意数据竞争和竞态条件,活跃性方面需要注意死锁、活锁、饥饿等问题,性能方面介绍了两个方案,但具体问题具体分析。
1-安全性问题
  1. 什么是线程安全呢?

    • 本质上就是正确性
    • 正确性的含义就是程序按照我们期望的执行
  2. 如何写出线程安全的程序呢?

    • 并发Bug的三个主要源头:原子性问题、可见性问题和有序性问题。
    • 理论上线程安全的程序,就要避免出现原子性问题、可见性问题和有序性问题。
  3. 所有的代码都需要分析否存在这三个问题吗?

    • 只有一种情况:
      存在共享数据并且该数据会发生变化,通俗地讲就是有多个线程会同时读写同一数据
    • 解决一:
      -- 不共享数据或者数据状态不发生变化。
      -- 方案:如线程本地存储(Thread Local Storage,TLS)、不变模式等等。
    • 解决二:
      现实中,必须共享会发生变化的数据的应用场景还很多。
  4. 什么叫数据竞争?

    • 概念:
      当多个线程同时访问同一数据,并且至少有一个线程会写这个数据的时候,如果我们不采取防护措施,那么就会导致并发Bug,对此还有一个专业的术语,叫做数据竞争(Data Race)
    • 例子1:
      如下,当多个线程调用add10k()方法时就会发生数据竞争。
      1. public class Test {
      2. private long count = 0;
      3. void add10K() {
      4. int idx = 0;
      5. while(idx++ < 10000) {
      6. count += 1;
      7. }
      8. }
      9. }
  5. 竞态条件(Race Condition)?

    • 概念
    • 指的是程序的执行结果依赖线程执行的顺序。
    • 在并发环境里,线程的执行顺序是不确定的,如果程序存在竞态条件问题,那就意味着程序执行的结果是不确定的,而执行结果不确定这可是个大Bug。
    • 抽象理解
      理解竞态条件:并发场景中,程序的执行依赖于某个状态变量,类似于:
      1. if (状态变量 满足 执行条件) {
      2. 执行操作
      3. }

    -- 当某个线程发现状态变量满足执行条件后,开始执行操作;可是就在这个线程执行操作的时候,其他线程同时修改了状态变量导致状态变量不满足执行条件了。当然很多场景下,这个条件不是显式的。

    • 例子2-1:例子1加锁
      -- 在访问数据的地方加个锁保护,如下:
      -- 所有访问共享变量value的地方,都增加了互斥锁,此时是不存在数据
      竞争的。但add10K()并不是线程安全的。

      1. public class Test {
      2. private long count = 0;
      3. // synchronized
      4. synchronized long get(){
      5. return count
      6. }
      7. // synchronized
      8. synchronized void set(long v){
      9. count = v;
      10. }
      11. void add10K() {
      12. int idx = 0;
      13. while(idx++ < 10000) {
      14. set(get()+1)
      15. }
      16. }
      17. }
    • 问题:竞态条件
      -- 假设count=0,A、B两个线程同时执行get()方法时,get()方法会返回相同的值0,A、B执行get()+1操作,结果都是1,之后A、B再将结果1写入了内存。期望是 2,而结果却是1。
      -- A、B完全同时执行,结果是 1;A、B前后执行,结果是2。

    • 例子2-2:转账操作
    • 转账操作里面有个判断条件——转出金额不能大于账户余额,但在并发环境里面,如果不加控制,当多个线程同时对一个账号执行转出操作时,就有可能出现超额转出问题。
    • 假设,账户A余额200,线程1、2都要从账户A转出150,存在程1、2同时执行到第6行,这样1和2都会发现转出金额150小于账户余额200,于是就会发生超额转出的情况。
      1. class Account {
      2. private int balance;
      3. // 转账
      4. void transfer(Account target, int amt){
      5. if (this.balance > amt) { // 6
      6. this.balance -= amt; // A-150
      7. target.balance += amt;//B+150
      8. }
      9. }
      10. }
  6. 数据竞争、竞态条件如何保证线程安全?

    • 互斥这个技术方案
    • 实现互斥的方案,如CPU提供了相关的互斥指令,操作系统、编程语言
      也会提供相关的API。从逻辑上来看,可以统一归为:锁。
2-活跃性问题
  1. 活跃性问题概念?

    • 指的是某个操作无法执行下去。
    • 典型的活跃性问题:死锁,还有活锁、饥饿两种情况
  2. 死锁、活锁、饥饿?

    • 1-死锁:发生“死锁”后,线程会互相等待,而且会一直等待下去,技术上表现形式是线程永久地“阻塞”了。
    • 2-活锁:线程没发生阻塞,但仍存在执行不下去的情况。
    • 活锁例子:
      -- 类比,路人甲从左边出门,乙从右边进门,为了不相撞,互相谦让,甲让路走右边,乙也让甲走左边,结果又相撞。基于人的交流,谦让几次就解决了。而编程世界,有可能会一直谦让下去,成为没有发生阻塞但依然执行不下去的活锁。
      -- 解决
      尝试等待一个随机的时间。甲走左发现前面有人,并不是立刻换到右手边,而是等待一个随机的时间后,再换到边;同样,路人乙也不是立刻切换路线,也是等待一个随机的时间再切换。由于路人甲和路人乙等待的时间是随机的,所以同时相撞后再次相撞的概率就很低了。“等待一个随机时间”的方案虽然很简单,却非常有效
    • 3-饥饿
      -- 指的是线程因无法访问所需资源而无法执行下去的情况。
      “不患寡,而患不均”,如果线程优先级“不均”,在CPU繁忙的情况下,优先级低的线程得到执行的机会很小,就可能发生线程“饥饿”;持有锁的线程,如果执行的时间过长,也可能导致“饥饿”问题。
    • 饥饿三种解决方案
      -- 一是保证资源充足,二是公平地分配资源,三就是避免持有锁的线程长时间执行。
      -- 一、三的适用场景比较有限,因为很多场景下,资源的稀缺性是没办法解决的,持有锁的线程执行的时间也很难缩短。二的适用场景相对来说更多一些。
    • 如何公平地分配资源
      -- 并发编程里,主要是使用公平锁。
      -- 公平锁,是一种先来后到的方案,线程的等待是有顺序的,排在等待队列前面的线程会优先获得资源。
3-性能问题
  1. 性能问题?

    • 过度使用锁,也可能出“性能问题”。
    • “锁”的过度使用可能导致串行化的范围过大,不能够发挥多线程的优势了,而使用多线程搞并发程序的目的,主要为的就是提升性能。
    • 所以我们要尽量减少串行
  2. 串行对性能的影响是怎么样的呢?
    假设串行百分比是 5%,用多核多线程相比单核单线程能提速多少呢?

    • 阿姆达尔(Amdahl)定律,代表了处理器并行运算之后效率提升的能力:
      20201127015019225.png#pic_center未知大小
      其中,n理解为CPU的核数,p可以理解为并行百分比,那(1-p)就是串行百分比了,也就是我们假设的5%。再假设CPU的核数(也就是n)无穷大,那加速比 S 的极限就是20。
      即,如果我们的串行率是5%,那么无论采用什么技术,最高也就只能提高 20倍的性能。
  3. 怎么避免锁带来的性能问题呢?
    方案层面:

    • 第一,既然使用锁会带来性能问题,那最好的方案自然就是使用无锁的算法和数据结构了。
      -- 实现:如线程本地存储 (Thread Local Storage, TLS)、写入时复制(Copyon-write)、乐观锁等;Java并发包里面的原子类也是一种无锁的数据结构;Disruptor则是一个无锁的内存队列,性能都非常好……
    • 第二,减少锁持有的时间。互斥锁本质上是将并行的程序串行化,所以要增加并行度,一定要减少持有锁的时间。
      -- 实现:如使用细粒度的锁,一个典型的例子就是 Java 并发包里的 ConcurrentHashMap,使用了所谓分段锁的技术;还可以使用读写锁,也就是读是无锁的,只有写的时候才会互斥。
  4. 性能方面的三个重要度量指标?

    • 吞吐量、延迟和并发量。
    • 吞吐量:指的是单位时间内能处理的请求数量。吞吐量越高,说明性能越好。
    • 延迟:指的是从发出请求到收到响应的时间。延迟越小,说明性能越好。
    • 并发量:指的是能同时处理的请求数量,一般来说随着并发量的增加、延迟也会增加。所以延迟这个指标,一般都会是基于并发量来说的。例如并发量是 1000 的时候,延迟是 50 毫秒。

1. 可见性、原子性、有序性

  1. 并发问题的根源(CPU、内存、I/O)?
    1)缓存导致的可见性问题
    2)线程切换带来的原子性问题
    3)CPU 能保证的原子操作是 CPU 指令级别的,而不是高级语言的操作符
1. 可见性
  1. 可见性概念
    一个线程对共享变量的修改,另外一个线程能够立刻看到,我们称为可见性。

  2. 缓存导致的可见性问题?

    • 单核,所有线程都是操作同一个CPU的缓存,一个线程对缓存的写,对另外一个线程来说一定是可见的。
    • 多核,每个CPU都有自己的缓存,当多个线程在不同的CPU上执行时,这些线程操作的是不同的CPU缓存。
  3. 验证多核场景下的可见性问题

    1. public class Test {
    2. private static long count =0;
    3. private void add10K(){
    4. int idx=0;
    5. while (idx++<100000){
    6. count+=1;
    7. }
    8. }
    9. private static long calc(){
    10. final Test test =new Test();
    11. //创建两个线程,执行 add() 操作
    12. Thread thread1=new Thread(()->{
    13. test.add10K();
    14. });
    15. Thread thread2=new Thread(()->{
    16. test.add10K();
    17. });
    18. //启动两个线程
    19. thread1.start();
    20. thread2.start();
    21. //等待两个线程执行结束
    22. try {
    23. thread1.join();
    24. thread2.join();
    25. } catch (InterruptedException e) {
    26. e.printStackTrace();
    27. }
    28. System.out.println(count);
    29. return count;
    30. }
    31. public static void main(String[] args) {
    32. calc();
    33. }
    34. }
    • add10K() 方法,都会循环 10000 次 count+=1 操作。(10000次太少)
    • calc() 方法中我们创建了两个线程,每个线程调用一次 add10K()
    • 循环 10000 次 count+=1操作如果改为循环 1亿次,你会发现效果更明显,最终 count的值接近 1 亿,而不是 2 亿。如果循环 10000 次,count的值接近20000,原因是两个线程不是同时启动的,有一个时差。
2. 原子性
  1. 时间片

    • 操作系统允许某个进程执行一小段时间,例如 50 毫秒,过了 50 毫秒操作系统就会重新选择一个进程来执行(我们称为“任务切换”),这个 50 毫秒称为“时间片”。
  2. 原子性概念

    • 我们把一个或者多个操作在 CPU 执行的过程中不被中断的特性称为原子性。
  3. 线程切换
    线程切换示意图

    • 在一个时间片内,如果一个进程进行一个 IO 操作,例如读个文件,这个时候该进程可以把自己标记为“休眠状态”并出让 CPU 的使用权,待文件读进内存,操作系统会把这个休眠的进程唤醒,唤醒后的进程就有机会重新获得 CPU 的使用权了。
    • 这里的进程在等待 IO 时之所以会释放 CPU 使用权,是为了让 CPU 在这段等待时间里可以做别的事情,这样一来 CPU 的使用率就上来了;此外,如果这时有另外一个进程也读文件,读文件的操作就会排队,磁盘驱动在完成一个进程的读操作后,发现有排队的任务,就会立即启动下一个读操作,这样 IO 的使用率也上来了。
  4. 任务切换->线程切换发展

    • 早期的操作系统基于进程来调度CPU,不同进程间是不共享内存空间的,所以进程要做任务切换就要切换内存映射地址,
    • 一个进程创建的所有线程,都是共享一个内存空间的,所以线程做任务切换成本就很低了。
    • 现代的操作系统都基于更轻量的线程来调度,现在我们提到的“任务切换”都是指“线程切换”。
  5. count+=1
    1)CPU指令

    • 任务切换的时机大多数是在时间片结束的时候
    • 高级语言里一条语句往往需要多条 CPU 指令完成
    • 操作系统做任务切换,可以发生在任何一条CPU 指令执行完
    • CPU 能保证的原子操作是 CPU 指令级别的,而不是高级语言的操作符,

    2)执行顺序

    • count += 1,至少需要三条 CPU 指令

      指令 1:把变量 count 从内存加载到 CPU 的寄存器;
      指令 2:在寄存器中执行 +1 操作;
      指令 3:将结果写入内存(缓存机制导致可能写入的是 CPU 缓存而不是内存)。

    • 假设 count=0,
      A 在指令 1 执行完后做线程切换,
      A、B 如下序列执行,
      其中两个线程都执行了 count+=1 ,
      但结果 2,而是 1。
      非原子操作

1.3 有序性
  1. 有序性

    • CPU 能保证的原子操作是 CPU 指令级别的,而不是高级语言的操作符
  2. 例子

    • 编译器优化,有时候会改
      变程序中语句的先后顺序
    • 如:“a=6;b=7;”
      编译器优化后可能变成“b=7;a=6;”
  3. 双重检查单例模式(DCL)

    • 线程 A、B 同时调用 getInstance() 方法,同时发现 instance ==null,于是同时对 Singleton.class 加锁,此时 JVM 保证只有一个线程能够加锁成功(假设是 A),另外一个线程则会处于等待状态(假设是 B);
    • A 会创建一个 Singleton实例,之后释放锁,锁释放后,线程 B 被唤醒,线程 B 再次尝试加锁,此时是可以加锁成功的,加锁成功后,线程 B 检查 instance == null 时会发现,已经创建过 Singleton 实例了,所以线程 B 不会再创建一个 Singleton 实例。
    • https://blog.csdn.net/qq_38734403/article/details/106976266
      1. public class SingletonLazy {
      2. /**懒汉模式-双重检验锁-线程安全且并行效率高*/
      3. private static SingletonLazy instance;
      4. public static SingletonLazy getInstance() {
      5. // 先判断实例是否存在,若不存在再对类对象进行加锁处理
      6. if (instance == null) { //Single Checked
      7. synchronized (SingletonLazy.class) {
      8. if (instance == null) { //Double Checked
      9. instance = new SingletonLazy();
      10. }
      11. }
      12. }
      13. return instance;
      14. }
      15. }
  4. DCL 问题:出在 new 操作上

    • 我们以为的 new 操作应该是:
      1)分配一块内存 M;
      2)在内存 M 上初始化 Singleton 对象;
      3)然后 M 的地址赋值给 instance 变量。
    • 实际优化后的执行路径是:
      1) 分配一块内存 M;
      2) 将 M 的地址赋值给 instance 变量;
      3) 最后在内存 M 上初始化 Singleton 对象。
  5. DCL 问题:导致问题-空指针

    • A 先执行 getInstance() 方法,当执行完指令 2 时恰好发生了线程切换,切换到了线程 B 上;
    • 如果此时线程 B 也执行 getInstance() 方法,那么线程 B 会发现instance != null,所以直接返回 instance,
    • 而此时的 instance 是没有初始化过的,如果我们这个时候访问 instance 的成员变量就可能触发空指针异常。
      DCL异常执行路径

2. JMM

  1. 概念:
    • Java内存模型(JMM)是个很复杂的规范,规范了JVM如何提供按需禁用缓存和编译优化的方法。
    • 具体来说,这些方法包括 volatile、synchronized 和 final 三个关键字,以及六
      项Happens-Before规则。
  2. Java如何解决可见性和有序性问题
2.1 JMM概述
  1. JMM作用
    • Java 内存模型规范了 JVM 如何提供按需禁用缓存和编译优化的
      方法。
    • volatile、synchronized 和 final 三个关键字
    • 六项 Happens-Before 规则
2.2 volatile
  1. 概述

    • volatile 关键字并不是 Java 语言的特产,古老的 C 语言里也有,它最原始的意义就是禁用 CPU 缓存。
    • 声明一个 volatile 变量 volatile int x = 0,它表达的是:告诉编译器,对这个变量的读写,不能使用 CPU 缓存,必须从内存中读取或者写入。
  2. 例子

    • 线程 A 执行 writer() 方法,按照 volatile 语义,会把变量“v=true” 写入内存;假设线程 B 执行 reader() 方法,同样按照 volatile 语义,线程 B会从内存中读取变量 v,如果线程 B 看到 “v == true” 时,那么线程 B 看到的变量 x多少呢?
    • Java1.5前:x 可能是 42或 0;
    • Java1.5后:x 等于 42。
      1. class VolatileDemo {
      2. int x = 0;
      3. volatile boolean v = false;
      4. public void writer() {
      5. x = 42;
      6. v = true;
      7. }
      8. public void reader() {
      9. if (v == true) {
      10. // 这里 x 会是多少呢?
      11. }
      12. }
      13. }
  3. Java5前的问题

    • 问题:Java5前出现 x = 0
    • 原因:变量 x 可能被 CPU缓存而导致可见性问题。
    • 解决:Java 内存模型在 1.5 版本对 volatile 语义进行了增强。即Happens-
      Before 规则。(Java5已解决
2.3 Happens-Before 规则
  1. 概念

    • Happens-Before并不是说前面一个操作发生在后续操作的前面,它真正要表达的是:前面一个操作的结果对后续操作是可见的
    • 比较正式的说法是:Happens-Before 约束了编译器的优化行为,虽允许编译器优化,但是要求编译器优化后一定遵守Happens-Before 规则。
  2. Happens-Before 六规则

    • 程序的顺序性规则
    • 这条规则是指在一个线程中,按照程序顺序,前面的操作Happens-Before于后续的任意操作。
    • 程序前面对某个变量的修改一定是对后续操作可见的。

      1. // “x = 42;” Happens-Before “v = true;”
      2. x = 42;
      3. v = true;
    • volatile 变量规则

    • 这条规则是指对一个 volatile 变量的写操作, Happens-Before 于后续对这个volatile变量的读操作。
    • 关联规则3看
    • 传递性
    • A Happens-Before B,
      且 B Happens-Before C,
      则 A Happens-Before C。
      传递性规则
    • 规则 2:
      写变量“v=true” Happens-Before 读变量 “v=true”
    • x=42” Happens-Before 读变量“v=true”
      如果线程 B 读到了“v=true”,那么线程 A 设置的“x=42”对线程 B 是可见的。也就是说,线程 B 能看到 “x == 42”
      即 1.5 版本对volatile 语义的增强
    • 管程中锁的规则
    • 这条规则是指对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。
    • 管程是一种通用的同步原语,在Java 中指的就是synchronized,synchronized 是 Java 里对管程的实现。
    • 管程中的锁在 Java 里是隐式实现的

      1. synchronized (this) { // 此处自动加锁
      2. // x 是共享变量, 初始值 =10
      3. if (this.x < 12) {
      4. this.x = 12;
      5. }
      6. } // 此处自动解锁
    • 结合规则 4——管程中锁的规则理解:假设 x 的初始值是 10,线程 A 执行完代码块后 x 的值会变成12(执行完自动释放锁),线程 B 进入代码块时,能够看到线程 A 对 x 的写操作,也就是线程 B 能够看到x==12。

    • 线程 start() 规则
    • 指主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作。

      1. Thread B = new Thread(() -> {
      2. // 主线程调用 B.start() 之前
      3. // 所有对共享变量的修改,此处皆可见
      4. // 此例中,var==77
      5. });
      6. // 此处对共享变量 var 修改
      7. var =77;
      8. // 主线程启动子线程
      9. B.start();
    • 线程的join()规则

    • 指主线程 A 等待子线程 B 完成(主线程 A 通过调用子线程B 的 join() 方法实现),当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作。当然所谓的“看到”,指的是对共享变量的操作。
    • 换句话说就是,如果在线程 A 中,调用线程 B 的 join() 并成功返回,那么线程 B 中的任意操作 Happens-Before 于该 join() 操作的返回。

      1. Thread B = new Thread(()->{
      2. // 此处对共享变量 var 修改
      3. var = 66;
      4. });
      5. // 例如此处对共享变量修改,
      6. // 则这个修改结果对线程 B 可见
      7. // 主线程启动子线程
      8. B.start();
      9. B.join();
      10. // 子线程所有对共享变量的修改
      11. // 在主线程调用 B.join() 之后皆可见
      12. // 此例中,var==66
    • 线程中断规则:

    • 对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()方法检测到是否有中断发生。
    • 对象终结规则:
    • 一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始。
2.4 final
  1. 概念

    • final 修饰变量时,初衷是告诉编译器:这个变量生而不变,可以可劲儿优化。
  2. 逸出

    • 1.5 以后 Java 内存模型对 final 类型变量的重排进行了约束。现在只要我们提供正确构造函数没有“逸出”,就不会出问题了。
    • 如下,在构造函数里面将 this 赋值给
      了全局变量global.obj,这就是“逸出”,线程通过 global.obj 读取 x 是有可能读到 0的,一定要避免“逸出”。(有可能通过global.obj 可能访问到还没有初始化的this对象;将this赋值给global.obj时,this还没有初始化完)
      1. final int x;
      2. // 错误的构造函数
      3. public FinalFieldExample() {
      4. x = 3;
      5. y = 4;
      6. // 此处就是讲 this 逸出,
      7. global.obj = this;
      8. }

3. 互斥锁:解决原子性问题

3.1 原子性问题
  1. 原子性概念?

    • 一个或者多个操作在 CPU 执行的过程中不被中断的特性,称为原子性。
  2. 32位CPU上执行long型变量的写操作?

    • long型变量是64位,在32位CPU上执行写操作会被拆分成两次写操作(写高32位和写低32位)。
    • 单核
      CPU同一时刻只有一个线程执行,禁止CPU中断,意味着操作系统不会重新调度线程,也就是禁止了线程切换,获得CPU使用权的线程就可以不间断地执行,所以两次写操作一定是:要么都被执行,要么都没有被执行,具有原子性。
    • 多核
      同一时刻,有可能有两个线程同时在执行,一个线程执行在CPU-1上,一个线程执行在CPU-2上,此时禁止CPU中断,只能保证CPU上的线程连续执行,并不能保证同一时刻只有一个线程执行,这两个线程同时写long型变量高32位的话,就会出现非原子性问题
    • 解决条件:
      “同一时刻只有一个线程执行”这个条件非常重要,形成互斥条件即可。
3.2 锁模型
  1. 锁模型
    • 一段需要互斥执行的代码称为临界区
    • 基本:
      线程在进入临界区之前,首先尝试加锁lock(),如果成功,则进入临界区,此时我们称这个线程持有锁;否则呢就等待,直到持有锁的线程解锁;持有锁的线程执行完临界区的代码后,执行解锁 unlock()。
    • 改进
      首先,我们要把临界区要保护的资源标注出来,如图中临界区里增加了一个元素:受保护的资源R;其次,我们要保护资源R就得为它创建一把锁LR;最后,针对这把锁LR,我们还需在进出临界区时添上加锁操作和解锁操作。另外,在锁LR和受保护资源之间,我特地用一条线做了关联,这个关联关系非常重要。
      改进的锁模型
3-synchronized
  1. synchronized 是Java在语言层面提供的互斥原语.
  2. synchronized的使用?

    • 与上述模型比较,lock()和解锁unlock()两个操作是被Java默默加上的,Java编译器会在synchronized修饰的方法或代码块前后自动加上加锁lock()和解锁unlock()
    • 代码
      1. class X {
      2. //1. 修饰非静态方法
      3. synchronized void foo() {
      4. // 临界区
      5. }
      6. //2. 修饰静态方法
      7. synchronized static void bar() {
      8. // 临界区
      9. }
      10. //3. 修饰代码块
      11. Object obj = new Object();
      12. void baz() {
      13. synchronized (obj) {
      14. // 临界区
      15. }
      16. }
      17. }
  3. 修饰方法的时候锁定的是什么呢?

    • 显式:
      修饰代码块,锁定了一个obj对象;
    • 隐式:
      -- 修饰非静态方法,锁定的是当前实例对象 this。
      -- 修饰静态方法,锁定的是当前类的Class对象,例子中为 Class X;
      1. Class X {
      2. //1. 修饰非静态方法
      3. synchronized(this) void foo() {
      4. // 临界区
      5. }
      6. //2. 修饰静态方法
      7. synchronized(X.class) static void bar() {
      8. // 临界区
      9. }
      10. }
  4. 用 synchronized 解决 count+=1 问题?

    • 代码

      1. class SafeCalc {
      2. long value = 0L;
      3. long get() {
      4. return value;
      5. }
      6. synchronized void addOne() {
      7. value += 1;
      8. }
      9. }
    • 原子性:
      addOne()方法,被synchronized修饰后,无论是单核CPU还是多核CPU,只有一个线程能够执行addOne()方法,所以一定能保证原子操作。

    • 可见性-addOne():
      -- 管程中锁的规则:对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。
      -- 即前一个线程的解锁操作对后一个线程的加锁操作可见
      -- 综合Happens-Before的传递性原则,得出前一个线程在临界区修改的共享变量(该操作在解锁之前),对后续进入临界区(该操作在加锁之后)的线程是可见的。
    • 可见性-get():
      -- 管程中锁的规则,是只保证后续对这个锁的加锁的可见性,而get()方法并没有加锁操作,可见性没法保证。
      -- 加上syc关键字

      1. synchronized long get() {
      2. return value;
      3. }
    • get()方法和addOne()方法都需要访问value这个受保护的资源,这个资源用this这把锁来保护。线程要进入临界区get()和addOne(),必须先获得this这把锁,这样get()和addOne()也是互斥的。

  5. 锁和受保护资源的关系?

    • 受保护资源和锁之间的关联关系是N:1的关系。
    • 加上static

      1. class SafeCalc {
      2. static long value = 0L; // +static
      3. long get() {
      4. return value;
      5. }
      6. synchronized static void addOne() { // +static
      7. value += 1;
      8. }
      9. }
    • 两个锁保护一个资源。这个受保护的资源就是静态变量 value,两个锁分别是 this 和 SafeCalc.class。我们可以用下面这幅图来形象描述这个关系。由于临界区 get() 和 addOne() 是用两个锁保护的,因此这两个临界区没有互斥关系,临界区 addOne() 对 value 的修改对临界区 get() 也没有可见性保证,这就导致并发问题。

  6. 两种加锁对比?
    在这里插入图片描述
    在这里插入图片描述

一把锁保护多个资源
保护没有关联关系的多个资源
  1. 问题类型?

    • 保护没有关联关系的多个资源
      如,银行业务中有针对账户余额(余额是一种资源)的取款操作,也有针对账户密码(密码也是一种资源)的更改操作,可以为账户余额和账户密码分配不同的锁来解决并发问题。
    • 一把互斥锁来保护多个资源
      如,用this这一把锁来管理账户类里所有的资源:账户余额和用户密码。
      -- 实现,示例所有的方法都增加同步关键字synchronized。
      1. class Account {
      2. // 锁:保护账户余额
      3. private final Object balLock = new Object();
      4. // 账户余额
      5. private Integer balance;
      6. // 锁:保护账户密码
      7. private final Object pwLock = new Object();
      8. // 账户密码
      9. private String password;
      10. // 取款
      11. void withdraw(Integer amt) {
      12. synchronized (balLock) {
      13. if (this.balance > amt) {
      14. this.balance -= amt;
      15. }
      16. }
      17. }
      18. // 查看余额
      19. Integer getBalance() {
      20. synchronized (balLock) {
      21. return balance;
      22. }
      23. }
      24. // 更改密码
      25. void updatePassword(String pw) {
      26. synchronized (pwLock) {
      27. this.password = pw;
      28. }
      29. }
      30. // 查看密码
      31. String getPassword() {
      32. synchronized (pwLock) {
      33. return password;
      34. }
      35. }
      36. }
  2. 一把互斥锁来保护多个资源的问题?

    • 问题:
      性能太差,会导致取款、查看余额、修改密码、查看密码这四个操作都是串行的。而我们用两把锁,取款和修改密码是可以并行的。
    • 解决:
      用不同的锁对受保护资源进行精细化管理,能够提升性能。这种锁还有个名字,叫细粒度锁。
保护有关联关系的多个资源
  1. 如何保护有关联关系的多个资源?

    • 问题:
      例如,银行业务里面的转账操作,账户A减少100元,账户B增加100元。怎么保证转账操作transfer()没有并发问题呢?
    • 解决:
      -- 同一把锁来保护多个资源,使锁能覆盖所有受保护资源。
      -- 用 Account.class 作为共享的锁。Account.class 是所有 Account 对象共享的,而且这个对象是Java虚拟机在加载Account类的时候创建的,所以我们不用担心它的唯一性。使用Account.class 作为共享的锁,就无需在创建 Account对象时传入,代码也简化了。
      1. class Account {
      2. private int balance;
      3. // 转账
      4. void transfer(Account target, int amt) {
      5. synchronized (Account.class) {
      6. if (this.balance > amt) {
      7. this.balance -= amt;
      8. target.balance += amt;
      9. }
      10. }
      11. }
      12. }

4. 死锁

1-死锁的产生与预防
  1. 死锁的一个比较专业的定义是:一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。

  2. 死锁的原因?

    • 细粒度锁
      使用细粒度锁可以提高并行度,是性能优化的一个重要手段。但使用细粒度锁可能会导致死锁
    • 现象
      客户找柜员张三做个转账业务:账户 A 转账户 B 100元,
      另一个客户找柜员李四也做个转账业务:账户 B 转账户 A 100元,
      于是张三和李四同时都去文件架上拿账本,可能出现张三拿到了账本 A,李四拿到了账本 B。张三拿到账本 A 后就等着账本 B(账本 B已经被李四拿走),而李四拿到账本 B 后就等着账本 A(账本 A 已经被张三拿走),他们要等多久呢?他们会永远等待下去…
  3. 如何预防死锁?

    • 死锁的产生
      1)互斥,共享资源 X 和 Y 只能被一个线程占用;
      2)占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;
      3)不可抢占,其他线程不能强行抢占线程 T1 占有的资源;
      4)循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等
      待。
    • 破坏死锁条件
      1)对于“占用且等待”这个条件,我们可以一次性申请所有的资源,这样就不存在等待了。
      2)对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
      3)对于“循环等待”这个条件,可以靠按序申请资源来预防。所谓按序申请,是指资源是有线性顺序的,申请的时候可以先申请资源序号小的,再申请资源序号大的,这样线性化后自然就不存在循环了。
2-“等待-通知”机制优化循环等待
  1. 循环等待问题?

    • 问题:
      破坏占用且等待条件的时,如果转出账本和转入账本不满足同时在文件架上这个条件,就用死循环的方式来循环等待,
    • 核心代码:

      1. // 一次性申请转出账户和转入账户,直到成功
      2. while(!actr.apply(this, target))
    • 如果apply()操作耗时非常短,而且并发冲突量也不大时,循环上几次或者几十次就能一次性获取转出账户和转入账户了。但如果apply()操作耗时长,或者并发冲突量大的时候,可能要循环上万次才能获取到锁太消耗CPU,循环等待方案就不适用了。

  2. 循环等待问题的解决方案?

    • 方案:如果线程要求的条件(转出账本和转入账本同在文件架上)不满足,则线程阻塞自己,进入等待状态;当线程要求的条件(转出账本和转入账本同在文
      件架上)满足后,通知等待的线程重新执行。其中,使用线程阻塞的方式就能避免循环等待消耗CPU的问题。

Java语言是如何支持等待-通知机制

  1. 类比于现实的等待 - 通知机制的就医流程?
    1)患者先去挂号,然后到就诊门口分诊,等待叫号;
    2)当叫到自己的号时,患者就可以找大夫就诊了;
    3)就诊过程中,大夫可能会让患者去做检查,同时叫下一位患者;
    4)当患者做完检查后,拿检测报告重新分诊,等待叫号;
    5)当大夫再次叫到自己的号时,患者再去找大夫就诊。

    • 等待-通知机制的就医流程,不仅能够保证同一时刻大夫只为一个患者服务,而且还能够保证大夫和患者的效率。
    • 上述等待-通知机制中的一些细节:
      1)患者到就诊门口分诊,类似于线程要去获取互斥锁;当患者被叫到时,类似线程已经获取到锁了。
      2)大夫让患者去做检查(缺乏检测报告不能诊断病因),类似于线程要求的条件没有满足。
      3)患者去做检查,类似于线程进入等待状态;然后大夫叫下一个患者,这个步骤我们在前面的等待-通知机制中忽视了,这个步骤对应到程序里,本质是线程释放持有的互斥锁。
      4)患者做完检查,类似于线程要求的条件已经满足;患者拿检测报告重新分诊,类似于线程需要重新获取互斥锁,这个步骤我们在前面的等待-通知机制中也忽视了。
    • 综合可以得出一个完整的等待-通知机制:线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程重新获取互斥锁。
  2. 如何用synchronized实现等待-通知机制?

    • Java语言里,等待-通知机制可以有多种实现方式,比如 Java 语言内置的 synchronized配合wait()、notify()、notifyAll()这三个方法就能轻松实现。
  3. 如何用synchronized实现互斥锁?

    • 在下面这个图里,左边有一个等待队列,同一时刻,只允许一个线程进入synchronized保护的临界区(这个临界区可以看作大夫的诊室),当有一个线程进入临界区后,其他线程就只能进入图中左边的等待队列里等待(相当于患者分诊等待)。这个等待队列和互斥锁是一对一的关系,每个互斥锁都有自己独立的等待队列
      在这里插入图片描述
    • 在并发程序中,当一个线程进入临界区后,由于某些条件不满足,需要进入等待状态,Java 对象的 wait() 方法就能够满足这种需求。如上图所示,当调用 wait() 方法后,当前线程就会被阻塞,并且进入到右边的等待队列中,这个等待队列也是互斥锁的等待队列。线程在进入等待队列的同时,会释放持有的互斥锁,线程释放锁后,其他线程就有机会获得锁,并进入临界区了。
      在这里插入图片描述
    • 那线程要求的条件满足时,如何通知这个等待的线程呢?
      就是 Java 对象的 notify()和notifyAll()方法。下图大致描述了这个过程,当条件满足时调用notify(),会通知等待队列(互斥锁的等待队列)中的线程,告诉它条件曾经满足过
      在这里插入图片描述
    • 为什么说是曾经满足过呢?
      注意:notify()只能保证在通知时间点,条件是满足的。而被通知线程的执行时间点和通知的时间点基本上不会重合,所以当线程执行的时候,很可能条件已经不满足了(保不齐有其他线程插队)。
    • 还需注意:被通知的线程要想重新执行,仍然需要获取到互斥锁(因为曾经获取的锁在调用 wait() 时已经释放了)。
  4. 使用wait()、notify()、notifyAll()方法?

    • synchronized 锁定的是this,那么对应的一定是this.wait()、this.notify()、this.notifyAll();
    • synchronized 锁定的是target,那么对应的一定是target.wait()、target.notify()、target.notifyAll() 。
    • wait()、notify()、notifyAll()这三个方法能够被调用的前提是已经获取了相应的互斥锁,所以我们会发现 wait()、notify()、notifyAll() 都是在
      synchronized{}内部被调用的。如果在 synchronized{}外部调用,或者锁定的 this,而用target.wait() 调用的话,JVM 会抛出一个运行时异常:
      java.lang.IllegalMonitorStateException。
  5. 一个更好地资源分配器:如何解决一次性申请转出账户和转入账户的问题?

    • 这个等待-通知机制中,需要考虑以下四个要素。
      1)互斥锁:上一部分死锁中的Allocator需要是单例的,所以可以用this作为互斥锁。
      2)线程要求的条件:转出账户和转入账户都没有被分配过。
      3)何时等待:线程要求的条件不满足就等待。
      4)何时通知:当有线程释放账户时就通知。
    • 考虑好上述几个问题,完成下面的代码:
      1. while(条件不满足) {
      2. wait();
      3. }
      4. ```
      5. * 利用这种范式可以解决上面提到的条件曾经满足过这个问题。因为当 wait() 返回时,有可能条件已经发生变化了,曾经条件满足,但是现在已经不满足了,所以要重新检验条件是否满足。范式,意味着是经典做法,所以没有特殊理由不要尝试换个写法。
      6. ``` java
      7. class Allocator {
      8. private List<Object> als;
      9. // 一次性申请所有资源
      10. synchronized void apply(
      11. Object from, Object to){
      12. // 经典写法
      13. while(als.contains(from) ||
      14. als.contains(to)){
      15. try{
      16. wait();
      17. }catch(Exception e){
      18. }
      19. }
      20. als.add(from);
      21. als.add(to);
      22. }
      23. // 归还资源
      24. synchronized void free(
      25. Object from, Object to){
      26. als.remove(from);
      27. als.remove(to);
      28. notifyAll();
      29. }
      30. }
  6. 尽量使用 notifyAll()?

    • 上述使用notifyAll()来实现通知机制,为什么不使用notify()呢?
    • 这二者是有区别的,notify() 是会随机地通知等待队列中的一个线程,而 notifyAll() 会通知等待队列中的所有线程。从感觉上来讲,应该是 notify() 更好一些,因为即便通知所有线程,也只有一个线程能够进入临界区。实际上使用notify()也很有风险,它的风险在于可能导致某些线程永远不会被通知到。
    • 假设我们有资源 A、B、C、D,线程 1 申请到了 AB,线程 2 申请到了 CD,此时线程 3 申请AB,会进入等待队列(AB 分配给线程 1,线程 3 要求的条件不满足),线程 4 申请 CD 也会进入等待队列。我们再假设之后线程 1 归还了资源 AB,如果使用notify()来通知等待队列中的线程,有可能被通知的是线程 4,但线程 4 申请的是 CD,所以此时线程 4 还是会继续等待,而真
      正该唤醒的线程 3 就再也没有机会被唤醒了。
    • 所以除非经过深思熟虑,否则尽量使用 notifyAll()。
  7. 总结

    • 等待-通知机制是一种非常普遍的线程间协作的方式。工作中经常看到有同学使用轮询的方式来等待某个状态,其实很多情况下都可以用今天我们介绍的等待-通知机制来优化。
    • Java语言内置的synchronized配合wait()、notify()、notifyAll()这三个方法可以快速实现这种机制,但是它们的使用看上去还是有点复杂,所以你需要认真理解等待队列和 wait()、notify()、notifyAll()的关系。

3-Java线程

1-生命周期

通用 Java状态 状态转换 interrupt

1. 通用的线程生命周期?
通用的线程生命周期基本上可以用下图这个“五态模型”来描述。这五态分别是:初始状态、可运行状态、运行状态、休眠状态和终止状态。
在这里插入图片描述

  1. 这“五态模型”的详细情况如下所示?
    1)初始状态,指的是线程已经被创建,但是还不允许分配CPU执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建。
    2)可运行状态,指的是线程可以分配CPU执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配CPU执行。
    3)当有空闲的CPU时,操作系统会将其分配给一个处于可运行状态的线程,被分配到CPU的线程的状态就转换成了运行状态。
    4)运行状态的线程如果调用一个阻塞的API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放CPU使用权,休眠状态的线程永远没有机会获得CPU使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
    5)线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。

  2. 五种状态的定义?

    • 参考:
      https://my.oschina.net/payzheng/blog/692635
      评论存疑:https://blog.csdn.net/woshiyigeliangliang/article/details/81116872
    • 定义:
      -- NEW:一个被创建的线程,但是还没有调用start方法
      -- RUNNABLE:一个正在被执行的线程的状态
      -- BLOCKED:一个线程因为等待临界区的锁被阻塞产生的状态
      -- WAITING:一个线程进入了锁,但是需要等待其他线程执行某些操作。时间不确定
      -- TIMED_WAITING:一个线程进入了锁,但是需要等待其他线程执行某些操作。时间确定
      -- TERMINATED:通过sleep或wait timeout方法进入的限期等待的状态)
  3. 这五种状态在不同编程语言里会有简化合并?

    • 简化合并,如:
      -- C语言的POSIX Threads规范,就把初始状态和可运行状态合并了;
      -- Java语言里则把可运行状态和运行状态合并了,这两个状态在操作系统调度层面有用,而JVM层面不关心这两个状态,因为JVM把线程调度交给操作系统处理了。
    • 细化,如,
      -- Java语言里就细化了休眠状态。
  4. Java中线程的生命周期?

    • Java语言中线程共有六种状态,分别是:
      1)NEW(初始化状态)
      2)RUNNABLE(可运行 / 运行状态)
      3)BLOCKED(阻塞状态)
      4)WAITING(无时限等待)
      5)TIMED_WAITING(有时限等待)
      6)TERMINATED(终止状态)
    • 在操作系统层面,Java线程中的BLOCKED、WAITING、TIMED_WAITING是一种状态,即前面提到的休眠状态。即Java线程处于这三种状态之一,那么这个线程就永远没有CPU的使用权。
    • 所以Java线程的生命周期可以简化为下图:
      在这里插入图片描述
    • 其中,BLOCKED、WAITING、TIMED_WAITING可以理解为线程导致休眠状态的三种原因。

具体是哪些情形会导致线程从RUNNABLE状态转换到这三种状态呢?而这三种状态又是何时转换回RUNNABLE的呢?以及 NEW、TERMINATED 和 RUNNABLE 状态是如何转换的?

  1. RUNNABLE与BLOCKED的状态转换?

    • RUNNABLE转BLOCKED
      只有一种场景会触发这种转换,就是线程等待synchronized的隐式锁。synchronized 修饰的方法、代码块同一时刻只允许一个线程执行,其他线程只能等待,这种情况下,等待的线程就会从RUNNABLE转换到BLOCKED状态。
    • BLOCKED转RUNNABLE
      当等待的线程获得synchronized隐式锁时,就又会从BLOCKED转换到RUNNABLE状态。
    • 在操作系统线程的生命周期,线程调用阻塞式API时,是否会转换到BLOCKED状态?
      在操作系统层面,线程是会转换到休眠状态的,但是在JVM层面,Java线程的状态不会发生变化,也就是说Java线程的状态会依然保持RUNNABLE状态。JVM层面并不关心操作系统调度相关的状态,因为在JVM看来,等待CPU使用权(操作系统层面此时处于可执行状态)与等待I/O(操作系统层面此时处于休眠状态)没有区别,都是在等待某个资源,所以都归入了 RUNNABLE 状态。而我们平时所谓的Java在调用阻塞式API时,线程会阻塞,指的是操作系统线程的状态,并不是Java线程的状态。
  2. RUNNABLE与WAITING的状态转换?

    • 总体,三种场景会触发这种转换:
      ① 获得synchronized隐式锁的线程,调用无参数的 Object.wait() 方法。
      ② 调用无参数的Thread.join()方法。其中的join()是一种线程同步方法,例如有一个线程对象threadA,当调用A.join()的时候,执行这条语句的线程会等待threadA执行完,而等待中的这个线程,其状态会从 RUNNABLE 转换到 WAITING。当线程 thread A 执行完,原来等待它的线程又会从 WAITING 状态转换到 RUNNABLE。
      ③ 调用 LockSupport.park() 方法。其中的 LockSupport 对象,其
      实 Java 并发包中的锁,都是基于它实现的(ReentrentLock.lock()底层调用的是LockSupport.park(),因此ReentrentLock.lock()进入的是WAITING状态。)。调用 LockSupport.park()方法,当前线程会阻塞,线程的状态会从 RUNNABLE 转换到 WAITING。调用 LockSupport.unpark(Thread thread)可唤醒目标线程,目标线程的状态又会从 WAITING 状态转换到 RUNNABLE。
  3. RUNNABLE与TIMED_WAITING的状态转换?
    有五种场景会触发这种转换:
    1)调用带超时参数的 Thread.sleep(long millis) 方法;
    2)获得synchronized 隐式锁的线程,调用带超时参数的 Object.wait(long timeout) 方法;
    3)调用带超时参数的 Thread.join(long millis) 方法;
    4)调用带超时参数的 LockSupport.parkNanos(Object blocker, long deadline) 方法;
    5)调用带超时参数的 LockSupport.parkUntil(long deadline) 方法。

    • TIMED_WAITING 和 WAITING状态的区别,仅仅是触发条件多了超时参数。
  4. NEW到RUNNABLE状态?

    • Java刚创建出来的Thread对象就是NEW状态。
    • NEW状态的线程,不会被操作系统调度,因此不会执行。Java线程要执行,就必须转换到RUNNABLE 状态。
    • 从NEW状态转换到RUNNABLE状态,只要调用线程对象的start()方法就可以
    • 示例代码如下:
      1. MyThread myThread = new MyThread();
      2. // 从 NEW 状态转换到 RUNNABLE 状态
      3. myThread.start();
  5. 从RUNNABLE到TERMINATED状态?

    • 线程执行完run()方法后,会自动转换到TERMINATED状态,当然如果执行run()方法的时候异常抛出,也会导致线程终止。
    • 有时候我们需要强制中断run()方法的执行,例如run()方法访问一个很慢的网络,我们等不下去了,想终止怎么办呢?
      Java 的 Thread 类里面倒是有个stop()方法,不过已经标记为@Deprecated,所以不建议使用了。正确的姿势其实是调用 interrupt()方法。
  6. stop() 和 interrupt() 方法的主要区别是什么呢

    • stop() 方法会真的杀死线程,不给线程喘息的机会,如果线程持有 synchronized 隐式锁,也不会释放,那其他线程就再也没机会获得synchronized隐式锁,这实在是太危险了。所以该方法就不建议使用了,类似的方法还有 suspend() 和 resume() 方法,这两个方法同样也都不建议使用了。
    • interrupt() 方法就温柔多了,interrupt()方法仅仅是通知线程,线程有机会执行一些后续操作,同时也可以无视这个通知。
  7. 被interrupt的线程,是怎么收到通知的呢?

    • 一种是异常,另一种是主动检测。
    • 主动检测一:
      当线程A处于WAITING、TIMED_WAITING状态时,如果其他线程调用线程A的interrupt()方法,会使线程A返回到RUNNABLE状态,同时线程A的代码会触发InterruptedException异常。
      上面我们提到转换到WAITING、TIMED_WAITING状态的触发条件,都是调用了类似wait()、join()、sleep()这样的方法,我们看这些方法的签名,发现都会throws InterruptedException这个异常。这个异常的触发条件就是:其他线程调用了该线程的
      interrupt() 方法。
    • 主动检测二:
      当线程 A 处于 RUNNABLE状态时,并且阻塞在java.nio.channels.InterruptibleChannel上时,如果其他线程调用线程A的interrupt()方法,线程A会触发java.nio.channels.ClosedByInterruptException这个异常;而阻塞在java.nio.channels.Selector上时,如果其他线程调用线程A的interrupt()方法,线程A的java.nio.channels.Selector 会立即返回。
    • 主动检测
      如果线程处于RUNNABLE状态,并且没有阻塞在某个I/O操作上,例如中断计算圆周率的线程 A,这时就得依赖线程 A 主动检测中断状态了。如果其他线程调用线程 A 的 interrupt() 方法,那么线程A可以通过isInterrupted()方法,检测是不是自己被中断了。
  8. 总结

    • 多线程程序很难调试,出了 Bug基本上都是靠日志,靠线程dump来跟踪问题,分析线程 dump 的一个基本功就是分析线程状态,大部分的死锁、饥饿、活锁问题都需要跟踪分析线程的状态。
    • 可以通过 jstack 命令或者Java VisualVM这个可视化工具将JVM所有的线程栈信息导出来,完整的线程栈信息不仅包括线程的当前状态、调用栈,还包括了锁的信息。
  9. 思考题

    • 下面代码的本意是当前线程被中断之后,退出while(true),你觉得这段代码是否正确呢?

      1. Thread th = Thread.currentThread();
      2. while(true) {
      3. if(th.isInterrupted()) {
      4. break;
      5. }
      6. // 省略业务代码无数
      7. try {
      8. Thread.sleep(100);
      9. }catch (InterruptedException e){
      10. e.printStackTrace();
      11. }
      12. }
    • 目的:注意InterruptedException 的处理方式。

    • 当你调用 Java 对象的 wait() 方法或者线程的 sleep()方法时,需要捕获并处理 InterruptedException 异常,如下所示,本意是通过 isInterrupted()检查线程是否被中断了,如果中断了就退出 while 循环。当其他线程通过调用th.interrupt().来中断 th 线程时,会设置th线程的中断标志位,从而使th.isInterrupted()返回 true,这样就能退出 while 循环了。
    • 问题:
      实际上却是几乎起不了作用。原因是这段代码在执行的时候,大部分时间都是阻塞在 sleep(100) 上,当其他线程通过调用th.interrupt().来中断 th 线程时,大概率地会触发 InterruptedException 异常,在触发 InterruptedException 异常的同时,JVM 会同时把线程的中断标志位清除,所以这个时候th.isInterrupted()返回的是 false。
    • 解决:
      正确的处理方式应该是捕获异常之后重新设置中断标志位
      1. try {
      2. Thread.sleep(100);
      3. }catch(InterruptedException e){
      4. // 重新设置中断标志位
      5. th.interrupt();
      6. }

2-创建合适线程数

1. 各种线程池的线程数量调整成多少是合适的?/Tomcat的线程数、Jdbc连接池的连接数是多少?等等。
2. 如何设置合适的线程数呢?
2. 分析多线程的应用场景有哪些?

  1. 如何度量性能?

    • 度量性能的指标有很多,两个最核心的指标是延迟和吞吐量。
    • 延迟指的是发出请求到收到响应这个过程的时间;延迟越短,意味着程序执行得越快,性能也就越好。
    • 吞吐量指的是在单位时间内能处理请求的数量;吞吐量越大,意味着程序能处理的请求越多,性能也就越好。
    • 两个指标内部有一定的联系(同等条件下,延迟越短,吞吐量越大),但是由于它们隶属不同的维度(一个是时间维度,一个是空间维度),并不能互相转换。
  2. 分析为什么要使用多线程?

    • 使用多线程,本质上就是提升程序性能。
    • 所谓提升性能,从度量的角度,主要是降低延迟,提高吞吐量。这也是使用多线程的主要目的。
  3. 怎么降低延迟,提高吞吐量呢?

    • 要想“降低延迟,提高吞吐量”,基本上有两个方向,一个方向是优化算法,另一个方向是将硬件的性能发挥到极致。前者属于算法范畴,后者则是和并发编程息息相关了。
    • 那计算机主要有哪些硬件呢?主要是两类:一个是I/O,一个是CPU。
    • 简言之,在并发编程领域,提升性能本质上就是提升硬件的利用率,再具体点来说,就是提升 I/O 的利用率和 CPU 的利用率。
    • 操作系统不是已经解决了硬件的利用率问题了吗?
      -- 但是操作系统解决硬件利用率问题的对象往往是单一的硬件设备。例如操作系统已经解决了磁盘和网卡的利用率问题,利用中断机制还能避免 CPU 轮询 I/O 状态,也提升了CPU的利用率。
      -- 而并发程序,往往需要CPU和I/O设备相互配合工作,也就是说,需要解决CPU和I/O设备综合利用率的问题。关于这个综合利用率的问题,操作系统虽然没有办法完美解决,但是却给我们提供了方案,那就是:多线程。
  4. 多线程的应用场景?
    示例说明:如何利用多线程来提升 CPU 和 I/O 设备的利用率?

    • 假设程序按照 CPU 计算和 I/O 操作交叉执行的方式运行,而且 CPU 计算和 I/O 操作的耗时是 1:1。
      如下,如果只有一个线程,执行 CPU 计算的时候,I/O 设备空闲;执行 I/O 操作的时候,CPU 空闲,所以 CPU 的利用率和 I/O 设备的利用率都是 50%。
      在这里插入图片描述
    • 如果有两个线程,如下图所示,当线程 A 执行 CPU 计算的时候,线程 B 执行 I/O 操作;当线程A 执行 I/O 操作的时候,线程 B 执行 CPU 计算,这样 CPU 的利用率和 I/O 设备的利用率就都达到了 100%。
      在这里插入图片描述
    • 将 CPU 的利用率和I/O设备的利用率都提升到了100%,会对性能产生了哪些影响呢?
      通上图:单位时间处理的请求数量翻了一番,也就是说吞吐量提高了 1 倍。
      此时可以逆向思维一下,如果CPU和I/O设备的利用率都很低,那么可以尝试通过增加线程来提高吞吐量
  5. 在单核时代,多线程主要就是用来平衡CPU和I/O设备的。如果程序只有CPU计算,而没有I/O 操作的话,多线程不但不会提升性能,还会使性能变得更差,原因是增加了线程切换的成本。但是在多核时代,这种纯计算型的程序也可以利用多线程来提升性能。为什么呢?

    • 因为利用多核可以降低响应时间。
    • 例子说明一下:计算 1+2+… … +100 亿的值,如果在 4 核的
      CPU 上利用 4 个线程执行,线程 A 计算 [1,25 亿),线程 B 计算 [25 亿,50 亿),线程 C 计算[50,75 亿),线程 D 计算 [75 亿,100 亿],之后汇总,那么理论上应该比一个线程计算 [1,100 亿] 快将近 4倍,响应时间能够降到 25%。一个线程,对于 4 核的 CPU,CPU 的利用率只
      有 25%,而 4 个线程,则能够将 CPU 的利用率提高到 100%。
      在这里插入图片描述
  6. I/O 密集型计算、CPU 密集型计算?

    • I/O 密集型计算:
      程序一般都是 CPU 计算和 I/O 操作交叉执行的,由于 I/O 设备的速度相对于 CPU 来说都很慢,所以大部分情况下,I/O 操作执行的时间相对于CPU计算来说都非常长,这种场景一般都称为 I/O 密集型计算;
    • CPU 密集型计算
      和 I/O 密集型计算相对的就是 CPU 密集型计算了,CPU 密集型计算大部分场景下都是纯 CPU 计算。
  7. 创建多少线程合适?

    • 创建多少线程合适,要看多线程具体的应用场景。
    • I/O 密集型程序和 CPU 密集型程序,计算最佳线程数的方法是不同的。
    • CPU 密集型计算:
      -- 多线程本质上是提升多核 CPU 的利用率,所以对于一个 4 核的CPU,每个核一个线程,理论上创建4个线程就可以了,再多创建线程也只是增加线程切换的成本。所以,对于 CPU 密集型的计算场景,理论上“线程的数量=CPU核数”就是最合适的。
      -- 工程上,线程的数量一般会设置为“CPU核数+1”,这样,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
    • I/O 密集型的计算场:
      -- 如前面我们的例子中,如果 CPU 计算和 I/O 操作的耗时是1:1,那么 2个线程是最合适的。如果 CPU 计算和 I/O 操作的耗时是 1:2,那多少个线程合适
      呢?是 3 个线程,如下图所示:CPU 在 A、B、C 三个线程之间切换,对于线程 A,当 CPU 从B、C 切换回来时,线程 A 正好执行完 I/O 操作。这样 CPU 和 I/O设备的利用率都达到了100%。
      在这里插入图片描述
      -- 对于 I/O 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,我们可以总结出这样一个公式:
      【最佳线程数 =1 +(I/O 耗时 / CPU 耗时)】 -- 单核
      我们令 R=I/O 耗时 / CPU 耗时,综合上图,可以这样理解:当线程 A 执行 IO 操作时,另外 R个线程正好执行完各自的 CPU 计算。这样 CPU 的利用率就达到了 100%。
      多核 CPU,等比扩大,计算公式如下:
      【最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]】 -- 多核
      -- I/O 耗时和 CPU 耗时的比值是一个关键参数,这个参数是未知的,而且是动态变化的,所以工程上,要估算这个参数,然后做各种不同场景下的压测来验证估计。不过工程上,原则还是将硬件的性能发挥到极致,所以压测时,需要重点关注 CPU、I/O 设备的利用率和性能指标(响应时间、吞吐量)之间的关系。
  8. 讨论题:
    有些同学对于最佳线程数的设置积累了一些经验值,认为对于 I/O密集型应用,最佳线程数应该为:2 * CPU 的核数 + 1,你觉得这个经验值合理吗?

    • 理论上,这个经验值一定是靠不住的。
    • 但是经验值对于很多“I/O 耗时 / CPU 耗时”不太容易确定的系统来说,却是一个很好到初始值。
    • 最佳线程数最终还是靠压测来确定的,实际工作中大家面临的系统,“I/O 耗时 /
      CPU 耗时”往往都大于 1,所以基本上都是在这个初始值的基础上增加。增加的过程中,应关注线程数是如何影响吞吐量和延迟的。一般来讲,随着线程数的增加,吞吐量会增加,延迟也会缓慢增加;但是当线程数增加到一定程度,吞吐量就会开始下降,延迟会迅速增加。这个时候基本上就是线程能够设置的最大值了。
    • 实际工作中,不同的 I/O 模型对最佳线程数的影响非常大,例如大名鼎鼎的 Nginx 用的是非阻塞 I/O,采用的是多进程单线程结构,Nginx 本来是一个 I/O 密集型系统,但是最佳进程数设置的却是 CPU 的核数,完全参考的是 CPU密集型的算法。

3-局部变量是线程安全

11|Java线程(下):为什么局部变量是线程安全的?

  1. Java 语言里,是不是所有变量都是共享变量呢?

    • 不是,局部变量
  2. Java 方法里面的局部变量是否存在并发问题呢?
    下面我们就先结合一个例子剖析下这个问题。
    比如,下面代码里的 fibonacci() 这个方法,会根据传入的参数 n ,返回 1 到 n 的斐波那契数
    列,斐波那契数列类似这样: 1、1、2、3、5、8、13、21、34……第 1 项和第 2 项是 1,从第
    3 项开始,每一项都等于前两项之和。在这个方法里面,有个局部变量:数组 r 用来保存数列的
    结果,每次计算完一项,都会更新数组 r 对应位置中的值。你可以思考这样一个问题,当多个线
    程调用 fibonacci() 这个方法的时候,数组 r 是否存在数据竞争(Data Race)呢?

2. 并发工具类

  1. 两个同步原语:管程和信号量
    • 两个同步原语中任何一个都可以解决所有的并发问题。

1-管程:Lock&Condition

1. lock
  1. Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。

  2. Java 语言本身提供的 synchronized 也是管程的一种实现,既然 Java 从语言层面已经实现了管程了,那为什么还要在SDK里提供另外一种实现呢?

    • 发展:
      例如在Java的1.5版本中,synchronized 性能不如SDK 里面的 Lock,但 1.6 版本之后,synchronized做了很多优化,将性能追了上来,所以1.6之后的版本又有人推荐使用synchronized了。
    • 为解决的问题:
      死锁问题中,破坏不可抢占条件方案,但是这个方案synchronized没有办法解决。原因是synchronized申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,也释放不了线程已经占有的资源。但我们希望的是:
      1. 对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申
      2. 请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
  3. 如果我们重新设计一把互斥锁去解决这个问题,那该怎么设计呢?

    • 应该有三种方案。
      1)能够响应中断。
      synchronized 的问题是,持有锁 A 后,如果尝试获取锁B失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
      2)支持超时。
      如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
      3)非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
    • 这三种方案可以全面弥补synchronized的问题。这三个方案体现在API上,就是 Lock 接口的三个方法。如下:
      1. // 支持中断的 API
      2. void lockInterruptibly()
      3. throws InterruptedException;
      4. // 支持超时的 API
      5. boolean tryLock(long time, TimeUnit unit)
      6. throws InterruptedException;
      7. // 支持非阻塞获取锁的 API
      8. boolean tryLock();
  4. 如何保证可见性?

    • Java SDK 里面 Lock 的使用,有一个经典的范例,就是try{}finally{}
      ,需要重点关注的是在finally里面释放锁。
    • 可见性是怎么保证的?
      -- Java 里多线程的可见性是通过 Happens-Before 规则保证的,
      -- synchronized 之所以能够保证可见性,也是因为有一条 synchronized相关的规则:synchronized 的解锁 Happens-Before于后续对这个锁的加锁。
      -- Java SDK 里面 Lock 靠什么保证可见性呢?例如在下面的代码中,线程 T1 对 value 进行了 +=1 操作,那后续的线程 T2 能够看到 value的正确结果吗?
      1. class X {
      2. private final Lock rtl = new ReentrantLock();
      3. int value;
      4. public void addOne() {
      5. // 获取锁
      6. rtl.lock();
      7. try {
      8. value+=1;
      9. } finally {
      10. // 保证锁能释放
      11. rtl.unlock();
      12. }
      13. }
      14. }

    答案必须是肯定的。Java SDK里面锁原理简述:利用了 volatile 相关的 Happens-Before 规则。Java SDK 里面的ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state的值(简化后的代码如下面所示)。也就是说,在执行 value+=1
    之前,程序先读写了一次 volatile 变量 state,在执行 value+=1 之后,又读写了一次 volatile变量 state。根据相关的 Happens-Before 规则:
    1)顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
    2)volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作Happens-Before 线程 T2 的 lock() 操作;
    3)传递性规则:线程 T2 的 lock() 操作 Happens-Before 线程 T1 的 value+=1 。

    1. class SampleLock {
    2. volatile int state;
    3. // 加锁
    4. lock() {
    5. // 省略代码无数
    6. state = 1;
    7. }
    8. // 解锁
    9. unlock() {
    10. // 省略代码无数
    11. state = 0;
    12. }
    13. }

    所以说,后续线程 T2 能够看到 value 的正确结果

  5. 什么是可重入锁?

    • ReentrantLock,这个翻译叫可重入锁。所谓可重入锁,顾名思义,指的是线程可以重复获取同一把锁。
      例如下面代码中,当线程 T1 执行到 ①处时,已经获取到了锁 rtl ,当在 ① 处调用get() 方法时,会在 ② 再次对锁 rtl执行加锁操作。
      此时,如果锁 rtl 是可重入的,那么线程T1可以再次加锁成功;如果锁 rtl 是不可重入的,那么线程 T1 此时会被阻塞。
      1. class X {
      2. private final Lock rtl = new ReentrantLock();
      3. int value;
      4. public int get() {
      5. // 获取锁
      6. rtl.lock();
      7. try {
      8. return value;
      9. } finally {
      10. // 保证锁能释放
      11. rtl.unlock();
      12. }
      13. }
      14. public void addOne() {
      15. // 获取锁
      16. rtl.lock();
      17. try {
      18. value = 1 + get();
      19. } finally {
      20. // 保证锁能释放
      21. rtl.unlock();
      22. }
      23. }
      24. }
  6. 可重入函数?

    • 可重入函数怎么理解呢?指的是线程可以重复调用?
      显然不是,所谓可重入函数,指的是多个线程可以同时调用该函数,每个线程都能得到正确结果;同时在一个线程内支持线程切换,无论被切换多少次,结果都是正确的。多线程可以同时执行,还支持线程切换,这意味着什么呢?线程安全啊。所以,可重入函数是线程安全的。
  7. 公平锁与非公平锁?

    • ReentrantLock这个类有两个构造函数,一个是无参构造函数,一个是传入 fair 参数的构造函数。fair参数代表的是锁的公平策略,如果传入 true就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。
  1. // 无参构造函数:默认非公平锁
  2. public ReentrantLock() {
  3. sync = new NonfairSync();
  4. }
  5. // 根据公平策略参数创建锁
  6. public ReentrantLock(boolean fair){
  7. sync = fair ? new FairSync() : new NonfairSync();
  8. }
  1. class Account {
  2. private int balance;
  3. private final Lock lock = new ReentrantLock();
  4. // 转账
  5. void transfer(Account tar, int amt){
  6. while (true) {
  7. if(this.lock.tryLock()) {
  8. try {
  9. if (tar.lock.tryLock()) {
  10. try {
  11. this.balance -= amt;
  12. tar.balance += amt;
  13. } finally {
  14. tar.lock.unlock();
  15. }
  16. }//if
  17. } finally {
  18. this.lock.unlock();
  19. }
  20. }//if
  21. }//while
  22. }//transfer
  23. }
2 Condition
  1. Java SDK 并发包里的Lock有别于synchronized隐式锁的三个特性:能够响应中断、支持超时和非阻塞地获取锁。

  2. Java SDK 并发包里的 Condition

    • Condition实现了管程模型里面的条件变量。
    • 管程中提到 Java 语言内置的管程里只有一个条件变量,而 Lock&Condition实现的管程是支持多个条件变量的,这是二者的一个重要区别。
    • 很多并发场景下,支持多个条件变量能够让并发程序可读性更好,实现起来也更容易。
    • 例如,实现一个阻塞队列,就需要两个条件变量。
  3. 如何利用两个条件变量快速实现阻塞队列呢?
    一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队)。相
    关的代码,重新列出。

    1. public class BlockedQueue<T>{
    2. final Lock lock = new ReentrantLock();
    3. // 条件变量:队列不满
    4. final Condition notFull = lock.newCondition();
    5. // 条件变量:队列不空
    6. final Condition notEmpty = lock.newCondition();
    7. // 入队
    8. void enq(T x) {
    9. lock.lock();
    10. try {
    11. while (队列已满){
    12. // 等待队列不满
    13. notFull.await();
    14. }
    15. // 省略入队操作...
    16. // 入队后, 通知可出队
    17. notEmpty.signal();
    18. }finally {
    19. lock.unlock();
    20. }
    21. }
    22. // 出队
    23. void deq(){
    24. lock.lock();
    25. try {
    26. while (队列已空){
    27. // 等待队列不空
    28. notEmpty.await();
    29. }
    30. // 省略出队操作...
    31. // 出队后,通知可入队
    32. notFull.signal();
    33. }finally {
    34. lock.unlock();
    35. }
    36. }
    37. }
    • 注意:
      -- Lock 和 Condition实现的管程,线程等待和通知需要调用await()、signal()、signalAll(),语义和wait()、notify()、notifyAll()是相同的。
      -- 区别是,Lock&Condition实现的管程里只能使用前面的await()、signal()、signalAll(),而后面的wait()、notify()、notifyAll() 只有在 synchronized实现的管程里才能使用。
      -- 如果一不小心在Lock&Condition实现的管程里调用了wait()、notify()、notifyAll(),那程序可就彻底玩儿完了
  4. 同步和异步?

    • 那同步和异步的区别到底是什么呢?
      通俗点来讲就是调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步。
    • 比如在下面的代码里,有一个计算圆周率小数点后 100万位的方法pai1M(),这个方法可能需要执行俩礼拜,如果调用pai1M()之后,线程一直等着计算结果,等俩礼拜之后结果返回,就可以执行 printf("hello world")了,这个属于同步;如果调用pai1M()之后,线程不用等待计算结果,立刻就可以执行 printf("hello world"),这个就属于异步。
      1. // 计算圆周率小说点后 100 万位
      2. String pai1M() {
      3. // 省略代码无数
      4. }
      5. pai1M()
      6. printf("hello world")
  5. 异步实现的两种方式?

    • 同步,是 Java代码默认的处理方式。异步实现的两种方式:
      1)调用方创建一个子线程,在子线程中执行方法调用,这种调用称为异步调用;
      2)方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接 return,这种方法一般称为异步方法。
  6. 在项目Dubbo中,Lock和Condition是怎么用的?

    • Dubbo 源码分析
    • 编程领域的异步的场景,比如TCP协议本身就是异步的,经常用到的 RPC 调用,在 TCP 协议层面,发送完 RPC 请求后,线程是不会等待 RPC 的响应结果的。
    • RPC调用大多数都是同步的啊?内部实现了异步转同步。
    • RPC框架Dubbo内部做了异步转同步,如何实现?分析一下相关源码。
      对于下面一个简单的RPC调用,默认情况下sayHello()方法,是个同步方法,也就是说,执行service.sayHello(“dubbo”)的时候,线程会停下来等结果。

      1. DemoService service = 初始化部分省略
      2. String message =
      3. service.sayHello("dubbo");
      4. System.out.println(message);
    • 如果此时你将调用线程dump出来的话,会是下图这个样子,你会发现调用线程阻塞了,线程状态是TIMED_WAITING。本来发送请求是异步的,但是调用线程却阻塞了,说明Dubbo帮我们做了异步转同步的事情。通过调用栈,你能看到线程是阻塞在DefaultFuture.get()方法上,所以可以推断:Dubbo 异步转同步的功能应该是通过 DefaultFuture这个类实现的。
      在这里插入图片描述

  7. 不过为了理清前后关系,还是有必要分析一下调用DefaultFuture.get() 之前发生了什么。DubboInvoker 的108行调用了DefaultFuture.get(),这一行很关键,我稍微修改了一下列在了下面。这一行先调用了 request(inv, timeout)方法,这个方法其实就是发送 RPC 请求,之后通过调用 get() 方法等待 RPC 返回结果。

    1. public class DubboInvoker{
    2. Result doInvoke(Invocation inv){
    3. // 下面这行就是源码中 108 行
    4. // 为了便于展示,做了修改
    5. return currentClient
    6. .request(inv, timeout)
    7. .get();
    8. }
    9. }
    • DefaultFuture 这个类是很关键,我把相关的代码精简之后,列到了下面。不过在看代码之前,你还是有必要重复一下我们的需求:当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。不知道你有没有似曾相识的感觉,这不就是经典的等待-通知机制吗?这个时候想必你的脑海里应该能够浮现出管程的解决方案了。有了自己的方案之后,我们再来看看 Dubbo 是怎么实现的。

      1. // 创建锁与条件变量
      2. private final Lock lock
      3. = new ReentrantLock();
      4. private final Condition done
      5. = lock.newCondition();
      6. // 调用方通过该方法等待结果
      7. Object get(int timeout){
      8. long start = System.nanoTime();
      9. lock.lock();
      10. try {
      11. while (!isDone()) {
      12. done.await(timeout);
      13. long cur=System.nanoTime();
      14. if (isDone() ||
      15. cur-start > timeout){
      16. break;
      17. }
      18. }
      19. } finally {
      20. lock.unlock();
      21. }
      22. if (!isDone()) {
      23. throw new TimeoutException();
      24. }
      25. return returnFromResponse();
      26. }
      27. // RPC 结果是否已经返回
      28. boolean isDone() {
      29. return response != null;
      30. }
      31. // RPC 结果返回时调用该方法
      32. private void doReceived(Response res) {
      33. lock.lock();
      34. try {
      35. response = res;
      36. if (done != null) {
      37. done.signal();
      38. }
      39. } finally {
      40. lock.unlock();
      41. }
      42. }
    • 调用线程通过调用 get() 方法等待 RPC返回结果,这个方法里面,你看到的都是熟悉的“面孔”:调用 lock() 获取锁,在 finally 里面调用unlock()释放锁;获取锁后,通过经典的在循环中调用 await() 方法来实现等待。当 RPC 结果返回时,会调用 doReceived() 方法,这个方法里面,调用 lock() 获取锁,在finally 里面调用 unlock() 释放锁,获取锁后通过调用 signal()来通知调用线程,结果已经返回,不用继续等待了。

    • 至此,Dubbo 里面的异步转同步的源码就分析完了,有没有觉得还挺简单的?最近这几年,工作中需要异步处理的越来越多了,其中有一个主要原因就是有些 API 本身就是异步 API。例如websocket 也是一个异步的通信协议,如果基于这个协议实现一个简单的RPC,你也会遇到异步转同步的问题。现在很多公有云的 API本身也是异步的,例如创建云主机,就是一个异步的API,调用虽然成功了,但是云主机并没有创建成功,你需要调用另外一个API去轮询云主机的状态。如果你需要在项目内部封装创建云主机的API,你也会面临异步转同步的问题,因为同步的 API 更易用。
  8. 课后思考:efaultFuture 里面唤醒等待的线程,用的是 signal(),而不是 signalAll(),你来分析一下,这样做是否合理呢?

    • Dubbo最近已经把signal()改成signalAll()了,我觉得用signal()也不能说错,但的确是用signalAll()会更安全。我个人也倾向于使用signalAll(),因为我们写程序,不是做数学题,而是在搞工程,工程中会有很多
      不稳定的因素,更有很多你预料不到的情况发生,所以不要让你的代码铤而走险,尽量使用更稳妥的方案和设计。Dubbo修改后的相关代码如下所示:
      1. private void doReceived(Response res) {
      2. lock.lock();
      3. try {
      4. response = res;
      5. done.signalAll();
      6. } finally {
      7. lock.unlock();
      8. }
      9. }

2-Semaphore限流器

  1. Semaphore简介?

    • 概念:
      Semaphore,现在普遍翻译为“信号量”,以前也曾被翻译成“信号灯”,因为类似现实生活里的红绿灯,车辆能不能通行,要看是不是绿灯。同样,在编程世界里,线程能不能执行,也要看信号量是不是允许。
    • 发展:
      信号量是由大名鼎鼎的计算机科学家迪杰斯特拉(Dijkstra)于1965年提出,在这之后的15年,信号量一直都是并发编程领域的终结者,直到1980年管程被提出来,才有了第二选择。目前几乎所有支持并发编程的语言都支持信号量机制。
    • Semaphore的功能
      -- 互斥锁功能。
      -- Semaphore 可以允许多个线程访问一个临界区。
  2. 信号量模型?

    • 简单概括为:一个计数器,一个等待队列,三个方法
    • 信号量模型中计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和up()。
      在这里插入图片描述
    • init():设置计数器的初始值。
      down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
      up():计数器的值加1;如果此时计数器的值小于或者等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。
  3. 信号量模型的代码化?

    • init()、down() 和 up()三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。
    • Java SDK里面,信号量模型是由java.util.concurrent.Semaphore实现的,Semaphore这个类能够保证这三个方法都是原子操作。
    • 如下,代码化的信号量模型。
  1. class Semaphore{
  2. // 计数器
  3. int count;
  4. // 等待队列
  5. Queue queue;
  6. // 初始化操作
  7. Semaphore(int c){
  8. this.count=c;
  9. }
  10. //
  11. void down(){
  12. this.count--;
  13. if(this.count<0){
  14. // 将当前线程插入等待队列
  15. // 阻塞当前线程
  16. }
  17. }
  18. void up(){
  19. this.count++;
  20. if(this.count<=0) {
  21. // 移除等待队列中的某个线程 T
  22. // 唤醒线程 T
  23. }
  24. }
  25. }
  1. PV原语?

    • 信号量模型里面,down()、up()这两个操作历史上最早称为 P 操作和 V操作,所以信号量模型也被称为PV原语。
    • 另外,还有些人喜欢用 semWait() 和 semSignal() 来称呼它们,虽然叫法不同,但是语义都是相同的。在 Java SDK 并发包里,down() 和 up()对应的则是acquire() 和 release()。
  2. 如何使用信号量?

    • 互斥锁
    • 限流器
  3. 累加器中的互斥?

    • 红绿灯控制交通的一个关键规则:车辆在通过路口前必须先检查是否是绿灯,只有绿灯才能通行。
    • 累加器的例子--说明信号量的使用。
    • 信号量实现了一个最简单的互斥锁功能。
    • 在累加器的例子里面,count+=1操作是个临界区,只允许一个线程执行,也就是说要保证互斥。那这种情况用信号量怎么控制呢?
    • 其实很简单,就像用互斥锁一样,只需要在进入临界区之前执行一下down()操作,退出临界区之前执行一下 up() 操作就可以了。
    • 如下,acquire() 就是信号量里的down()操作,release() 就是信号量里的 up() 操作。

      1. static int count;
      2. // 初始化信号量
      3. static final Semaphore s = new Semaphore(1);
      4. // 用信号量保证互斥
      5. static void addOne() {
      6. s.acquire();
      7. try {
      8. count+=1;
      9. } finally {
      10. s.release();
      11. }
      12. }
    • 信号量的计数器,设置成了1,这个1表示只允许一个线程进入临界区。

  4. 分析信号量是如何保证互斥的?

    • 假设两个线程T1、T2同时访问addOne()方法,当它们同时调用acquire()的时候,由于acquire()是一个原子操作,所以只能有一个线程(假设T1)把信号量里的计数器减为0,另外一个线程(T2)则是将计数器减为 -1。
    • 对于线程 T1,信号量里面的计数器的值是0,大于等于 0,所以线程 T1 会继续执行;对于线程T2,信
      号量里面的计数器的值是-1,小于0,按照信号量模型里对 down()操作的描述,线程T2将被阻塞。所以此时只有线程 T1会进入临界区执行count+=1;。
    • 当线程 T1 执行 release() 操作,也就是 up() 操作的时候,信号量里计数器的值是 -1,加 1 之后
      的值是 0,小于等于 0,按照信号量模型里对 up() 操作的描述,此时等待队列中的T2将会被唤醒。于是T2在T1执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。
  5. 既然Java SDK提供了Lock,为啥还要提供一个Semaphore?

    • 信号量实现互斥锁功能,仅仅是Semaphore的部分功能;
    • Semaphore 还有一个功能是 Lock不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区。
  6. 限流器的应用?

    • 比较常见的需求就是我们工作中遇到的各种池化资源,例如连接池、对象池、线程池等等。
    • 其中,你可能最熟悉数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。
  7. 快速实现一个限流器?

    • 对象池需求:
      -- 对象池,指的是一次性创建出N个对象,之后所有的线程重复利用这N个对象,当然对象在被释放前,也是不允许其他线程使用的。
      -- 对象池,可以用List保存实例对象,很简单。
      -- 关键是限流器的设计,这里的限流,指的是不允许多于 N 个线程同时进入临界区。
    • 如何快速实现一个这样的限流器呢?
    • 计数器的值设置成对象池里对象的个数N,就能完美解决对象池的限流问题了。
    • 对象池的示例代码:

      1. class ObjPool<T, R> {
      2. final List<T> pool;
      3. // 用信号量实现限流器
      4. final Semaphore sem;
      5. // 构造函数
      6. ObjPool(int size, T t){
      7. pool = new Vector<T>(){};
      8. for(int i=0; i<size; i++){
      9. pool.add(t);
      10. }
      11. sem = new Semaphore(size);
      12. }
      13. // 利用对象池的对象,调用 func
      14. R exec(Function<T,R> func) {
      15. T t = null;
      16. sem.acquire();
      17. try {
      18. t = pool.remove(0);
      19. return func.apply(t);
      20. } finally {
      21. pool.add(t);
      22. sem.release();
      23. }
      24. }
      25. }
      26. // 创建对象池
      27. ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
      28. // 通过对象池获取 t,之后执行
      29. pool.exec(t -> {
      30. System.out.println(t);
      31. return t.toString();
      32. });
    • 用一个 List来保存对象实例,用 Semaphore 实现限流器。关键的代码是 ObjPool 里面的
      exec() 方法,这个方法里面实现了限流的功能。在这个方法里面,首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在 acquire()方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0)实现的),分配完之后会执行一个回调函数func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释
      放工作是通过 pool.add(t)实现的),同时调用release()方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。

  8. 思考题:在上面对象池的例子中,对象保存在了Vector中,Vector是Java提供的线程安全的容器,如果我们把Vector换成ArrayList,是否可以呢?

    • Semaphore可以允许多个线程访问一个临界区,那就意味着可能存在多个线程同时访问ArrayList,而ArrayList不是线程安全的,所以对象池的例子中是不能够将Vector换成ArrayList的。
    • Semaphore允许多个线程访问一个临界区,这也是一把双刃剑,当多个线程进入临界区时,如果需要访问共享变量就会存在并发问题,所以必须加锁,也就是说Semaphore需要锁中锁。

3-ReadWriteLock读写锁

  1. 总结

    • 读写锁允许多个线程同时读共享变量,适用于读多写少的场景
  2. Java SDK并发包里为什么还有很多其他的工具类呢?

    • 现象:
      管程和信号量这两个同步原语理论上用任何一个都可以解决所有的并发问题。
    • 其他工具类存在的原因:
      分场景优化性能,提升易用性。
  3. 并发场景:读多写少场景 的分析?

    • 典型的读多写少应用场景:为了优化性能,会使用缓存,例如缓存元数据、缓存基础数据等。
    • 缓存提升性能,一个重要的条件就是缓存的数据一定是读多写少的,例如元数据和基础数据基本上不会发生变化(写少),但是使用它们的地方却很多(读多)。
    • 针对读多写少这种并发场景,Java SDK并发包提供了读写锁——ReadWriteLock,易使用,且性能很好。
  4. 那什么是读写锁呢?

    • 读写锁,非Java语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则:
      1)允许多个线程同时读共享变量;
      2)只允许一个线程写共享变量;
      3)如果一个写线程正在执行写操作,此时禁止读线程读共享变量。
    • 读写锁类似于ReentrantLock,也支持公平模式和非公平模式。
    • 读锁和写锁都实现了java.util.concurrent.locks.Lock 接口,所以除了支持lock()方法外,tryLock()、lockInterruptibly() 等方法也都是支持的。
    • ReadWriteLock是一个接口,它的实现类是ReentrantReadWriteLock,通过名字判断出来,它是支持可重入的。
    • 但是有一点需要注意,那就是只有写锁支持条件变量,读锁是不支持条件变量的,读锁调用newCondition()会抛出UnsupportedOperationException异常。
  5. 读写锁与互斥锁的重要区别?

    • 一个重要区别:读写锁允许多个线程同时读共享变量,而互斥锁是不允许的,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。
  6. 如何快速实现一个缓存?
    用 ReadWriteLock快速实现一个通用的缓存工具类:

    • 代码中,声明了一个Cache类,其中类型参数 K 代表缓存里 key 的类型,V代表缓存里 value的类型。
    • 缓存的数据保存在 Cache 类内部的 HashMap 里面,HashMap不是线程安全的,这里使用读写锁 ReadWriteLock 来保证其线程安全。
    • ReadWriteLock 是一个接口,它的实现类是 ReentrantReadWriteLock,通过名字判断出来,它是支持可重入的。
    • 通过 rwl创建了一把读锁和一把写锁。
    • Cache 这个工具类,我们提供了两个方法,一个是读缓存方法 get(),另一个是写缓存方法put()。读缓存需要用到读锁,读锁的使用和前面我们介绍的Lock 的使用是相同的,都是try{}finally{}这个编程范式。写缓存则需要用到写锁,写锁的使用和读锁是类似的。
      1. class Cache<K,V> {
      2. final Map<K, V> m = new HashMap<>();
      3. final ReadWriteLock rwl = new ReentrantReadWriteLock();
      4. // 读锁
      5. final Lock r = rwl.readLock();
      6. // 写锁
      7. final Lock w = rwl.writeLock();
      8. // 读缓存
      9. V get(K key) {
      10. r.lock();
      11. try { return m.get(key); }
      12. finally { r.unlock(); }
      13. }
      14. // 写缓存
      15. V put(String key, Data v) {
      16. w.lock();
      17. try { return m.put(key, v); }
      18. finally { w.unlock(); }
      19. }
      20. }
  7. 处理缓存数据的初始化问题?

    • 缓存首先要解决缓存数据的初始化问题。
    • 缓存数据的初始化,可以采用一次性加载的方式,也可以使用按需加载的方式。
    • 小数据量:
      如果源头数据的数据量不大,就可以采用一次性加载的方式,这种方式最简单(可参考下图),只需在应用启动的时候把源头数据查询出来,依次调用类似上面示例代码中的 put() 方法就可以了。
      在这里插入图片描述
    • 大数据量:
      -- 源头数据量非常大,就需要按需加载了;
      -- 按需加载也叫懒加载,指的是只有当应用查询缓存,并且数据不在缓存里的时候,才触发加载源头相关数据进缓存的操作。
      -- 下面你可以结合文中示意图看看如何利用ReadWriteLock 来实现缓存的按需加载。
      在这里插入图片描述
  8. 实现缓存的按需加载?

    • 假设缓存的源头是数据库。
    • 注意:
      -- 如果缓存中没有缓存目标对象,那么就需要从数据库中加载,然后写入缓存,写缓存需要用到写锁,所以在代码中的⑤处,调用了 w.lock()来获取写锁。
      -- 在获取写锁之后,并没有直接去查询数据库,而是在代码⑥⑦处,重新验证了一次缓存中是否存在,再次验证如果还是不存在,我们才去查询数据库并更新本地缓存。
      1. //按需加载的功能
      2. class Cache<K,V> {
      3. final Map<K, V> m = new HashMap<>();
      4. final ReadWriteLock rwl = new ReentrantReadWriteLock();
      5. final Lock r = rwl.readLock();
      6. final Lock w = rwl.writeLock();
      7. V get(K key) {
      8. V v = null;
      9. // 读缓存
      10. r.lock(); //①
      11. try {
      12. v = m.get(key); //②
      13. } finally{
      14. r.unlock(); //③
      15. }
      16. // 缓存中存在,返回
      17. if(v != null) { //④
      18. return v;
      19. }
      20. // 缓存中不存在,查询数据库
      21. w.lock(); //⑤
      22. try {
      23. // 再次验证
      24. // 其他线程可能已经查询过数据库
      25. v = m.get(key); //⑥
      26. if(v == null){ //⑦
      27. // 查询数据库
      28. v= 省略代码⽆数
      29. m.put(key, v);
      30. }
      31. } finally{
      32. w.unlock();
      33. }
      34. return v;
      35. }
      36. }
  9. 为什么我们要再次验证呢?

    • 高并发的场景下,有可能会有多线程竞争写锁。假设缓存是空的,没有缓存任何东西,如果此时有三个线程 T1、T2 和 T3 同时调用 get()方法,并且参数 key 也是相同的。那么它们会同时执行到代码⑤处,但此时只有一个线程能够获得写锁,假设是线程 T1,线程 T1 获取写锁之后查询数据库并更新缓存,最终释放写锁。此时线程T2和T3会再有一个线程能够获取写锁,假设是T2,如果不采用再次验证的方式,此时 T2 会再次查询数据库。T2 释放写锁之后,T3 也会再次查询一次数据库。而实际上线程 T1 已经把缓存的值设置好了,T2、T3完全没有必要再次查询数据库。
    • 所以,再次验证的方式,能够避免高并发场景下重复查询数据的问题。
  10. 上述缓存方案的问题?

    • 问题:
      用ReadWriteLock实现了一个简单的缓存,这个缓存虽然解决了缓存的初始化问题,但是没有解决缓存数据与源头数据的同步问题,这里的数据同步指的是保证缓存数据和源头数据的一致性。
    • 解决:
      -- 解决数据同步问题的一个最简单的方案就是超时机制。
      -- 超时机制:指的是加载进缓存的数据不是长久有效的,而是有时效的,当缓存的数据超过时效,也就是超时之后,这条数据在缓存中就失效了。
      -- 访问缓存中失效的数据,会触发缓存重新从源头把数据加载进缓存。当然也可以在源头数据发生变化时,快速反馈给缓存,具体采用哪种方案,还是要看应用的场景。
      例如MySQL 作为数据源头,可以通过近实时地解析 binlog 来识别数据是否发生了变化,如果发生了
      变化就将最新的数据推送给缓存。另外,还有一些方案采取的是数据库和缓存的双写方案。
  11. 读写锁的升级与降级?
    上面按需加载的示例代码中,在①处获取读锁,在③处释放读锁,那是否可以在②处的下面增加验证缓存并更新缓存的逻辑呢?

    1. // 读缓存
    2. r.lock();
    3. try {
    4. v = m.get(key);
    5. if (v == null) {
    6. w.lock();
    7. try {
    8. // 再次验证并更新缓存
    9. // 省略详细代码
    10. } finally{
    11. w.unlock();
    12. }
    13. }
    14. } finally{
    15. r.unlock();
    16. }
    • 这样看上去好像是没有问题的,先是获取读锁,然后再升级为写锁,对此还有个专业的名字,叫锁的升级。
    • ReadWriteLock并不支持这种升级。在上面的代码示例中,读锁还没有释放,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。
    • 锁的升级是不允许的,但是锁的降级却是允许的。
  12. 锁降级?

    • 以下代码来源自ReentrantReadWriteLock 的官方示例,略做了改动。
    • 在代码①处,获取读锁的时候线程还是持有写锁的,这种锁的降级是支持的。
      1. class CachedData {
      2. Object data;
      3. volatile boolean cacheValid;
      4. final ReadWriteLock rwl = new ReentrantReadWriteLock();
      5. // 读锁
      6. final Lock r = rwl.readLock();
      7. // 写锁
      8. final Lock w = rwl.writeLock();
      9. void processCachedData() {
      10. // 获取读锁
      11. r.lock();
      12. if (!cacheValid) {
      13. // 释放读锁,因为不允许读锁的升级
      14. r.unlock();
      15. // 获取写锁
      16. w.lock();
      17. try {
      18. // 再次检查状态
      19. if (!cacheValid) {
      20. data = ...
      21. cacheValid = true;
      22. }
      23. // 释放写锁前,降级为读锁
      24. // 降级是可以的
      25. r.lock(); // ①
      26. } finally {
      27. // 释放写锁
      28. w.unlock();
      29. }
      30. }
      31. // 此处仍然持有读锁
      32. try {use(data);}
      33. finally {r.unlock();}
      34. }
      35. }
  13. 课后思考
    有同学反映线上系统停止响应了,CPU 利用率很低,你怀疑有同学一不小心写出了读锁升级写锁的方案,那你该如何验证自己的怀疑呢?

    • 本质上都是定位线上并发问题,方案很简单,就是通过查看线程栈来定位问题。
    • 重点是查看线程状态,分析线程进入该状态的原因是否合理。
    • 为了便于分析定位线程问题,你需要给线程赋予一个有意义的名字,对于线程池可以通过自定义ThreadFactory来给线程池中的线程赋予有意义的名字,也可以在执行run()方法时通过Thread.currentThread().setName();来给线程赋予一个更贴近业务的名字。

4-StampedLock

  1. 总结

    • Java1.8提供了一种叫StampedLock的锁,它的性能就比读写锁还要好。
  2. StampedLock 和ReadWriteLock 有哪些区别?

    • ReadWriteLock支持两种模式:一种是读锁,
    • StampedLock支持三种模式,分别是:写锁、悲观读锁和乐观读。
      -- 同:写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的
      语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。
      -- 异:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个
      stamp;然后解锁的时候,需要传入这个 stamp。
    • 相关的示例代码如下。
      1. final StampedLock sl =new StampedLock();
      2. // 获取 / 释放悲观读锁⽰意代码
      3. long stamp = sl.readLock();
      4. try {
      5. // 省略业务相关代码
      6. } finally {
      7. sl.unlockRead(stamp);
      8. }
      9. // 获取 / 释放写锁⽰意代码
      10. long stamp = sl.writeLock();
      11. try {
      12. // 省略业务相关代码
      13. } finally {
      14. sl.unlockWrite(stamp);
      15. }
  3. StampedLock 的性能优于ReadWriteLock的原因?

    • 关键是 StampedLock 支持乐观读的方式。
    • ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而StampedLock提供的乐观读,多个线程同时读的时候,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。
    • 注意:“乐观读”不是“乐观读锁”,乐观读这个操作是无锁的,所以相比较ReadWriteLock 的读锁,乐观读的性能更好一些。
  4. 乐观读悲观读的使用示例?

    • 代码出自Java SDK官方示例,略做了修改。
    • distanceFromOrigin()方法中,首先通过调用tryOptimisticRead()获取了一个 stamp,这里的tryOptimisticRead()就是乐观读。之后将共享变量x和y读入方法的局部变量中,不过需要注意的是,由于tryOptimisticRead()是无锁的,所以共享变量x和y读入方法局部变量时,x和y有可能被其他线程修改了。因此最后读完之后,还需要再次验证一下是否存在写操作,验证操作通过调用validate(stamp)来实现的。

      1. class Point {
      2. private int x, y;
      3. final StampedLock sl = new StampedLock();
      4. // 计算到原点的距离
      5. int distanceFromOrigin() {
      6. // 乐观读
      7. long stamp = sl.tryOptimisticRead();
      8. // 读⼊局部变量,读的过程数据可能被修改
      9. int curX = x, curY = y;
      10. // 判断执⾏读操作期间,是否存在写操作,如果存在,则sl.validate返回false
      11. if (!sl.validate(stamp)){
      12. // 升级为悲观读锁
      13. stamp = sl.readLock();
      14. try {
      15. curX = x;
      16. curY = y;
      17. } finally {
      18. // 释放悲观读锁
      19. sl.unlockRead(stamp);
      20. }
      21. }
      22. return Math.sqrt(curX * curX + curY * curY);
      23. }
      24. }
    • 代码中,如果执行乐观读操作的期间,存在写操作,会把乐观读升级为悲观读锁

    • 合理操作,否则你就需要在一个循环里反复执行乐观读,直到执行乐观读操作的期
      间没有写操作(只有这样才能保证x和y的正确性和一致性),而循环读会浪费大量的CPU。升级为悲观读锁,代码简练且不易出错,建议你在具体实践时也采用这样的方法。
  5. 进一步理解乐观读--介绍一下数据库里的乐观锁?

    • 数据库乐观锁的场景例子:
      在 ERP 的生产模块里,会有多个人通过ERP系统提供的UI同时修改同一条生产订单,那如何保证生产订单数据是并发安全的呢?我采用的方案就是乐观锁。
    • 实现:
      在生产订单的表product_doc 里增加了一个数值型版本号字段 version,每次更新product_doc这个表的时候,都将version字段加1。生产订单的UI在展示的时候,需要查询数据库,此时将这个version字段和其他业务字段一起返回给生产订单UI。
    • 例子
      -- 假设用户查询的生产订单的id=777,那么 SQL 语句类似下面这样:
      1. select id,... version
      2. from product_doc
      3. where id=777

    -- 用户在生产订单UI执行保存操作的时候,后台利用下面的SQL语句更新生产订单,此处我们假设该条生产订单的version=9。

    1. update product_doc
    2. set version=version+1,...
    3. where id=777 and version=9

    -- 如果这条 SQL 语句执行成功并且返回的条数等于1,那么说明从生产订单 UI 执行查询操作到执行保存操作期间,没有其他人修改过这条数据。因为如果这期间其他人修改过这条数据,那么版本号字段一定会大于 9。

  6. 进一步理解乐观读--StampedLock的乐观读?

    • StampedLock的乐观读和数据库的乐观锁有异曲同工之妙。
    • 数据库里的乐观锁,查询的时候需要把version字段查出来,更新的时候要利用version字段做验证。这个 version 字段就类似于 StampedLock 里面的 stamp。
  7. StampedLock 使用注意事项?

    • 对于读多写少的场景StampedLock性能很好,简单的应用场景基本上可以替代 ReadWriteLock;
    • 但是StampedLock的功能仅仅是ReadWriteLock的子集,在使用的时候,需要注意几个问题:
      ① StampedLock 命名上没有增加Reentrant,事实上,StampedLock不支持重入。
      ② StampedLock的悲观读锁、写锁都不支持条件变量;
      ③ 如果线程阻塞在 StampedLock的readLock()或者writeLock()上时,此时调用该阻塞线程的interrupt()方法,会导致CPU飙升。
      -- 所以,使用 StampedLock一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁readLockInterruptibly()和写锁writeLockInterruptibly()
      -- 如下,线程T1获取写锁之后将自己阻塞,线程T2尝试获取悲观读锁,也会阻塞;如果此时调用线程T2的interrupt() 方法来中断线程 T2 的话,线程 T2 所在 CPU会飙升到100%。
      1. final StampedLock lock = new StampedLock();
      2. Thread T1 = new Thread(() -> {
      3. // 获取写锁
      4. lock.writeLock();
      5. // 永远阻塞在此处,不释放写锁
      6. LockSupport.park();
      7. });
      8. T1.start();
      9. // 保证 T1 获取写锁
      10. Thread.sleep(100);
      11. Thread T2 = new Thread(() ->
      12. // 阻塞在悲观读锁
      13. lock.readLock()
      14. );
      15. T2.start();
      16. // 保证 T2 阻塞在读锁
      17. Thread.sleep(100);
      18. // 中断线程 T2
      19. // 会导致线程 T2 所在 CPU 飙升
      20. T2.interrupt();
      21. T2.join();
  8. 工程使用?

    • 把 Java 官方示例精简后,形成下面的代码模板,建议你在实际工作中尽量按照这个模板来使用StampedLock。
    • StampedLock读模板:

      1. final StampedLock sl = new StampedLock();
      2. // 乐观读
      3. long stamp = sl.tryOptimisticRead();
      4. // 读⼊⽅法局部变量
      5. ......
      6. // 校验 stamp
      7. if (!sl.validate(stamp)){
      8. // 升级为悲观读锁
      9. stamp = sl.readLock();
      10. try {
      11. // 读⼊⽅法局部变量
      12. .....
      13. } finally {
      14. // 释放悲观读锁
      15. sl.unlockRead(stamp);
      16. }
      17. }
      18. // 使⽤⽅法局部变量执⾏业务操作
      19. ......
    • StampedLock 写模板:

      1. long stamp = sl.writeLock();
      2. try {
      3. // 写共享变量
      4. ......
      5. } finally {
      6. sl.unlockWrite(stamp);
      7. }
  9. 锁升级降级?

    • StampedLock 支持锁的降级(通过 tryConvertToReadLock() 方法实现)和升级(通过tryConvertToWriteLock()方法实现),但是建议你要慎重使用。下面的代码也源自Java的官方示例,仅仅做了一点修改,但隐藏了一个 Bug。

      1. private double x, y;
      2. final StampedLock sl = new StampedLock();
      3. // 存在问题的⽅法
      4. void moveIfAtOrigin(double newX, double newY){
      5. long stamp = sl.readLock();
      6. try {
      7. while(x == 0.0 && y == 0.0){
      8. long ws = sl.tryConvertToWriteLock(stamp);
      9. if (ws != 0L) {
      10. x = newX;
      11. y = newY;
      12. break;
      13. } else {
      14. sl.unlockRead(stamp);
      15. stamp = sl.writeLock();
      16. }
      17. }
      18. } finally {
      19. sl.unlock(stamp);
      20. }
      21. }
    • Bug出在没有正确地释放锁。

    • 锁的申请和释放要成对出现,对此我们有一个最佳实践,就是使用try{}finally{},但是try{}finally{}并不能解决所有锁的释放问题。比如示例代码中,锁的升级会生成新的stamp,而finally中释放锁用的是锁升级前的stamp,本质上这也属于锁的申请和释放没有成对出现,只是它隐藏得有点深。解决这个问题倒也很简单,只需要对stamp重新赋值就可以了,修复后的代码如下所示:
      1. private double x, y;
      2. final StampedLock sl = new StampedLock();
      3. // 存在问题的⽅法
      4. void moveIfAtOrigin(double newX, double newY){
      5. long stamp = sl.readLock();
      6. try {
      7. while(x == 0.0 && y == 0.0){
      8. long ws = sl.tryConvertToWriteLock(stamp);
      9. if (ws != 0L) {
      10. stamp = ws; //① 问题在于没对stamp重新赋值,增加赋值
      11. x = newX;
      12. y = newY;
      13. break;
      14. } else {
      15. sl.unlockRead(stamp);
      16. stamp = sl.writeLock();
      17. }
      18. }
      19. } finally {
      20. sl.unlock(stamp);
      21. s1.unlock(stamp);//②此处unlock的是stamp
      22. }
      23. }

5-CountDownLatch|CyclicBarrier

  1. 总结
    • CountDownLatch 和 CyclicBarrier 是Java并发包提供的两个非常易用的线程同步工具类
    • 用法的区别:
      CountDownLatch主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;CyclicBarrier是一组线程之间互相等待,更像是几个驴友之间不离不弃。
      CountDownLatch的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用await(),该线程会直接通过。但CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。
      ③ CyclicBarrier 还可以设置回调函数,可以说是功能丰富。

1-CountDownLatch-等待

  1. 对账系统问题?

    • 问题:越来越慢
    • 系统业务:
      用户通过在线商城下单,会生成电子订单,保存在订单库;之后物流会生成派送单给用户发货,派送单保存在派送单库。为了防止漏派送或者重复派送,对账系统每天还会校验是否存在异常订单。流程如下:
    • 目前对账系统的处理逻辑是首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。
      在这里插入图片描述
    • 对账系统的代码抽象,如下,在一个单线程里面循环查询订单、派送单,然后执行对账,最后将写入差异库。
      1. while(存在未对账订单){
      2. // 查询未对账订单
      3. pos = getPOrders();
      4. // 查询派送单
      5. dos = getDOrders();
      6. // 执⾏对账操作
      7. diff = check(pos, dos);
      8. // 差异写⼊差异库
      9. save(diff);
      10. }
  2. 优化一:利用并行优化对账系统?

    • 问题:
      目前的对账系统,由于订单量和派送单量巨大,所以查询未对账订单 getPOrders() 和查询派送单getDOrders()相对较慢,那有没有办法快速优化一下呢?
    • 目前对账系统是单线程执行的,对于串行化的系统,优化性能首先想到的是能否利用多线程并行处理。
      在这里插入图片描述
    • 方案:
      -- 对账系统里的瓶颈:查询未对账订单 getPOrders() 和查询派送单 getDOrders() 是否可以并行处理呢?
      这两个操作并没有先后顺序的依赖是可以的,将两个最耗时的操作并行。
      对比一下单线程的执行示意图,同等时间里,并行执行的吞吐量近乎单线程的 2 倍,优化效果还是相对明显的。
      在这里插入图片描述
    • 代码实现:
      创建了两个线程 T1 和T2,并行执行查询未对账订单 getPOrders() 和查询派送单 getDOrders() 这两个操作。在主线程中执行对账操作 check() 和差异写入save()两个操作。不过需要注意的是:主线程需要等待线程T1 和 T2 执行完才能执行check()和save()这两个操作,为此我们通过调用 T1.join()和T2.join()来实现等待,当T1和T2线程退出时,调用 T1.join() 和 T2.join()的主线程就会从阻塞态被唤醒,从而执行之后的 check() 和 save()。
      1. while(存在未对账订单){
      2. // 查询未对账订单
      3. Thread T1 = new Thread(()->{
      4. pos = getPOrders();
      5. });
      6. T1.start();
      7. // 查询派送单
      8. Thread T2 = new Thread(()->{
      9. dos = getDOrders();
      10. });
      11. T2.start();
      12. // 等待 T1、T2 结束
      13. T1.join();
      14. T2.join();
      15. // 执⾏对账操作
      16. diff = check(pos, dos);
      17. // 差异写⼊差异库
      18. save(diff);
      19. }
  3. 优化二:用 CountDownLatch实现线程等待?

    • 问题一:while循环里面每次都会创建新的线程,而创建线程可是个耗时的操作。
      -- 使创建出来的线程能够循环利用?---线程池
      -- 如下,首先创建了一个固定大小为2的线程池,之后在while循环里重复利用。
    • 问题二:主线程如何知道 getPOrders() 和 getDOrders()这两个操作什么时候执行完?
      -- 前一个方案主线程通过调用线程T1和T2的join()方法来等待线程T1和T2退出,但在线程池的方案里,线程根本就不会退出,所以join()方法已经失效了
      1. // 创建 2 个线程的线程池
      2. Executor executor = Executors.newFixedThreadPool(2);
      3. while(存在未对账订单){
      4. // 查询未对账订单
      5. executor.execute(()-> {
      6. pos = getPOrders();
      7. });
      8. // 查询派送单
      9. executor.execute(()-> {
      10. dos = getDOrders();
      11. });
      12. /* ??如何实现等待??*/
      13. // 执⾏对账操作
      14. diff = check(pos, dos);
      15. // 差异写⼊差异库
      16. save(diff);
      17. }
  4. 优化二:解决通知的问题?

    • 思路:最直接的办法是弄一个计数器,初始值设置成 2,当执行完pos = getPOrders();这个操作之后将计数器减1,执行完dos=getDOrders();之后也将计数器减 1,在主线程里,等待计数器等于 0;当计数器等于 0时,说明这两个查询操作执行完了。等待计数器等于0其实就是一个条件变量,用管程实现起来也很简单。
    • Java 并发包提供了类似功能的工具类:CountDownLatch。
    • 如下,在while循环里面,我们首先创建了一个CountDownLatch,计数器的初始值等于 2,之后在pos =getPOrders();和dos = getDOrders(); 两条语句的后面对计数器执行减1操作,这个对计数器减1的操作是通过调用latch.countDown();来实现的。在主线程中,我们通过调用latch.await()来实现对计数器等于0的等待。
  1. // 创建 2 个线程的线程池
  2. Executor executor = Executors.newFixedThreadPool(2);
  3. while(存在未对账订单){
  4. // 计数器初始化为 2
  5. CountDownLatch latch = new CountDownLatch(2);
  6. // 查询未对账订单
  7. executor.execute(()-> {
  8. pos = getPOrders();
  9. latch.countDown();
  10. });
  11. // 查询派送单
  12. executor.execute(()-> {
  13. dos = getDOrders();
  14. latch.countDown();
  15. });
  16. // 等待两个查询操作结束
  17. latch.await();
  18. // 执⾏对账操作
  19. diff = check(pos, dos);
  20. // 差异写⼊差异库
  21. save(diff);
  22. }
  1. 优化三:生产者-消费者+队列?

    • 优化点:getPOrders()、getDOrders()这两个查询并行了,但他们和check()、save()之间还是串行的。很显然,两个查询和对账操作也是可以并行的,即,在执行对账操作的时候,可以同时去执行下一轮的查询操作。
      在这里插入图片描述
    • 分析:
      -- 两次查询操作能够和对账操作并行,对账操作还依赖查询操作的结果--- 生产者-消费者:两次查询操作是生产者,对账操作是消费者。
      -- 使用队列,来保存生产者生产的数据,而消费者则从这个队列消费数据。
    • 针对对账项目,设计了两个队列,并且两个队列的元素之间还有对应关系。订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间是有一一对应的关系的。两个队列的好处是,对账操作可以每次从订单队列出一个元素,从派送单队列出一个元素,然后对这两个元素执行对账操作,这样数据一定不会乱掉。
      如图:
      在这里插入图片描述
  2. 优化三:如何用双队列来实现完全的并行?

    • 最直接的想法是:一个线程 T1 执行订单的查询工作,一个线程 T2 执行派送单的查询工作,当线程 T1 和 T2 都各自生产完 1条数据的时候,通知线程 T3执行对账操作。
    • 上述思路隐藏条件:那就是线程 T1和线程 T2 的工作要步调一致,不能一个跑得太快,一个跑得太慢,只有这样才能做到各自生产完 1 条数据的时候,通知线程 T3。
    • 如图,线程 T1 和线程 T2 只有都生产完 1 条数据的时候,才能一起向下执行,也就是说,线程 T1 和线程 T2 要互相等待,步调要一致;同时当线程 T1 和T2都生产完一条数据的时候,还要能够通知线程 T3 执行对账操作。
      在这里插入图片描述

2-CyclicBarrier-同步

  1. 优化三:实现两个查询操作和检查操作的并发同步?
    • 2个难点:
      ① 线程 T1 和 T2 要做到步调一致 ② 要能够通知到线程 T3。
    • 思路:
      依然利用一个计数器来解决这两个难点,计数器初始化为 2,线程 T1 和 T2 生产完一条数据都将计数器减 1,如果计数器大于 0 则线程 T1 或者 T2 等待。如果计数器等于 0,则通知线程T3,并唤醒等待的线程 T1或者 T2,与此同时,将计数器重置为 2,这样线程 T1 和线程T2生产下一条数据的时候就可以继续使用这个计数器了。
    • 思路实现:Java 并发包的工具类:CyclicBarrier。
    • 如下代码:
      首先创建了一个计数器初始值为 2 的 CyclicBarrier,
      注意创建 CyclicBarrier的时候,还传入了一个回调函数,当计数器减到 0 的时候,会调用这个回调函数。
      线程 T1 负责查询订单,当查出一条时,调用barrier.await()来将计数器减 1,同时等待计数器变成 0;线程T2负责查询派送单,当查出一条时,也调用barrier.await()来将计数器减 1,同时等待计数器变成 0;当 T1 和 T2都调用barrier.await() 的时候,计数器会减到 0,此时 T1 和T2就可以执行下一条语句了,同时会调用 barrier 的回调函数来执行对账操作。
    • 非常值得一提的是,CyclicBarrier的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值。
  1. // 订单队列
  2. Vector<P> pos;
  3. // 派送单队列
  4. Vector<D> dos;
  5. // 执⾏回调的线程池
  6. Executor executor = Executors.newFixedThreadPool(1);
  7. final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
  8. executor.execute(()->check());
  9. });
  10. void check(){
  11. P p = pos.remove(0);
  12. D d = dos.remove(0);
  13. // 执⾏对账操作
  14. diff = check(p, d);
  15. // 差异写⼊差异库
  16. save(diff);
  17. }
  18. void checkAll(){
  19. // 循环查询订单库
  20. Thread T1 = new Thread(()->{
  21. while(存在未对账订单){
  22. // 查询订单库
  23. pos.add(getPOrders());
  24. // 等待
  25. barrier.await();
  26. }
  27. }
  28. T1.start();
  29. // 循环查询运单库
  30. Thread T2 = new Thread(()->{
  31. while(存在未对账订单){
  32. // 查询运单库
  33. dos.add(getDOrders());
  34. // 等待
  35. barrier.await();
  36. }
  37. }
  38. T2.start();
  39. }
  1. 上述示例代码中,CyclicBarrier的回调函数我们使用了一个固定大小的线程池,你觉得是否有必要呢?

6-并发容器

  1. 同步容器及其注意事项

    • Java中的容器主要可以分为四个大类,分别是 List、Map、Set和Queue,但并不是所有的Java容器都是线程安全的。
  2. 如何将非线程安全的容器变成线程安全的容器?

    • 思路:把非线程安全的容器封装在对象内部,然后控制好访问路径。
  3. ArrayList变成线程安全的?

    • SafeArrayList内部持有一个ArrayList的实例c,所有访问c的方法我们都增加了synchronized关键字,需要注意的是我们还增加了一个addIfNotExist() 方法,这个方法也是用synchronized来保证原子性的。
  1. SafeArrayList<T>{
  2. // 封装ArrayList
  3. List<T> c=new ArrayList<>();
  4. // 控制访问路径
  5. synchronized T get(int idx){
  6. return c.get(idx);
  7. }
  8. synchronized void add(int idx, T t) {
  9. c.add(idx, t);
  10. }
  11. synchronized boolean addIfNotExist(T t){
  12. if(!c.contains(t)) {
  13. c.add(t);
  14. return true;
  15. }
  16. return false;
  17. }
  18. }
  1. List list = Collections.synchronizedList(new ArrayList());
  2. Set set = Collections.synchronizedSet(new HashSet());
  3. Map map = Collections.synchronizedMap(new HashMap());
  1. 遍历的原子性问题?
    • 注意组合操作需要注意竞态条件问题,例如上面提到的 addIfNotExist()方法就包含组合操作。组合操作往往隐藏着竞态条件问题,即便每个操作都能保证原子性,也并不能保证组合操作的原子性,这个一定要注意。
    • 在容器领域一个容易被忽视的“坑”是用迭代器遍历容器,例如在下面的代码中,通过迭代器遍历容器list,对每个元素调用foo()方法,这就存在并发问题,这些组合的操作不具备原子性。
  1. List list = Collections.synchronizedList(new ArrayList());
  2. Iterator i = list.iterator();
  3. while (i.hasNext())
  4. foo(i.next());
  1. Listlist = Collections.synchronizedList(new ArrayList());
  2. synchronized(list){
  3. Iterator i = list.iterator();
  4. while(i.hasNext())
  5. foo(i.next());
  6. }

(二)Map
。。。。

7-原子类-无锁

  1. 无锁方案概述
    • 无锁方案相对于互斥锁方案,优点非常多,首先性能好,其次是基本不会出现死锁问题(但可能出现饥饿和活锁问题,因为自旋会反复重试)
    • Java提供的原子类大部分都实现了compareAndSet() 方法,基于compareAndSet() 方法,你可以构建自己的无锁数据结构,但是建议你不要这样做,这个工作最好还是让大师们去完成,原因是无锁算法没你想象的那么简单。
    • Java提供的原子类能够解决一些简单的原子性问题,但你可能会发现,上面我们所有原子类的方法都是针对一个共享变量的,如果你需要解决多个变量的原子性问题,建议还是使用互斥锁方案。原子类虽好,但使用要慎之又慎。
概览
  1. JUC提供的原子类?

    • 可以分为五个类别:原子化的基本数据类型、原子化的对象引用类型、原子化数组、原子化对象属性更新器和原子化的累加器。
    • 五个类别提供的方法基本上是相似的,并且每个类别都有若干原子类。
      在这里插入图片描述
  2. 类别一:原子化的基本数据类型?

    • 相关实现有AtomicBoolean、AtomicInteger和AtomicLong;
    • 提供的方法主要如下,详情可参考SDK的源代码。
      1. getAndIncrement() // 原子化 i++
      2. getAndDecrement() // 原子化 i--
      3. incrementAndGet() // 原子化 ++i
      4. decrementAndGet() // 原子化 --i
      5. //当前值+=delta,返回+=前的值
      6. getAndAdd(delta)
      7. //当前值+=delta,返回+=后的值
      8. addAndGet(delta)
      9. //CAS操作,返回是否成功
      10. compareAndSet(expect,update)
      11. // 以下四个方法
      12. // 新值可以通过传入func函数来计算
      13. getAndUpdate(func)
      14. updateAndGet(func)
      15. getAndAccumulate(x,func)
      16. accumulateAndGet(x,func)
  3. 类别二: 原子化的对象引用类型?

    • 相关实现有AtomicReference、AtomicStampedReference和AtomicMarkableReference,利用它们可以实现对象引用的原子化更新。
    • AtomicReference提供的方法和原子化的基本数据类型差不多。注意对象引用的更新需要重点关注ABA问题,AtomicStampedReference和AtomicMarkableReference这两个原子类可以解决ABA问题
      -- 解决ABA问题的思路其实很简单,增加一个版本号维度就可以了,这个和乐观锁机制很类似,每次执行CAS 操作,附加再更新一个版本号,只要保证版本号是递增的,那么即便 A 变成 B 之后再变回A,版本号也不会变回来(版本号递增的)。
    • AtomicStampedReference 实现的 CAS 方法就增加了版本号参数,方法签名如下:

      1. boolean compareAndSet(V expectedReference,
      2. V newReference,
      3. int expectedStamp,
      4. int newStamp)
    • AtomicMarkableReference 的实现机制则更简单,将版本号简化成了一个 Boolean 值,方法签名如下

      1. boolean compareAndSet(V expectedReference,
      2. V newReference,
      3. boolean expectedMark,
      4. boolean newMark)
  4. 分类三:原子化数组?

    • 相关实现有 AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray,利用这些原子类,可以原子化地更新数组里面的每一个元素。
    • 这些类提供的方法和原子化的基本数据类型的区别仅仅是:每个方法多了一个数组的索引参数,不再赘述。
  5. 分类四:原子化对象属性更新器?

    • 相关实现有 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater,利用它们可以原子化地更新对象的属性,这三个方法都是利用反射机制实现的,创建更新器的方法如下:

      1. public static <U> AtomicXXXFieldUpdater<U> newUpdater(Class<U> tclass,
      2. String fieldName)
    • 注意,对象属性必须是volatile类型的,只有这样才能保证可见性;如果对象属性不是volatile 类型的,newUpdater()方法会抛出IllegalArgumentException这个运行时异常。

    • 这里newUpdater()的方法参数只有类的信息,没有对象的引用,而更新对象的属性,一定需要对象的引用,那这个参数是在哪里传入的呢?
      -- 是在原子操作的方法参数中传入的。
      例如compareAndSet()这个原子操作,相比原子化的基本数据类型多了一个对象引用obj。原子化对象属性更新器相关的方法,相比原子化的基本数据类型仅仅是多了对象引用参数,不再赘述。
      1. boolean compareAndSet(T obj,
      2. int expect,
      3. int update)
  6. 分类五:原子化的累加器?

    • DoubleAccumulator、DoubleAdder、LongAccumulator和LongAdder,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持compareAndSet() 方法。
    • 如果你仅仅需要累加操作,使用原子化的累加器性能会更好。
使用
  1. 累加器的例子?

    • add10K()这个方法不是线程安全的
    • 问题:变量count的可见性和count+=1的原子性上。
    • 解决:可见性问题可以用volatile,而原子性问题前面一直采用的互斥锁方案。
      1. public class Test {
      2. long count = 0;
      3. void add10K() {
      4. int idx = 0;
      5. while(idx++ < 10000){
      6. count += 1;
      7. }
      8. }
      9. }
  2. 原子性问题的其他解法?

    • 其实对于简单的原子性问题,还有一种无锁方案。JUC将这种无锁方案封装提炼,实现了一系列的原子类。
    • 无锁方案相对互斥锁方案,最大的好处就是性能
    • why?
      互斥锁方案为了保证互斥性,需要执行加锁、解锁操作,而加锁、解锁操作本身就消耗性能;同时拿不到锁的线程还会进入阻塞状态,进而触发线程切换,线程切换对性能的消耗也很大。 相比之下,无锁方案则完全没有加锁、解锁的性能消耗,同时还能保证互斥性,既解决了问题,又没有带来新的问题,可谓绝佳方案。
  3. 利用原子类解决累加器问题?
    如下,两处简单的改动就能使add10K()方法变成线程安全的。

    1. public class Test {
    2. // ① long型变量count替换为了原子类AtomicLong
    3. AtomicLong count = new AtomicLong(0);
    4. void add10K() {
    5. int idx = 0;
    6. while(idx++ < 10000) {
    7. // ② count+=1替换成了count.getAndIncrement()
    8. count.getAndIncrement();
    9. }
    10. }
    11. }
原理
  1. 无锁方案的实现原理?

    • 其实原子类性能高的秘密很简单,硬件支持而已。
    • CPU 为了解决并发问题,提供了 CAS 指令(CAS,全称是 Compare And Swap,即“比较并交换”)。
    • CAS 指令包含3个参数:共享变量的内存地址 A、用于比较的值 B 和共享变量的新值 C;并且只有当内存中地址 A 处的值等于 B时,才能将内存中地址 A 处的值更新为新值 C。
    • 作为一条 CPU 指令,CAS 指令本身是能够保证原子性的。
  2. 通过CAS指令的模拟代码来理解CAS的工作原理?

    • 代码中两个参数,一个是期望值expect,另一个是需要写入的新值newValue,只有当目前 count 的值和期望值expect 相等时,才会将 count 更新为 newValue。
      1. class SimulatedCAS{
      2. int count
      3. synchronized int cas(int expect, int newValue){
      4. // 读目前 count 的值
      5. int curValue = count;
      6. // 比较目前 count 值是否 == 期望值
      7. if(curValue == expect){
      8. // 如果是,则更新 count 的值
      9. count = newValue;
      10. }
      11. // 返回写⼊前的值
      12. return curValue;
      13. }
      14. }
  3. 累加器中的cas语义?

    • 对于累加器的例子,count += 1的一个核心问题是:基于内存中 count 的当前值A计算出来的count+=1 为 A+1,在将 A+1 写入内存的时候,很可能此时内存中 count已经被其他线程更新过了,这样就会导致错误地覆盖其他线程写入的值。
    • 即,只有当内存中count的值等于期望值A时,才能将内存中count的值更新为计算结果 A+1,这不就是CAS的语义吗!
  4. 实现线程安全的累加器-CAS+自旋方案?

    • 使用CAS来解决并发问题,一般都会伴随着自旋,而所谓自旋,其实就是循环尝试。
    • eg,实现一个线程安全的count+=1操作,“CAS+自旋”的实现方案如下,
      -- 首先计算newValue=count+1,如果cas(count,newValue)返回的值不等于count,则意味着线程在执行完代码①处之后,执行代码②处之前,count的值被其他线程更新过。
      -- 那此时该怎么处理呢?可以采用自旋方案,如下代码,可以重新读count最新的值来计算newValue并尝试再次更新,直到成功。
      1. class SimulatedCAS{
      2. volatile int count;
      3. // 实现count+=1
      4. addOne() {
      5. do {
      6. newValue = count+1; //①
      7. }while(count != cas(count,newValue) //②
      8. }
      9. // 模拟实现CAS,仅用来帮助理解
      10. synchronized int cas(int expect, int newValue){
      11. //读目前count的值
      12. int curValue = count;
      13. //比较目前count值是否==期望值
      14. if(curValue == expect){
      15. //如果是,则更新count的值
      16. count = newValue;
      17. }
      18. // 返回写当前的值
      19. return curValue;
      20. }
      21. }
  5. CAS无锁方案的优缺点?

    • 优点:
      CAS这种无锁方案,完全没有加锁、解锁操作,即便两个线程完全同时执行 addOne() 方法,也不会有线程被阻塞,所以相对于互斥锁方案来说,性能好了很多。
    • 缺点:ABA问题
      -- 前面提到“如果cas(count,newValue)返回的值不等于count,意味着线程在执行完代码①处之后,执行代码②处之前,count的值被其他线程更新过”
      -- 如果cas(count,newValue)返回的值等于count的一种情况:
      假设count原本是A,线程T1在执行完代码①处之后,执行代码②处之前,有可能count被线程T2更新成了B,之后又被T3更新回了A,这样线程T1虽然看到的一直是A,但是其实已经被其他线程更新过了,这就是 ABA 问题。
  6. 什么情况下关注ABA问题?

    • 大多数情况下并不关心ABA问题,例如数值的原子递增;
    • 但原子化的更新对象很可能就需要关心ABA问题。
      因为两个A虽然相等,但是第二个A的属性可能已经发生变化了。所以在使用CAS 方案的时候,一定要先check一下。
  7. Java 如何实现原子化的count += 1?

    • 以下使用原子类AtomicLong的getAndIncrement()方法替代了count+=1,从而实现了线程安全。
    • 原子类 AtomicLong 的 getAndIncrement() 方法内部就是基于 CAS 实现的;
    • Java 是如何使用 CAS 来实现原子化的count += 1的?
      -- 在Java1.8中,getAndIncrement() 方法会转调 unsafe.getAndAddLong()方法;
      -- 这里 this 和valueOffset 两个参数可以唯一确定共享变量的内存地址。

      1. final long getAndIncrement() {
      2. return unsafe.getAndAddLong(this, valueOffset, 1L);
      3. }
    • unsafe.getAndAddLong()方法的源码如下
      -- 该方法首先会在内存中读取共享变量的值,之后循环调用 compareAndSwapLong() 方法来尝试设置共享变量的值,直到成功为止。
      -- compareAndSwapLong() 是一个 native 方法,只有当内存中共享变量的值等于 expected 时,才会将共享变量的值更新为 x,并且返回 true;否则返回fasle。
      -- compareAndSwapLong 的语义和CAS 指令的语义的差别仅仅是返回值不同而已。

      1. public final long getAndAddLong(Object o, long offset, long delta){
      2. long v;
      3. do {
      4. // 读取内存中的值
      5. v = getLongVolatile(o, offset);
      6. } while (!compareAndSwapLong(o, offset, v, v + delta));
      7. return v;
      8. }
      9. // 原子性地将变量更新为x的条件是内存中的值等于 expected
      10. // 更新成功则返回 true
      11. native boolean compareAndSwapLong(Object o, long offset,long expected,long x);
  8. CAS应用的抽象代码逻辑?

    • 注意:
      getAndAddLong() 方法的实现,基本上就是 CAS 使用的经典范例。
    • 下面这段抽象后的代码片段,在很多无锁程序中经常出现。
    • Java 提供的原子类里面 CAS 一般被实现为 compareAndSet(),compareAndSet() 的语义和 CAS 指令的语义的差别仅仅是返回值不同而已,compareAndSet()里面如果更新成功,则会返回 true,否则返回 false。
      1. do {
      2. // 获取当前值
      3. oldV = xxxx
      4. // 根据当前值计算新值
      5. newV = ...oldV...
      6. }while(!compareAndSet(oldV,newV);
  9. 思考题:隐藏的while(true)问题?

    • 下面的示例代码是合理库存的原子化实现,仅实现了设置库存上限setUpper()方法,你觉得setUpper()方法的实现是否正确呢?

      1. public class SafeWM {
      2. class WMRange{
      3. final int upper;
      4. final int lower;
      5. WMRange(int upper,int lower){
      6. // 省略构造函数实现
      7. }
      8. }
      9. final AtomicReference<WMRange>
      10. rf = new AtomicReference<>(
      11. new WMRange(0,0)
      12. );
      13. // 设置库存上限
      14. void setUpper(int v){
      15. WMRange nr;
      16. WMRange or = rf.get();
      17. do{
      18. // 检查参数合法性
      19. if(v < or.lower){
      20. throw new IllegalArgumentException();
      21. }
      22. nr = new
      23. WMRange(v, or.lower);
      24. }while(!rf.compareAndSet(or, nr));
      25. }
      26. }
    • 解题:
      看上去 while(!rf.compareAndSet(or, nr))是有终止条件的,而且跑单线程测试一直都没有问题。实际上却存在严重的并发问题,问题就出在对or的赋值在while循环之外,这样每次循环or的值都不会发生变化,所以一旦有一次循环rf.compareAndSet(or,nr)的值等于false,那之后无论循环多少次,都会等于false。也就是说在特定场景下,变成了while(true)问题。既然找到了原因,修改就很简单了,只要把对or的赋值移到while循环之内就可以了,修改后的代码如下所示:

  1. public class SafeWM {
  2. class WMRange{
  3. final int upper;
  4. final int lower;
  5. WMRange(int upper,int lower){
  6. //省略构造函数实现
  7. }
  8. }
  9. final AtomicReference<WMRange> rf = new AtomicReference<>(new WMRange(0,0));
  10. // 设置库存上限
  11. void setUpper(int v){
  12. WMRange nr;
  13. WMRange or;
  14. //原代码在这⾥
  15. //WMRange or=rf.get();
  16. do{
  17. //移动到此处
  18. //每个回合都需要重新获取旧值
  19. or = rf.get();
  20. // 检查参数合法性
  21. if(v < or.lower){
  22. throw new IllegalArgumentException();
  23. }
  24. nr = new WMRange(v, or.lower);
  25. }while(!rf.compareAndSet(or, nr));
  26. }
  27. }

8-线程池

1-Executor与线程池:如何创建正确的线程池?
  1. 为什么使用线程池?

    • 创建线程问题:
      Java语言中创建线程看上去就像创建一个对象一样简单,只需要new Thread()就可以了,实际上创建线程远不是创建一个对象那么简单。
      创建对象,仅仅是在JVM的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁。
    • 那如何避免频繁创建和销毁线程?
      -- 线程池。
  2. 一般意义上的池化资源?

    • 一般意义上的池化资源,如下:
    • 当你需要资源的时候就调用acquire()方法来申请资源,用完之后就调用release()释放资源。若你带着这个固有模型来看并发包里线程池相关的工具类时,会很遗憾地发现它们完全匹配不上,Java提供的线程池里面压根就没有申请线程和释放线程的方法。
      1. class XXXPool{
      2. // 获取池化资源
      3. XXX acquire(){
      4. }
      5. // 释放池化资源
      6. void release(XXX x){
      7. }
      8. }
  3. 为什么线程池没有采用一般意义上池化资源的设计方法呢?

    • 如果线程池采用一般意义上池化资源的设计方法,应该是下面示例代码这样。你可以来思考一下,假设我们获取到一个空闲线程T1,然后该如何使用T1呢?你期望的可能是这样:通过调用T1的execute()方法,传入一个Runnable对象来执行具体业务逻辑,就像通过构造函数Thread(Runnable target)创建线程一样。可惜的是,你翻遍Thread对象的所有方法,都不存在类似execute(Runnable target)这样的公共方法。

      1. //采一般般意义上池化资源的设计方法
      2. class ThreadPool{
      3. // 获取空闲线程
      4. Thread acquire() {
      5. }
      6. // 释放线程
      7. void release(Thread t){
      8. }
      9. }
      10. //期望的使用
      11. ThreadPool pool
      12. Thread T1=pool.acquire();
      13. //传入Runnable对象
      14. T1.execute(()->{
      15. //具体业务逻辑
      16. ......
      17. });
    • 所以,线程池的设计,没有办法直接采用一般意义上池化资源的设计方法。

  4. 线程池如何设计呢?

    • 目前业界线程池的设计,普遍采用的都是生产者-消费者模式。线程池的使用方是生产者,线程池本身是消费者。
    • 线程池和普通的池化资源有很大不同,线程池实际上是生产者-消费者模式的一种实现。
  5. 通过创建简单的线程池MyThreadPool,理解线程池的工作原理?

    • 在MyThreadPool的内部,我们维护了一个阻塞队列workQueue和一组工作线程,工作线程的个数由构造函数中的poolSize来指定。用户通过调用execute()方法来提交Runnable任务,execute()方法的内部实现仅仅是将任务加入到workQueue中。MyThreadPool内部维护的工作线程会消费workQueue中的任务并执行任务,相关的代码就是代码①处的while循环。线程池主要的工作原理就这些,是不是还挺简单的
      1. //简化的线程池,仅用来说明工作原理
      2. class MyThreadPool{
      3. //利用阻塞队列实现生产者-消费者模式
      4. BlockingQueue<Runnable> workQueue;
      5. //保存内部工作线程
      6. List<WorkerThread> threads = new ArrayList<>();
      7. // 构造方法
      8. MyThreadPool(int poolSize,BlockingQueue<Runnable> workQueue){
      9. this.workQueue = workQueue;
      10. // 创建工作线程
      11. for(int idx=0; idx<poolSize; idx++){
      12. WorkerThread work = new WorkerThread();
      13. work.start();
      14. threads.add(work);
      15. }
      16. }
      17. // 提交任务
      18. void execute(Runnable command){
      19. workQueue.put(command);
      20. }
      21. // 工作线程负责消费任务,并执行任务
      22. class WorkerThread extends Thread{
      23. public void run() {
      24. //循环取任务并执行
      25. while(true){
      26. Runnable task = workQueue.take();
      27. task.run();
      28. }
      29. }
      30. }
      31. }
      32. /** 下面是使用示例 **/
      33. // 创建有界阻塞队列
      34. BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
      35. // 创建线程池
      36. MyThreadPool pool = new MyThreadPool(10, workQueue);
      37. // 提交任务
      38. pool.execute(()->{
      39. System.out.println("hello");
      40. });
  6. 如何使用Java中的线程池?

    • JUC提供的线程池,远比我们上面的示例代码强大得多,当然也复杂得多。
    • Java提供的线程池相关的工具类中,最核心的是ThreadPoolExecutor,通过名字你也能看出来,它强调的是Executor,而不是一般意义上的池化资源。
    • ThreadPoolExecutor的构造函数非常复杂,如下面代码所示,这个最完备的构造函数有7个参数。
      1. ThreadPoolExecutor(
      2. int corePoolSize,
      3. int maximumPoolSize,
      4. long keepAliveTime,
      5. TimeUnit unit,
      6. BlockingQueue<Runnable> workQueue,
      7. ThreadFactory threadFactory,
      8. RejectedExecutionHandler handler)
  7. 线程池的参数介绍?

    • 可以把线程池类比为一个项目组,而线程就是项目组的成员。
    • corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留corePoolSize个人坚守阵地。
    • maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到maximumPoolSize个人。当项目闲下来时,就要撤人了,最多能撤到corePoolSize个人。
    • keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?
      很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和unit就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime&unit这么久,而且线程池的线程数大于corePoolSize,那么这个空闲的线程就要被回收了。
    • workQueue:工作队列,和上面示例代码的工作队列同义。
    • threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
    • handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过handler这个参数来指定。ThreadPoolExecutor已经提供了以下4种策略。
      -- CallerRunsPolicy:提交任务的线程自己去执行该任务。
      -- AbortPolicy:默认的拒绝策略,会throws RejectedExecutionException。
      -- DiscardPolicy:直接丢弃任务,没有任何异常抛出。
      -- DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
    • Java1.6还增加了allowCoreThreadTimeOut(boolean value)方法,它可以让所有线程都支持超时,这意味着如果项目很闲,就会将项目组的成员都撤走。
  8. 使用线程池要注意些什么?

    • 1)不建议使用Executors创建线程池。
      考虑到ThreadPoolExecutor的构造函数实在是有些复杂,所以JUC提供了一个线程池的静态工厂类Executors,利用Executors你可以快速创建线程池。不过目前大厂的编码规范中基本上都不建议使用Executors了,所以这里我就不再花篇幅介绍了。
    • 2)使用有界队列。
      不建议使用Executors的最重要的原因是:Executors提供的很多方法默认使用的都是无界的LinkedBlockingQueue,高负载情境下,无界队列很容易导致OOM,而OOM会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列
    • 3)默认拒绝策略要慎重使用。
      使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会throw RejectedExecutionException这是个运行时异常,对于运行时异常编译器并不强制catch它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。
    • 4)注意异常处理的问题。
      例如通过ThreadPoolExecutor对象的execute()方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理,你可以参考下面的示例代码。
      1. try {
      2. //业务逻辑
      3. } catch (RuntimeException x) {
      4. //按需处理
      5. } catch (Throwable x) {
      6. //按需处理
      7. }
  9. 课后思考:
    使用线程池,默认情况下创建的线程名字都类似pool-1-thread-2这样,没有业务含义。而很多情况下为了便于诊断问题,都需要给线程赋予一个有意义的名字,那你知道有哪些办法可以给线程池里的线程指定名字吗?

Future-使用-如何用多线程实现最优的“烧水泡茶”程序?
  1. 问题:
    在上一篇文章介绍了如何创建正确的线程池,那创建完线程池,我们该如何使用呢?

  2. ThreadPoolExecutor提交和获取任务结果?

    • 提交任务:
      -- ThreadPoolExecutor的void execute(Runnable command)方法;
      -- 此方法可以提交任务,但是却没有办法获取任务的执行结果(execute()方法没有返回值)。
    • 获得任务执行结果:
      -- Java通过ThreadPoolExecutor提供的3个submit()方法和1个FutureTask工具类来支持获得任务执行结果的需求。
      -- 这3个submit()方法的方法签名如下:
      1. // 提交Runnable任务
      2. Future<?> submit(Runnable task);
      3. // 提交Callable任务
      4. <T> Future<T> submit(Callable<T> task);
      5. // 提交Runnable任务及结果引⽤
      6. <T> Future<T> submit(Runnable task, T result);
  3. Future接口的5个方法?

    • 上述3个submit()方法的返回值都是Future接口;
    • Future接口有5个方法,取消任务的方法cancel()、判断任务是否已取消的方法isCancelled()、判断任务是否已结束的方法isDone()以及2个获得任务执行结果的get()和get(timeout,unit),其中最后一个get(timeout, unit)支持超时机制。
    • 通过Future接口的这5个方法你会发现,提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个get()方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用get()方法的线程会阻塞,直到任务执行完才会被唤醒。
      1. // 取消任务
      2. boolean cancel(boolean mayInterruptIfRunning);
      3. // 判断任务是否已取消
      4. boolean isCancelled();
      5. // 判断任务是否已结束
      6. boolean isDone();
      7. // 获得任务执行结果
      8. get();
      9. // 获得任务执行结果,支持超时
      10. get(long timeout,TimeUnit unit);
  4. 3个submit()方法之间的区别在于方法参数不同,分别介绍?

    • 3个submit()方法
      提交Runnable任务 submit(Runnable task):
      这个方法的参数是一个Runnable接口,Runnable接口的run()方法是没有返回值的,所以 submit(Runnable task)这个方法返回的Future仅可以用来断言任务已经结束了,类似于Thread.join()。
      提交Callable任务 submit(Callable task)
      这个方法的参数是一个Callable接口,它只有一个call()方法,并且这个方法是有返回值的,所以这个方法返回的Future对象可以通过调用其get()方法来获取任务的执行结果。
      提交Runnable任务及结果引用submit(Runnable task, T result)
      这个方法很有意思,假设这个方法返回的Future对象是f,f.get()的返回值就是传给submit()方法的参数result。
  5. 方法③submit(Runnable task, T result)的用法?

    • 经典用法代码展示如下:
    • 注意:
      Runnable接口的实现类Task声明了一个有参构造函数Task(Result r),创建Task对象的时候传入了result对象,这样就能在类Task的run()方法中对result进行各种操作了。result相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。
      1. ExecutorService executor = Executors.newFixedThreadPool(1);
      2. // 创建Result对象r
      3. Result r = new Result();
      4. r.setAAA(a);
      5. // 提交任务
      6. Future<Result> future = executor.submit(new Task(r), r);
      7. Result fr = future.get();
      8. // 下面等式成立
      9. fr === r;
      10. fr.getAAA() === a;
      11. fr.getXXX() === x
      12. class Task implements Runnable{
      13. Result r;
      14. //通过构造函数传入result
      15. Task(Result r){
      16. this.r = r;
      17. }
      18. void run() {
      19. //可以操作result
      20. a = r.getAAA();
      21. r.setXXX(x);
      22. }
      23. }
FutureTask
  1. 介绍FutureTask工具类?

    • Future是一个接口,而FutureTask是一个实实在在的工具类;
    • FutureTask有两个构造函数,参数和前面介绍的submit()方法类似,不再赘述。
      1. FutureTask(Callable<V> callable);
      2. FutureTask(Runnable runnable, V result);
  2. 如何使用FutureTask?

    • FutureTask实现了Runnable和Future接口
      ① 实现了Runnable接口,可以将FutureTask对象作为任务提交给ThreadPoolExecutor去执行,也可以直接被Thread执行;
      ② 实现了Future接口,所以也能用来获得任务的执行结果。
    • 示例代码①: 将FutureTask对象提交给ThreadPoolExecutor去执行。

      1. // 创建FutureTask
      2. FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
      3. // 创建线程池
      4. ExecutorService es = Executors.newCachedThreadPool();
      5. // 提交FutureTask
      6. es.submit(futureTask);
      7. // 获取计算结果
      8. Integer result = futureTask.get();
    • 示例代码②:FutureTask对象直接被Thread执行的示例代码如下所示。
      可以看出:利用FutureTask对象可以很容易获取子线程的执行结果。

      1. // 创建FutureTask
      2. FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
      3. // 创建并启动线程
      4. Thread T1 = new Thread(futureTask);
      5. T1.start();
      6. // 获取计算结果
      7. Integer result = futureTask.get();
  3. 实现最优的“烧水泡茶”程序?

    • 数学家华罗庚先生的文章《统筹方法》,这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:
      在这里插入图片描述
    • 用程序来模拟一下这个最优工序。前面曾经提到,并发编程可以总结为三个核心问题:分工、同步和互斥。
    • 编写并发程序,首先要做的就是分工.
      所谓分工指的是如何高效地拆解任务并分配给线程。对于烧水泡茶这个程序,一种最优的分工方案可以是下图所示的这样:用两个线程T1和T2来完成烧水泡茶程序,T1负责洗水壶、烧开水、泡茶这三道工序,T2负责洗茶壶、洗茶杯、拿茶叶三道工序,其中T1
      在执行泡茶这道工序时需要等待T2完成拿茶叶的工序。对于T1的这个等待动作,可以想出很多种办法,例如Thread.join()、CountDownLatch,甚至阻塞队列都可以解决,不过今天我们用Future特性来实现。
      在这里插入图片描述
    • 下面的示例代码就是用这一章提到的Future特性来实现的。
      首先,我们创建了两个FutureTask——ft1和ft2,ft1完成洗水壶、烧开水、泡茶的任务,ft2完成洗茶壶、洗茶杯、拿茶叶的任务;这里需要注意的是ft1这个任务在执行泡茶任务前,需要等待ft2把茶叶拿来,所以ft1内部需要引用ft2,并在执行泡茶之前,调用
      ft2的get()方法实现等待。
      1. // 创建任务T2的FutureTask
      2. FutureTask<String> ft2 = new FutureTask<>(new T2Task());
      3. // 创建任务T1的FutureTask
      4. FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));
      5. // 线程T1执行任务ft1
      6. Thread T1 = new Thread(ft1);
      7. T1.start();
      8. // 线程T2执行任务ft2
      9. Thread T2 = new Thread(ft2);
      10. T2.start();
      11. // 等待线程T1执行结果
      12. System.out.println(ft1.get());
      13. // T1Task需要执行的任务:
      14. // 洗水壶、烧开水、泡茶
      15. class T1Task implements Callable<String>{
      16. FutureTask<String> ft2;
      17. // T1任务需要T2任务的FutureTask
      18. T1Task(FutureTask<String> ft2){
      19. this.ft2 = ft2;
      20. }
      21. @Override
      22. String call() throws Exception {
      23. System.out.println("T1:洗水壶...");
      24. TimeUnit.SECONDS.sleep(1);
      25. System.out.println("T1:烧开水...");
      26. TimeUnit.SECONDS.sleep(15);
      27. String tf = ft2.get(); // ★ 获取T2线程的茶叶
      28. System.out.println("T1:拿到茶叶:"+tf);
      29. System.out.println("T1:泡茶...");
      30. return "上茶:" + tf;
      31. }
      32. }
      33. // T2Task需要执行的任务:
      34. // 洗茶壶、洗茶杯、拿茶叶
      35. class T2Task implements Callable<String>{
      36. @Override
      37. String call() throws Exception{
      38. System.out.println("T2:洗茶壶...");
      39. TimeUnit.SECONDS.sleep(1);
      40. System.out.println("T2:洗茶杯...");
      41. TimeUnit.SECONDS.sleep(2);
      42. System.out.println("T2:拿茶叶...");
      43. TimeUnit.SECONDS.sleep(1);
      44. return "龙井";
      45. }
      46. }
      47. // 一次执行结果:
      48. T1:洗水壶...
      49. T2:洗茶壶...
      50. T1:烧开水...
      51. T2:洗茶杯...
      52. T2:拿茶叶...
      53. T1:拿到茶叶:龙井
      54. T1:泡茶...
      55. 上茶:龙井
  4. 总结

    • 利用JUC提供的Future可以很容易获得异步任务的执行结果,无论异步任务是通过线程池ThreadPoolExecutor执行的,还是通过手工创建子线程来执行的。
    • Future可以类比为现实世界里的提货单,比如去蛋糕店订生日蛋糕,蛋糕店都是先给你一张提货单,你拿到提货单之后,没有必要一直在店里等着,可以先去干点其他事,比如看场电影;等看完电影后,基本上蛋糕也做好了,然后你就可以凭提货单领蛋糕
      了。
    • 利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用Future来解决。
    • 在分析这种问题的过程中,建议你用有向图描述一下任务之间的依赖关系,同时将线程的分工也做好,类似于烧水泡茶最优分工方案那幅图。对照图来写代码,好处是更形象,且不易出错。
  5. 课后思考
    不久前听说小明要做一个询价应用,这个应用需要从三个电商询价,然后保存在自己的数据库里。核心示例代码如下所示,由于是串行的,所以性能很慢,你来试着优化一下吧。

  1. // 向电商S1询价,并保存
  2. r1 = getPriceByS1();
  3. save(r1);
  4. // 向电商S2询价,并保存
  5. r2 = getPriceByS2();
  6. // 向电商S3询价,并保存
  7. r3 = getPriceByS3();

9-异步编程

  1. 异步代码?

    • 现象:
      用多线程优化性能,其实不过就是将串行操作变成并行操作。可以看到,在串行转换成并行的过程中,一定会涉及到异步化。
    • 例子:
      如下代码,是串行的,为了提升性能,需要并行化,那具体实施起来该怎么做呢?

      1. //以下两个方法都是耗时操作
      2. doBizA();
      3. doBizB()
    • 实现:
      如下代码,创建两个子线程去执行。如下并行方案,主线程无需等待doBizA()和doBizB()的执行结果,即doBizA()和doBizB()两个操作已经被异步化。

      1. new Thread(()->doBizA())
      2. .start();
      3. new Thread(()->doBizB())
      4. .start();
  2. 异步化?
    异步化,是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。看到这里,相信你应该就能理解异步编程最近几年为什么会大火了,因为优化性能是互联网大厂的一个核心需求啊。Java1.8提供了CompletableFuture来支持异步编程,CompletableFuture有可能是你见过的最复杂的工具类了,不过功能也着实让人感到震撼。

  3. CompletableFuture的核心优势?
    1)无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
    2)语义更清晰。
    例如下代码中f3=f1.thenCombine(f2,()->{})能够清晰地表述“任务3要等待任务1和任务2都完成后才能开始”;
    3)代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。

  4. 使用CompletableFuture重新实现前面曾提及的烧水泡茶程序?
    首先还是需要先完成分工方案,在下面的程序中,我们分了3个任务:任务1负责洗水壶、烧开水,任务2负责洗茶壶、洗茶杯和拿茶叶,任务3负责泡茶。其中任务3要等待任务1和任务2都完成后才能开始。这个分工如下图所示。
    在这里插入图片描述

    • 下面是代码实现,你先略过runAsync()、supplyAsync()、thenCombine()这些不太熟悉的方法:
      1. //任务1:洗水壶->烧开水
      2. CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{
      3. System.out.println("T1:洗水壶...");
      4. sleep(1, TimeUnit.SECONDS);
      5. System.out.println("T1:烧开水...");
      6. sleep(15, TimeUnit.SECONDS);
      7. });
      8. //任务2:洗茶壶->洗茶杯->拿茶叶
      9. CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
      10. System.out.println("T2:洗茶壶...");
      11. sleep(1, TimeUnit.SECONDS);
      12. System.out.println("T2:洗茶杯...");
      13. sleep(2, TimeUnit.SECONDS);
      14. System.out.println("T2:拿茶叶...");
      15. sleep(1, TimeUnit.SECONDS);
      16. return "龙井";
      17. });
      18. //任务3:任务1和任务2完成后执⾏:泡茶
      19. CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{
      20. System.out.println("T1:拿到茶叶:" + tf);
      21. System.out.println("T1:泡茶...");
      22. return "上茶:" + tf;
      23. });
      24. //等待任务3执行结果
      25. System.out.println(f3.join());
      26. void sleep(int t,TimeUnit u){
      27. try {
      28. u.sleep(t);
      29. }catch(InterruptedException e){}
      30. }
      31. // 一次执行结果:
      32. T1:洗水壶...
      33. T2:洗茶壶...
      34. T1:烧开水...
      35. T2:洗茶杯...
      36. T2:拿茶叶...
      37. T1:拿到茶叶:龙井
      38. T1:泡茶...
      39. 上茶:龙井
CompletableFuture的使用
  1. 介绍CompletableFuture的使用

  2. 如何创建CompletableFuture对象?

    • 创建CompletableFuture对象主要靠下面代码中展示的这4个静态方法:

      1. //使用默认线程池
      2. static CompletableFuture<Void> runAsync(Runnable runnable)
      3. static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
      4. //可以指定线程池
      5. static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
      6. static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
    • 前两个方法:
      -- 上面例子中使用了runAsync(Runnable runnable)和supplyAsync(Supplier supplier)
      -- 区别是:Runnable接口的run()方法没有返回值,而Supplier接口的get()方法是有返回值的。

    • 前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。
    • 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核数(也可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的I/O操作,就会导致线程池中所有线程都阻塞在I/O操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰
  3. 异步操作需要关注的两个问题?

    • 创建完CompletableFuture对象之后,会自动地异步执行runnable.run()方法或者supplier.get()方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为CompletableFuture类实现了Future接口,所以这两个问题你都可以通过Future接口来解决。另外,CompletableFuture类还实现了CompletionStage接口,这个接口内容实在是太丰富了,在1.8版本里有40个方法,这些方法我们该如何理解呢?
  4. 如何理解CompletionStage接口?

    • 分工的角度类比一下工作流。任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。
      -- 烧水泡茶的例子,其中洗水壶和烧开水就是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间就是并行关系,而烧开水、拿茶叶和泡茶就是汇聚关系。
    • CompletionStage接口可以清晰地描述任务之间的这种时序关系,例如前面提到的 f3 =f1.thenCombine(f2, ()->{})描述的就是一种汇聚关系。
      -- 烧水泡茶程序中的汇聚关系是一种AND聚合关系,这里的AND指的是所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)。
      -- 既然有AND聚合关系,那就一定还有OR聚合关系,所谓OR指的是依赖的任务只要有一个完成就可以执行当前任务。
      -- 在编程领域,还有一个绕不过去的山头,那就是异常处理,CompletionStage接口也可以方便地描述异常处理。
      在这里插入图片描述
  5. CompletionStage接口如何描述串行关系、AND聚合关系、OR聚合关系以及异常处理?

  6. CompletionStage接口--描述串行关系?

    • CompletionStage接口里面描述串行关系,主要是thenApply、thenAccept、thenRun和thenCompose这四个系列的接口。
    • thenApply系列函数里参数fn的类型是接口Function,这个接口里与CompletionStage相关的方法是R apply(T t),这个方法既能接收参数也支持返回值,所以thenApply系列方法返回的是CompletionStage。
    • 而thenAccept系列方法里参数consumer的类型是接口Consumer,这个接口里与CompletionStage相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以thenAccept系列方法返回的是CompletionStage。
    • thenRun系列方法里action的参数是Runnable,所以action既不能接收参数也不支持返回值,所以thenRun系列方法返回的也是CompletionStage。
    • 这些方法里面Async代表的是异步执行fn、consumer或者action。其中,需要你注意的是thenCompose系列方法,这个系列的方法会新创建出一个子流程,最终结果和thenApply系列是相同的。
      1. CompletionStage<R> thenApply(fn);
      2. CompletionStage<R> thenApplyAsync(fn);
      3. CompletionStage<Void> thenAccept(consumer);
      4. CompletionStage<Void> thenAcceptAsync(consumer);
      5. CompletionStage<Void> thenRun(action);
      6. CompletionStage<Void> thenRunAsync(action);
      7. CompletionStage<R> thenCompose(fn);
      8. CompletionStage<R> thenComposeAsync(fn);
  7. thenApply()方法是如何使用的?

    • 如下代码
      首先通过supplyAsync()启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。
      1. CompletableFuture<String> f0 = CompletableFuture.supplyAsync(
      2. ()->"Hello World") //①
      3. .thenApply(s->s +"QQ") //②
      4. .thenApply(String::toUpperCase);//③
      5. System.out.println(f0.join());
      6. //输出结果
      7. HELLO WORLD QQ
  8. CompletionStage接口--描述AND汇聚关系?

    • CompletionStage接口里面描述AND汇聚关系,主要是thenCombine、thenAcceptBoth和runAfterBoth系列的接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。
    • 使用可以参考上面烧水泡茶的实现程序,这里就不赘述了。
      1. CompletionStage<R> thenCombine(other, fn);
      2. CompletionStage<R> thenCombineAsync(other, fn);
      3. CompletionStage<Void> thenAcceptBoth(other, consumer);
      4. CompletionStage<Void> thenAcceptBothAsync(other, consumer);
      5. CompletionStage<Void> runAfterBoth(other, action);
      6. CompletionStage<Void> runAfterBothAsync(other, action);
  9. CompletionStage接口--描述OR汇聚关系?

    • CompletionStage接口里面描述OR汇聚关系,主要是applyToEither、acceptEither和runAfterEither系列的接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。
      1. CompletionStage applyToEither(other, fn);
      2. CompletionStage applyToEitherAsync(other, fn);
      3. CompletionStage acceptEither(other, consumer);
      4. CompletionStage acceptEitherAsync(other, consumer);
      5. CompletionStage runAfterEither(other, action);
      6. CompletionStage runAfterEitherAsync(other, action);
  10. 如何使用applyToEither()方法来描述一个OR汇聚关系?

    • 代码如下
      1. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{
      2. int t = getRandom(5, 10);
      3. sleep(t, TimeUnit.SECONDS);
      4. return String.valueOf(t);
      5. });
      6. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{
      7. int t = getRandom(5, 10);
      8. sleep(t, TimeUnit.SECONDS);
      9. return String.valueOf(t);
      10. });
      11. CompletableFuture<String> f3 = f1.applyToEither(f2,s -> s);
      12. System.out.println(f3.join());
  11. CompletionStage接口--异常处理?

    • 虽然上面我们提到的fn、consumer、action它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行7/0就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?

      1. CompletableFuture<Integer> f0 = CompletableFuture.
      2. .supplyAsync(()->(7/0))
      3. .thenApply(r->r*10);
      4. System.out.println(f0.join());
    • CompletionStage接口给我们提供的方案非常简单,比try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

      1. CompletionStage exceptionally(fn);
      2. CompletionStage<R> whenComplete(consumer);
      3. CompletionStage<R> whenCompleteAsync(consumer);
      4. CompletionStage<R> handle(fn);
      5. CompletionStage<R> handleAsync(fn);
  12. 如何使用exceptionally()方法来处理异常?

    • exceptionally()的使用非常类似于try{}catch{}中的catch{},但是由于支持链式编程方式,所以相对更简单。
    • 既然有try{}catch{},那就一定还有try{}finally{},whenComplete()和handle()系列方法就类似于try{}finally{}中的finally{},无论是否发生异常都会执行whenComplete()中的回调函数consumer和handle()中的回调函数fn。
    • whenComplete()和handle()的区别在于whenComplete()不支持返回结果,而handle()是支持返回结果的。
      1. CompletableFuture<Integer> f0 = CompletableFuture
      2. .supplyAsync(()->7/0))
      3. .thenApply(r->r*10)
      4. .exceptionally(e->0);
      5. System.out.println(f0.join())
  13. 总结

    • 曾经一提到异步编程,大家脑海里都会随之浮现回调函数,例如在JavaScript里面异步问题基本上都是靠回调函数来解决的,回调函数在处理异常以及复杂的异步任务关系时往往力不从心,对此业界还发明了个名词:回调地狱(Callback Hell)。
    • 应该说在前些年,异步编程还是声名狼藉的。不过最近几年,伴随着ReactiveX的发展(Java语言的实现版本是RxJava),回调地狱已经被完美解决了,异步编程已经慢慢开始成熟,Java语言也开始官方支持异步编程:在1.8版本提供了CompletableFuture,在
      Java 9版本则提供了更加完备的Flow API,异步编程目前已经完全工业化。因此,学好异步编程还是很有必要的。
    • CompletableFuture已经能够满足简单的异步编程需求,如果你对异步编程感兴趣,可以重点关注RxJava这个项目,利用RxJava,即便在Java1.6版本也能享受异步编程的乐趣。
  14. 思考题

    • 问题:
      创建采购订单的时候,需要校验一些规则,例如最大金额是和采购员级别相关的。有同学利用CompletableFuture实现了这个校验的功能,逻辑很简单,首先是从数据库中把相关规则查出来,然后执行规则校验。你觉得他的实现是否有问题呢?

      1. //采购订单
      2. PurchersOrder po;
      3. CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(()->{
      4. //在数据库中查询规则
      5. return findRuleByJdbc();
      6. }).thenApply(r -> {
      7. //规则校验
      8. return check(po, r);
      9. });
      10. Boolean isOk = cf.join();
    • 这段代码的问题,例如没有异常处理、逻辑不严谨等等,不过我更想让你关注的是:findRuleByJdbc()这个方法隐藏着一个阻塞式I/O,这意味着会阻塞调用线程。默认情况下所有的CompletableFuture共享一个ForkJoinPool,当有阻塞式I/O时,可能导致所有的ForkJoinPool线程都阻塞,进而影响整个系统的性能。

      1. //采购订单
      2. PurchersOrder po;
      3. CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(()->{
      4. //在数据库中查询规则
      5. return findRuleByJdbc();
      6. }).thenApply(r ->{
      7. //规则校验
      8. return check(po,r);
      9. });
      10. Boolean isOk = cf.join();

10-CompletionService--批量异步任务

  1. 如何优化一个询价应用的核心代码?

    • 实际项目中,并不建议如下做法,因为JUC已经提供了设计精良的CompletionService。
    • 如果采用“ThreadPoolExecutor+Future”的方案,你的优化结果很可能是下面示例代码这样:用三个线程异步执行询价,通过三次调用Future的get()方法获取询价结果,之后将询价结果保存在数据库中。

      1. // 创建线程池
      2. ExecutorService executor = Executors.newFixedThreadPool(3);
      3. // 异步向电商S1询价
      4. Future<Integer> f1 = executor.submit(()->getPriceByS1());
      5. // 异步向电商S2询价
      6. Future<Integer> f2 = executor.submit(()->getPriceByS2());
      7. // 异步向电商S3询价
      8. Future<Integer> f3 = executor.submit(()->getPriceByS3());
      9. // 获取电商S1报价并异步保存
      10. executor.execute(()->save(f1.get()));
      11. // 获取电商S2报价并异步保存
      12. executor.execute(()->save(f2.get())
      13. // 获取电商S3报价并异步保存
      14. executor.execute(()->save(f3.get())
    • 上面的这个方案本身没有太大问题,但是有个地方的处理需要你注意,那就是如果获取电商S1报价的耗时很长,那么即便获取电商S2报价的耗时很短,也无法让保存S2报价的操作先执行,因为这个主线程都阻塞在了f1.get()操作上。这点小瑕疵你该如何解决呢?

    • 估计你已经想到了,增加一个阻塞队列,获取到S1、S2、S3的报价都进入阻塞队列,然后在主线程中消费阻塞队列,这样就能保证先获取到的报价先保存到数据库了。下面的示例代码展示了如何利用阻塞队列实现先获取到的报价先保存到数据库。
      1. // 创建阻塞队列
      2. BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
      3. //电商S1报价异步进入阻塞队列
      4. executor.execute(()->bq.put(f1.get()));
      5. //电商S2报价异步进入阻塞队列
      6. executor.execute(()->bq.put(f2.get()));
      7. //电商S3报价异步进入阻塞队列
      8. executor.execute(()->bq.put(f3.get()));
      9. //异步保存所有报价
      10. for (int i=0; i<3; i++) {
      11. Integer r = bq.take();
      12. executor.execute(()->save(r));
      13. }
  2. 利用CompletionService实现询价系统?

    • 实际项目中,JUC提供了设计精良的CompletionService。利用CompletionService不但能帮你解决先获取到的报价先保存到数据库的问题,而且还能让代码更简练。
    • CompletionService的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是CompletionService是把任务执行结果的Future对象加入到阻塞队列中,而上面的示例代码是把任务最终的执行结果放入了阻塞队列中。
  3. 如何创建CompletionService呢?

    • CompletionService接口的实现类是ExecutorCompletionService,这个实现类的构造方法有两个,分别是:

      1. 1. ExecutorCompletionService(Executor executor);
      2. 2. ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>>
      3. completionQueue)。
    • 下面的示例代码完整地展示了如何利用CompletionService来实现高性能的询价系统。
      -- 其中,没有指定completionQueue,因此默认使用无界的LinkedBlockingQueue。
      -- 之后通过CompletionService接口提供的submit()方法提交了三个询价操作,这三个询价操作将会被CompletionService异步执行。
      -- 最后,我们通过CompletionService接口提供的take()方法获取一个Future对象(前面我们提到过,加入到阻塞队列中的是任务执行结果的Future对象),调用Future对象的get()方法就能返回询价操作的执行结果了。

      1. // 创建线程池
      2. ExecutorService executor = Executors.newFixedThreadPool(3);
      3. // 创建CompletionService
      4. CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
      5. // 异步向电商S1询价
      6. cs.submit(()->getPriceByS1());
      7. // 异步向电商S2询价
      8. cs.submit(()->getPriceByS2());
      9. // 异步向电商S3询价
      10. cs.submit(()->getPriceByS3());
      11. // 将询价结果异步保存到数据库
      12. for (int i=0; i<3; i++) {
      13. Integer r = cs.take().get();
      14. executor.execute(()->save(r));
      15. }
  4. CompletionService接口说明?

    • 介绍一下CompletionService接口提供的方法,CompletionService接口提供的方法有5个,这5个方法的方法签名如下所示。

      1. Future<V> submit(Callable<V> task);
      2. Future<V> submit(Runnable task, V result);
      3. Future<V> take() throws InterruptedException;
      4. Future<V> poll();
      5. Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
    • 其中,submit()相关的方法有两个。
      ① 一个方法参数是Callable task,前面利用CompletionService实现询价系统的示例代码中,我们提交任务就是用的它。
      ② 另外一个方法有两个参数,分别是Runnable task和V result,这个方法类似于ThreadPoolExecutor的 Future submit(Runnable task,T result) ,不再赘述。

    • CompletionService接口其余的3个方法,都是和阻塞队列相关的,take()、poll()都是从阻塞队列中获取并移除一个元素;它们的区别在于如果阻塞队列是空的,那么调用 take()方法的线程会被阻塞,而poll()方法会返回null值。poll(long timeout,TimeUnit unit)方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了timeout unit时间,阻塞队列还是空的,那么该方法会返回null值。
  5. 利用CompletionService实现Dubbo中的Forking Cluster?

    • Dubbo中有一种叫做Forking的集群模式,这种集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。例如你需要提供一个地址转坐标的服务,为了保证该服务的高可用和性能,你可以并行地调用3个地图服务商的API,然后只要有1个正确返回了结果r,那么地址转坐标这个服务就可以直接返回r了。这种集群模式可以容忍2个地图服务商服务异常,但缺点是消耗的资源偏多。
      1. geocoder(addr) {
      2. //并行执行以下3个查询服务,
      3. r1=geocoderByS1(addr);
      4. r2=geocoderByS2(addr);
      5. r3=geocoderByS3(addr);
      6. //只要r1,r2,r3有一个返回,则返回
      7. return r1|r2|r3;
      8. }
  6. 具体是如何实现Forking这种集群模式?

    • 利用CompletionService可以快速实现Forking这种集群模式,比如下面的示例代码就展示了具体是如何实现的。
    • 首先我们创建了一个线程池executor、一个CompletionService对象cs和一个Future类型的列表futures,每次通过调用CompletionService的submit()方法提交一个异步任务,会返回一个Future对象,我们把这些Future对象保存在列表futures中。通过调用 cs.take().get(),我们能够拿到最快返回的任务执行结果,只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了。
      1. // 创建线程池
      2. ExecutorService executor = Executors.newFixedThreadPool(3);
      3. // 创建CompletionService
      4. CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
      5. // 用于保存Future对象
      6. List<Future<Integer>> futures = new ArrayList<>(3);
      7. //提交异步任务,并保存future到futures
      8. futures.add(cs.submit(()->geocoderByS1()));
      9. futures.add(cs.submit(()->geocoderByS2()));
      10. futures.add(cs.submit(()->geocoderByS3()));
      11. // 获取最快返回的任务执行结果
      12. Integer r = 0;
      13. try {
      14. // 只要有一个成功返回,则break
      15. for (int i = 0; i < 3; ++i) {
      16. r = cs.take().get();
      17. //简单地通过判空来检查是否成功返回
      18. if (r != null) {
      19. break;
      20. }
      21. }
      22. } finally {
      23. //取消所有任务
      24. for(Future<Integer> f : futures)
      25. f.cancel(true);
      26. }
      27. // 返回结果
      28. return r;
  7. 总结

    • 当需要批量提交异步任务的时候建议你使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。
    • CompletionService的实现类ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个ExecutorCompletionService的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
  8. 思考题:

    • 问题:
      本章使用CompletionService实现了一个询价应用的核心功能,后来又有了新的需求,需要计算出最低报价并返回,下面的示例代码尝试实现这个需求,你看看是否存在问题呢?
  1. // 创建线程池
  2. ExecutorService executor =
  3. Executors.newFixedThreadPool(3);
  4. // 创建CompletionService
  5. CompletionService<Integer> cs = new
  6. ExecutorCompletionService<>(executor);
  7. // 异步向电商S1询价
  8. cs.submit(()->getPriceByS1());
  9. // 异步向电商S2询价
  10. cs.submit(()->getPriceByS2());
  11. // 异步向电商S3询价
  12. cs.submit(()->getPriceByS3());
  13. // 将询价结果异步保存到数据库
  14. // 并计算最低报价
  15. AtomicReference<Integer> m =
  16. new AtomicReference<>(Integer.MAX_VALUE);
  17. for (int i=0; i<3; i++) {
  18. executor.execute(()->{
  19. Integer r = null;
  20. try {
  21. r = cs.take().get();
  22. } catch (Exception e) {}
  23. save(r);
  24. m.set(Integer.min(m.get(), r));
  25. });
  26. }
  27. return m;

11-ForkJoin:单机版的MapReduce

  1. 并发编程的任务角度和协作细节角度?

    • 线程池、Future、CompletableFuture和CompletionService,这些工具类都是在帮助我们站在任务的视角来解决并发问题,而不是让我们纠缠在线程之间如何协作的细节上(比如线程之间如何实现等待、通知等)。对于简单的并行任务,你可以通过“线程池+Future”的方案来解决;如果任务之间有聚合关系,无论是AND聚合还是OR聚合,都可以通过CompletableFuture来解决;而批量的并行任务,则可以通过CompletionService来解决。
    • 并发编程可以分为三个层面的问题,分别是分工、协作和互斥,当你关注于任务的时候,你会发现你的视角已经从并发编程的细节中跳出来了,你应用的更多的是现实世界的思维模式,类比的往往是现实世界里的分工,所以我把线程池、Future、CompletableFuture和CompletionService都列到了分工里面。
    • 下面我用现实世界里的工作流程图描述了并发编程领域的简单并行任务、聚合任务和批量并行任务,辅以这些流程图,相信你一定能将你的思维模式转换到现实世界里来。
    • 下面的简单并行、聚合、批量并行这三种任务模型,基本上能够覆盖日常工作中的并发场景了,但还是不够全面,因为还有一种“分治”的任务模型没有覆盖到。
      在这里插入图片描述
  2. 分治任务?

    • 分治,顾名思义,即分而治之,是一种解决复杂问题的思维方法和模式;
      -- 具体来讲,指的是把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解
      -- 理论上来讲,解决每一个问题都对应着一个任务,所以对于问题的分治,实际上就是对于任务的分治。
    • 应用:
      分治思想在很多领域都有广泛的应用,例如算法领域有分治算法(归并排序、快速排序都属于分治算法,二分法查找也是一种分治算法);大数据领域知名的计算框架MapReduce背后的思想也是分治。既然分治这种任务模型如此普遍,那Java显然也需要支持,JUC提供了一种叫做Fork/Join的并行计算框架,就是用来支持分治这种任务模型的
  3. 分治任务模型?

    • 分治任务模型可分为两个阶段:
      ① 一个阶段是任务分解,也就是将任务迭代地分解为子任务,直至子任务可以直接计算出结果;
      ② 另一个阶段是结果合并,即逐层合并子任务的执行结果,直至获得最终结果。下图是一个简化的分治任务模型图,你可以对照着理解。
    • 在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往都采用递归算法。
      在这里插入图片描述
  4. Fork/Join的介绍?

    • 是什么?
      Fork/Join是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的Fork对应的是分治任务模型里的任务分解,Join对应的是结果合并。
    • Fork/Join计算框架主要包含两部分:
      ① 一部分是分治任务的线程池ForkJoinPool
      ② 另一部分是分治任务ForkJoinTask。
      这两部分的关系类似于ThreadPoolExecutor和Runnable的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型ForkJoinTask。
    • ForkJoinTask是一个抽象类,它的方法有很多,最核心的是fork()方法和join()方法,其中fork()方法会异步地执行一个子任务,而join()方法则会阻塞当前线程来等待子任务的执行结果。
    • ForkJoinTask有两个子类——RecursiveAction和RecursiveTask,通过名字你就应该能知道,它们都是用递归的方式来处理分治任务的。
      -- 这两个子类都定义了抽象方法compute(),不过区别是RecursiveAction定义的compute()没有返回值,而RecursiveTask定义的compute()方法是有返回值的。
      -- 这两个子类也是抽象类,在使用的时候,需要你定义子类去扩展。
  5. Fork/Join的使用?

    • 用Fork/Join这个并行计算框架计算斐波那契数列(下面的代码源自Java官方示例)。* 首先需要创建一个分治任务线程池以及计算斐波那契数列的分治任务,之后通过调用分治任务线程池的 invoke()方法来启动分治任务。
    • 由于计算斐波那契数列需要有返回值,所以Fibonacci继承自RecursiveTask。
    • 分治任务Fibonacci 需要实现compute()方法,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1) 使用了异步子任务,这是通过 f1.fork() 这条语句实现的。
      1. static void main(String[] args){
      2. //创建分治任务线程池
      3. ForkJoinPool fjp = newForkJoinPool(4);
      4. //创建分治任务
      5. Fibonacci fib = new Fibonacci(30);
      6. //启动分治任务
      7. Integer result = fjp.invoke(fib);
      8. //输出结果
      9. System.out.println(result);
      10. }
      11. //递归任务
      12. static class Fibonacci extends RecursiveTask<Integer>{
      13. final int n;
      14. Fibonacci(int n){this.n = n;}
      15. protected Integer compute(){
      16. if(n <= 1)
      17. return n;
      18. Fibonacci f1 = new Fibonacci(n - 1);
      19. //创建子任务
      20. f1.fork();
      21. Fibonacci f2 =new Fibonacci(n - 2);
      22. //等待子任务结果,并合并结果
      23. return f2.compute() + f1.join();
      24. }
      25. }
  6. ForkJoinPool工作原理?

    • Fork/Join并行计算的核心组件是ForkJoinPool,所以下面我们就来简单介绍一下ForkJoinPool的工作原理。
      通过专栏前面文章的学习,你应该已经知道ThreadPoolExecutor本质上是一个生产者-消费者模式的实现,
      内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介;ThreadPoolExecutor可以有多个工作
      线程,但是这些工作线程都共享一个任务队列。
      ForkJoinPool本质上也是一个生产者-消费者的实现,但是更加智能,你可以参考下面的ForkJoinPool工作原
      理图来理解其原理。ThreadPoolExecutor内部只有一个任务队列,而ForkJoinPool内部有多个任务队列,当
      我们通过ForkJoinPool的invoke()或者submit()方法提交任务时,ForkJoinPool根据一定的路由规则把任务提
      交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队
      列中。
      如果工作线程对应的任务队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool支持一种叫做“任务窃
      取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务,例如下图中,线程T2
      对应的任务队列已经空了,它可以“窃取”线程T1对应的任务队列的任务。如此一来,所有的工作线程都
      不会闲下来了。
      ForkJoinPool中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不
      同的端消费,这样能避免很多不必要的数据竞争。我们这里介绍的仅仅是简化后的原理,ForkJoinPool的实现远比我们这里介绍的复杂,如果你感兴趣,建议去看它的源码。 在这里插入图片描述
  7. 模拟MapReduce统计单词数量?

    • 学习MapReduce有一个入门程序,统计一个文件里面每个单词的数量,下面我们来看看如何用Fork/Join并
      行计算框架来实现。
      我们可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据
      里单词的数量,最后再逐级汇总结果,你可以对照前面的简版分治任务模型图来理解这个过程。
      思路有了,我们马上来实现。下面的示例程序用一个字符串数组 String[] fc 来模拟文件内容,fc里面的
      元素与文件里面的行数据一一对应。关键的代码在 compute() 这个方法里面,这是一个递归方法,前半部
      分数据fork一个递归任务去处理(关键代码mr1.fork()),后半部分数据则在当前任务中递归处理
      (mr2.compute())。
      1. static void main(String[] args){
      2. String[] fc = {"hello world",
      3. "hello me",
      4. "hello fork",
      5. "hello join",
      6. "fork join in world"};
      7. //创建ForkJoin线程池
      8. ForkJoinPool fjp =
      9. new ForkJoinPool(3);
      10. //创建任务
      11. MR mr = new MR(
      12. fc, 0, fc.length);
      13. //启动任务
      14. Map<String, Long> result =
      15. fjp.invoke(mr);
      16. //输出结果
      17. result.forEach((k, v)->
      18. System.out.println(k+":"+v));
      19. }
      20. //MR模拟类
      21. static class MR extends
      22. RecursiveTask<Map<String, Long>> {
      23. private String[] fc;
      24. private int start, end;
      25. //构造函数
      26. MR(String[] fc, int fr, int to){
      27. this.fc = fc;
      28. this.start = fr;
      29. this.end = to;
      30. }
      31. @Override protected
      32. Map<String, Long> compute(){
      33. if (end - start == 1) {
      34. return calc(fc[start]);
      35. } else {
      36. int mid = (start+end)/2;
      37. MR mr1 = new MR(
      38. fc, start, mid);
      39. mr1.fork();
      40. MR mr2 = new MR(
      41. fc, mid, end);
      42. //计算⼦任务,并返回合并的结果
      43. return merge(mr2.compute(),
      44. mr1.join());
      45. }
      46. }
      47. //合并结果
      48. private Map<String, Long> merge(
      49. Map<String, Long> r1,
      50. Map<String, Long> r2) {
      51. Map<String, Long> result =
      52. new HashMap<>();
      53. result.putAll(r1);
      54. //合并结果
      55. r2.forEach((k, v) -> {
      56. Long c = result.get(k);
      57. if (c != null)
      58. result.put(k, c+v);
      59. else
      60. result.put(k, v);
      61. });
      62. return result;
      63. }
      64. //统计单词数量
      65. private Map<String, Long>
      66. calc(String line) {
      67. Map<String, Long> result =
      68. new HashMap<>();
      69. //分割单词
      70. String [] words =
      71. line.split("\\s+");
      72. //统计单词数量
      73. for (String w : words) {
      74. Long v = result.get(w);
      75. if (v != null)
      76. result.put(w, v+1);
      77. else
      78. result.put(w, 1L);
      79. }
      80. return result;
      81. }
  8. 总结
    Fork/Join并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小
    的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的
    MapReduce,所以你可以把Fork/Join看作单机版的MapReduce。
    Fork/Join并行计算框架的核心组件是ForkJoinPool。ForkJoinPool支持任务窃取机制,能够让所有线程的工
    作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8提供的Stream
    API里面并行流也是以ForkJoinPool为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享
    一个ForkJoinPool,这个共享的ForkJoinPool默认的线程数是CPU的核数;如果所有的并行流计算都是CPU密
    集型计算的话,完全没有问题,但是如果存在I/O密集型的并行流计算,那么很可能会因为一个很慢的I/O计
    算而拖慢整个系统的性能。所以建议用不同的ForkJoinPool执行不同类型的计算任务。
    如果你对ForkJoinPool详细的实现细节感兴趣,也可以参考Doug Lea的论文。http://gee.cs.oswego.edu/dl/papers/fj.pdf

  9. 思考题:
    对于一个CPU密集型计算程序,在单核CPU上,使用Fork/Join并行计算框架是否能够提高性能呢?

3. 并发设计模式

生产者-消费者

  1. 异步刷盘方式
    一个自实现的日志组件,写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,往往采用异步刷盘的方式。

    • 刷盘的时机是:
      1)ERROR级别的日志需要立即刷盘;
      2)数据积累到500条需要立即刷盘;
      3)存在未刷盘数据,且5秒钟内未曾刷盘,需要立即刷盘。
      这个日志组件的异步刷盘操作本质上其实就是一种分阶段提交。下面我们具体看看用生产者-消费者模式如何实现。在下面的示例代码中,可以通过调用info()和error()方法写入日志,这两个方法都是创建了一个日志任务LogMsg,并添加到阻塞队列中,调用info()和error()方法的线程是生产者;而真正将日志写入文件的是消费者线程,在Logger这个类中,我们只创建了1个消费者线程,在这个消费者线程中,会根据刷盘规则执行刷盘操作,逻辑很简单,这里就不赘述了。
      1. class Logger {
      2. //任务队列
      3. final BlockingQueue<LogMsg> bq = new BlockingQueue<>();
      4. //flush批量
      5. static final int batchSize = 500;
      6. //只需要⼀个线程写⽇志
      7. ExecutorService es = Executors.newFixedThreadPool(1);
      8. //启动写⽇志线程
      9. void start() {
      10. File file = File.createTempFile("foo", ".log");
      11. final FileWriter writer = new FileWriter(file);
      12. this.es.execute(() -> {
      13. try {
      14. //未刷盘⽇志数量
      15. int curIdx = 0;
      16. long preFT = System.currentTimeMillis();
      17. while (true) {
      18. LogMsg log = bq.poll(5, TimeUnit.SECONDS);
      19. //写⽇志
      20. if (log != null) {
      21. writer.write(log.toString());
      22. ++curIdx;
      23. }
      24. //如果不存在未刷盘数据,则⽆需刷盘
      25. if (curIdx <= 0) {
      26. continue;
      27. }
      28. //根据规则刷盘
      29. if (log != null && log.level == LEVEL.ERROR || curIdx == batchSize ||
      30. System.currentTimeMillis() - preFT > 5000) {
      31. writer.flush();
      32. curIdx = 0;
      33. preFT = System.currentTimeMillis();
      34. }
      35. }
      36. } catch (Exception e) {
      37. e.printStackTrace();
      38. } finally {
      39. try {
      40. writer.flush();
      41. writer.close();
      42. } catch (IOException e) {
      43. e.printStackTrace();
      44. }
      45. }
      46. });
      47. }
      48. //写INFO级别⽇志
      49. void info(String msg) {
      50. bq.put(new LogMsg(LEVEL.INFO, msg));
      51. }
      52. //写ERROR级别⽇志
      53. void error(String msg) {
      54. bq.put(new LogMsg(LEVEL.ERROR, msg));
      55. }
      56. }
      57. //⽇志级别
      58. enum LEVEL {
      59. INFO, ERROR
      60. }
      61. class LogMsg {
      62. LEVEL level;
      63. String msg;
      64. //省略构造函数实现
      65. LogMsg(LEVEL lvl, String msg) {
      66. }
      67. //省略toString()实现
      68. String toString() {
      69. }
      70. }

4. 案例分析

5. 其他并发模型

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注