@songhanshi
2021-01-14T11:26:28.000000Z
字数 97166
阅读 781
Java学习
笔记内容来源:
1. jk:《Java并发编程实战》-王宝令(极客)
2. xn:《Java性能调优实战》-多线程性能调优-刘超(极客)
https://blog.csdn.net/PROGRAM_anywhere/article/details/83552126
为什么要用到并发
并发编程有哪些缺点
减少上下文切换
通常减少上下文切换可以采用无锁并发编程,CAS算法,使用最少的线程和使用协程。
--------------------分割线------------
时间片?
上下文切换是什么?
多核下的上下文切换?
多线程上下文切换的原因?
线程运行时,各状态之间转换的原因?
自发性上下文切换?
非自发性上下文切换?
上下文切换性能问题?
串行和并行代码的比较:
如何监测到上下文切换?
上下文切换到底开销在哪些环节?
在并发量比较大的情况下,什么时候用单线程,什么时候用多线程呢?
在多线程中使用Synchronized还会发生进程间的上下文切换吗?具体又会发生在哪些环节呢?
| jk |
|---|

参考:
本节说的可能并不好。该篇我看了三遍也没能完全看懂,于是自己搜索java管程相关的技术文章,才大致对管程有了个认知,总结如下:
1.管程是一种概念,任何语言都可以通用。
2.在java中,每个加锁的对象都绑定着一个管程(监视器)
3.线程访问加锁对象,就是去拥有一个监视器的过程。如一个病人去门诊室看医生,医生是共享资源,门锁锁定医生,病人去看医生,就是访问医生这个共享资源,门诊室其实是监视器(管程)。
4.所有线程访问共享资源,都需要先拥有监视器。就像所有病人看病都需要先拥有进入门诊室的资格。
5.监视器至少有两个等待队列。一个是进入监视器的等待队列一个是条件变量对应的等待队列。后者可以有多个。就像一个病人进入门诊室诊断后,需要去验血,那么它需要去抽血室排队等待。另外一个病人心脏不舒服,需要去拍胸片,去拍摄室等待。
6.监视器要求的条件满足后,位于条件变量下等待的线程需要重新在门诊室门外排队,等待进入监视器。就像抽血的那位,抽完后,拿到了化验单,然后,重新回到门诊室等待,然后进入看病,然后退出,医生通知下一位进入。
总结起来就是,管程就是一个对象监视器。任何线程想要访问该资源,就要排队进入监控范围。进入之后,接受检查,不符合条件,则要继续等待,直到被通知,然后继续进入监视器。
管程的发展?
为什么Java在1.5之前仅仅提供了synchronized关键字及wait()、notify()、notifyAll()这三个看似从天而降的方法?
在刚接触Java的时候,我以为它会提供信号量这种编程原语,因为操作系统原理课程告诉我,用信号量能解决所有并发问题,结果我发现不是。
什么是管程?
管程如何管理?
MESA模型介绍?


代码再次说明MESA模型?
代码实现一个阻塞队列,阻塞队列有两个操作分别是入队和出队,这两个方法都是先获取互斥锁,类比管程模型中的入口。
1) 对于入队操作,如果队列已满,就需要等待直到队列不满,所以这里用了notFull.await();
2) 对于出队操作,如果队列为空,就需要等待直到队列不空,所以就用了notEmpty.await();
3) 如果入队成功,那么队列就不空了,就需要通知条件变量:队列不空notEmpty对应的等待队列;
4) 如果出队成功,那就队列就不满了,就需要通知条件变量:队列不满notFull对应的等待队列。
public class BlockedQueue<T> {final Lock lock = new ReentrantLock();// 条件变量:队列不满final Condition notFull = lock.newCondition();// 条件变量:队列不空final Condition notEmpty = lock.newCondition();// 入队void enq(T x) {lock.lock();try {while (队列已满)// 等待队列不满notFull.await();// 省略入队操作...// 入队后, 通知可出队notEmpty.signal();} finally {lock.unlock();}}// 出队void deq() {lock.lock();try {while (队列已空)// 等待队列不空notEmpty.await();// 省略出队操作...// 出队后,通知可入队notFull.signal();} finally {lock.unlock();}}}
示例中,用了Java并发包里面的Lock和Condition。
注意:await()和前面我们提到的wait()语义是一样的;signal()和前面我们提到的notify()语义是一样的。
wait() 的正确姿势?
对于MESA管程来说,有一个编程范式,就是需要在一个while循环里面调用wait()。这个是MESA管程特有的。
while(条件不满足) {wait();}
Hasen模型、Hoare模型和MESA模型的一个核心区别就是当条件满足后,如何通知相关线程。
notify()何时可以使用?
while (队列已满)// 等待队列不满notFull.await()
所有等待线程被唤醒后执行的操作也是相同的,都是下面这几行:
// 省略入队操作...// 入队后, 通知可出队notEmpty.signal();
同时也满足第 3 条,只需要唤醒一个线程。所以上面阻塞队列的代码,使用 signal() 是可以的。
Java内置的管程?

主要内容?
“并发编程中共享变量的一致性”。
CPU缓存导致的数据不一致、重排序
结合Happens-before规则,可以将一致性分为以下几个级别:


Happens-before 规则?
多线程操作共享变量可能出现的问题?
假设有两个线程(线程 1 和线程 2)分别执行下面的方法,x 是共享变量:
// 代码 1public class Example {int x = 0;public void count() {x++; //1System.out.println(x)//2}}
两个线程同时运行,线程的变量的值可能的结果:
| 线程1调用count | 线程2调用count |
|---|---|
| x++; | x++; |
结果:
| 结果1 | 结果2 | 结果3 |
|---|---|---|
| 1,1 | 2,1 | 1,2 |
CPU缓存-理解"1,1"的情况?

CPU缓存-理解"1,1"的情况?
1,1 的运行结果:

重排序问题?
// 代码 1public class Example {int x = 0;boolean flag = false;public void writer() {x = 1; //1flag = true; //2}public void reader() {if (flag) { //3int r1 = x; //4System.out.println(r1==x)}}}



重排序-问题解析?
int x = 1;//步骤1:加载x变量的内存地址到寄存器中,加载1到寄存器中,CPU通过mov指令把1写入到寄存器指定的内存中boolean flag = true; //步骤2 加载flag变量的内存地址到寄存器中,加载true到寄存器中,CPU通过mov指令把1写入到寄存器指定的内存中int y = x + 1;//步骤3 重新加载x变量的内存地址到寄存器中,加载1到寄存器中,CPU通过mov指令把1写入到寄存器指定的内存中
临界区?
管程?
synchronized (this){ // 此处自动加锁// x 是共享变量, 初始值 =10if (this.x < 12) {this.x = 12;}} // 此处自动解锁
什么是线程安全呢?
如何写出线程安全的程序呢?
所有的代码都需要分析否存在这三个问题吗?
什么叫数据竞争?
public class Test {private long count = 0;void add10K() {int idx = 0;while(idx++ < 10000) {count += 1;}}}
竞态条件(Race Condition)?
if (状态变量 满足 执行条件) {执行操作}
-- 当某个线程发现状态变量满足执行条件后,开始执行操作;可是就在这个线程执行操作的时候,其他线程同时修改了状态变量导致状态变量不满足执行条件了。当然很多场景下,这个条件不是显式的。
例子2-1:例子1加锁
-- 在访问数据的地方加个锁保护,如下:
-- 所有访问共享变量value的地方,都增加了互斥锁,此时是不存在数据
竞争的。但add10K()并不是线程安全的。
public class Test {private long count = 0;// synchronizedsynchronized long get(){return count;}// synchronizedsynchronized void set(long v){count = v;}void add10K() {int idx = 0;while(idx++ < 10000) {set(get()+1)}}}
问题:竞态条件
-- 假设count=0,A、B两个线程同时执行get()方法时,get()方法会返回相同的值0,A、B执行get()+1操作,结果都是1,之后A、B再将结果1写入了内存。期望是 2,而结果却是1。
-- A、B完全同时执行,结果是 1;A、B前后执行,结果是2。
class Account {private int balance;// 转账void transfer(Account target, int amt){if (this.balance > amt) { // 6this.balance -= amt; // A-150target.balance += amt;//B+150}}}
数据竞争、竞态条件如何保证线程安全?
活跃性问题概念?
死锁、活锁、饥饿?
性能问题?
串行对性能的影响是怎么样的呢?
假设串行百分比是 5%,用多核多线程相比单核单线程能提速多少呢?
怎么避免锁带来的性能问题呢?
方案层面:
性能方面的三个重要度量指标?
可见性概念
一个线程对共享变量的修改,另外一个线程能够立刻看到,我们称为可见性。
缓存导致的可见性问题?
验证多核场景下的可见性问题
public class Test {private static long count =0;private void add10K(){int idx=0;while (idx++<100000){count+=1;}}private static long calc(){final Test test =new Test();//创建两个线程,执行 add() 操作Thread thread1=new Thread(()->{test.add10K();});Thread thread2=new Thread(()->{test.add10K();});//启动两个线程thread1.start();thread2.start();//等待两个线程执行结束try {thread1.join();thread2.join();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(count);return count;}public static void main(String[] args) {calc();}}
时间片
原子性概念
线程切换

任务切换->线程切换发展
count+=1
1)CPU指令
2)执行顺序
指令 1:把变量 count 从内存加载到 CPU 的寄存器;
指令 2:在寄存器中执行 +1 操作;
指令 3:将结果写入内存(缓存机制导致可能写入的是 CPU 缓存而不是内存)。

有序性
例子
双重检查单例模式(DCL)
public class SingletonLazy {/**懒汉模式-双重检验锁-线程安全且并行效率高*/private static SingletonLazy instance;public static SingletonLazy getInstance() {// 先判断实例是否存在,若不存在再对类对象进行加锁处理if (instance == null) { //Single Checkedsynchronized (SingletonLazy.class) {if (instance == null) { //Double Checkedinstance = new SingletonLazy();}}}return instance;}}
DCL 问题:出在 new 操作上
DCL 问题:导致问题-空指针

概述
例子
class VolatileDemo {int x = 0;volatile boolean v = false;public void writer() {x = 42;v = true;}public void reader() {if (v == true) {// 这里 x 会是多少呢?}}}
Java5前的问题
概念
Happens-Before 六规则
程序前面对某个变量的修改一定是对后续操作可见的。
// “x = 42;” Happens-Before “v = true;”x = 42;v = true;
volatile 变量规则

管程中的锁在 Java 里是隐式实现的
synchronized (this) { // 此处自动加锁// x 是共享变量, 初始值 =10if (this.x < 12) {this.x = 12;}} // 此处自动解锁
结合规则 4——管程中锁的规则理解:假设 x 的初始值是 10,线程 A 执行完代码块后 x 的值会变成12(执行完自动释放锁),线程 B 进入代码块时,能够看到线程 A 对 x 的写操作,也就是线程 B 能够看到x==12。
指主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作。
Thread B = new Thread(() -> {// 主线程调用 B.start() 之前// 所有对共享变量的修改,此处皆可见// 此例中,var==77});// 此处对共享变量 var 修改var =77;// 主线程启动子线程B.start();
线程的join()规则
换句话说就是,如果在线程 A 中,调用线程 B 的 join() 并成功返回,那么线程 B 中的任意操作 Happens-Before 于该 join() 操作的返回。
Thread B = new Thread(()->{// 此处对共享变量 var 修改var = 66;});// 例如此处对共享变量修改,// 则这个修改结果对线程 B 可见// 主线程启动子线程B.start();B.join();// 子线程所有对共享变量的修改// 在主线程调用 B.join() 之后皆可见// 此例中,var==66
线程中断规则:
概念
逸出
final int x;// 错误的构造函数public FinalFieldExample() {x = 3;y = 4;// 此处就是讲 this 逸出,global.obj = this;}
原子性概念?
32位CPU上执行long型变量的写操作?

synchronized的使用?
class X {//1. 修饰非静态方法synchronized void foo() {// 临界区}//2. 修饰静态方法synchronized static void bar() {// 临界区}//3. 修饰代码块Object obj = new Object();void baz() {synchronized (obj) {// 临界区}}}
修饰方法的时候锁定的是什么呢?
Class X {//1. 修饰非静态方法synchronized(this) void foo() {// 临界区}//2. 修饰静态方法synchronized(X.class) static void bar() {// 临界区}}
用 synchronized 解决 count+=1 问题?
代码
class SafeCalc {long value = 0L;long get() {return value;}synchronized void addOne() {value += 1;}}
原子性:
addOne()方法,被synchronized修饰后,无论是单核CPU还是多核CPU,只有一个线程能够执行addOne()方法,所以一定能保证原子操作。
可见性-get():
-- 管程中锁的规则,是只保证后续对这个锁的加锁的可见性,而get()方法并没有加锁操作,可见性没法保证。
-- 加上syc关键字
synchronized long get() {return value;}
get()方法和addOne()方法都需要访问value这个受保护的资源,这个资源用this这把锁来保护。线程要进入临界区get()和addOne(),必须先获得this这把锁,这样get()和addOne()也是互斥的。
锁和受保护资源的关系?
加上static
class SafeCalc {static long value = 0L; // +staticlong get() {return value;}synchronized static void addOne() { // +staticvalue += 1;}}
两个锁保护一个资源。这个受保护的资源就是静态变量 value,两个锁分别是 this 和 SafeCalc.class。我们可以用下面这幅图来形象描述这个关系。由于临界区 get() 和 addOne() 是用两个锁保护的,因此这两个临界区没有互斥关系,临界区 addOne() 对 value 的修改对临界区 get() 也没有可见性保证,这就导致并发问题。
两种加锁对比?

问题类型?
class Account {// 锁:保护账户余额private final Object balLock = new Object();// 账户余额private Integer balance;// 锁:保护账户密码private final Object pwLock = new Object();// 账户密码private String password;// 取款void withdraw(Integer amt) {synchronized (balLock) {if (this.balance > amt) {this.balance -= amt;}}}// 查看余额Integer getBalance() {synchronized (balLock) {return balance;}}// 更改密码void updatePassword(String pw) {synchronized (pwLock) {this.password = pw;}}// 查看密码String getPassword() {synchronized (pwLock) {return password;}}}
一把互斥锁来保护多个资源的问题?
如何保护有关联关系的多个资源?
class Account {private int balance;// 转账void transfer(Account target, int amt) {synchronized (Account.class) {if (this.balance > amt) {this.balance -= amt;target.balance += amt;}}}}
死锁的一个比较专业的定义是:一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。
死锁的原因?
如何预防死锁?
循环等待问题?
核心代码:
// 一次性申请转出账户和转入账户,直到成功while(!actr.apply(this, target));
如果apply()操作耗时非常短,而且并发冲突量也不大时,循环上几次或者几十次就能一次性获取转出账户和转入账户了。但如果apply()操作耗时长,或者并发冲突量大的时候,可能要循环上万次才能获取到锁太消耗CPU,循环等待方案就不适用了。
循环等待问题的解决方案?
Java语言是如何支持等待-通知机制
类比于现实的等待 - 通知机制的就医流程?
1)患者先去挂号,然后到就诊门口分诊,等待叫号;
2)当叫到自己的号时,患者就可以找大夫就诊了;
3)就诊过程中,大夫可能会让患者去做检查,同时叫下一位患者;
4)当患者做完检查后,拿检测报告重新分诊,等待叫号;
5)当大夫再次叫到自己的号时,患者再去找大夫就诊。
如何用synchronized实现等待-通知机制?
如何用synchronized实现互斥锁?



使用wait()、notify()、notifyAll()方法?
一个更好地资源分配器:如何解决一次性申请转出账户和转入账户的问题?
while(条件不满足) {wait();}```* 利用这种范式可以解决上面提到的条件曾经满足过这个问题。因为当 wait() 返回时,有可能条件已经发生变化了,曾经条件满足,但是现在已经不满足了,所以要重新检验条件是否满足。范式,意味着是经典做法,所以没有特殊理由不要尝试换个写法。``` javaclass Allocator {private List<Object> als;// 一次性申请所有资源synchronized void apply(Object from, Object to){// 经典写法while(als.contains(from) ||als.contains(to)){try{wait();}catch(Exception e){}}als.add(from);als.add(to);}// 归还资源synchronized void free(Object from, Object to){als.remove(from);als.remove(to);notifyAll();}}
尽量使用 notifyAll()?
总结
| 通用 | Java状态 | 状态转换 | interrupt |
|---|
1. 通用的线程生命周期?
通用的线程生命周期基本上可以用下图这个“五态模型”来描述。这五态分别是:初始状态、可运行状态、运行状态、休眠状态和终止状态。

这“五态模型”的详细情况如下所示?
1)初始状态,指的是线程已经被创建,但是还不允许分配CPU执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建。
2)可运行状态,指的是线程可以分配CPU执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配CPU执行。
3)当有空闲的CPU时,操作系统会将其分配给一个处于可运行状态的线程,被分配到CPU的线程的状态就转换成了运行状态。
4)运行状态的线程如果调用一个阻塞的API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放CPU使用权,休眠状态的线程永远没有机会获得CPU使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
5)线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。
五种状态的定义?
这五种状态在不同编程语言里会有简化合并?
Java中线程的生命周期?

具体是哪些情形会导致线程从RUNNABLE状态转换到这三种状态呢?而这三种状态又是何时转换回RUNNABLE的呢?以及 NEW、TERMINATED 和 RUNNABLE 状态是如何转换的?
RUNNABLE与BLOCKED的状态转换?
RUNNABLE与WAITING的状态转换?
RUNNABLE与TIMED_WAITING的状态转换?
有五种场景会触发这种转换:
1)调用带超时参数的 Thread.sleep(long millis) 方法;
2)获得synchronized 隐式锁的线程,调用带超时参数的 Object.wait(long timeout) 方法;
3)调用带超时参数的 Thread.join(long millis) 方法;
4)调用带超时参数的 LockSupport.parkNanos(Object blocker, long deadline) 方法;
5)调用带超时参数的 LockSupport.parkUntil(long deadline) 方法。
NEW到RUNNABLE状态?
MyThread myThread = new MyThread();// 从 NEW 状态转换到 RUNNABLE 状态myThread.start();
从RUNNABLE到TERMINATED状态?
stop() 和 interrupt() 方法的主要区别是什么呢?
被interrupt的线程,是怎么收到通知的呢?
总结
思考题
下面代码的本意是当前线程被中断之后,退出while(true),你觉得这段代码是否正确呢?
Thread th = Thread.currentThread();while(true) {if(th.isInterrupted()) {break;}// 省略业务代码无数try {Thread.sleep(100);}catch (InterruptedException e){e.printStackTrace();}}
目的:注意InterruptedException 的处理方式。
try {Thread.sleep(100);}catch(InterruptedException e){// 重新设置中断标志位th.interrupt();}
1. 各种线程池的线程数量调整成多少是合适的?/Tomcat的线程数、Jdbc连接池的连接数是多少?等等。
2. 如何设置合适的线程数呢?
2. 分析多线程的应用场景有哪些?
如何度量性能?
分析为什么要使用多线程?
怎么降低延迟,提高吞吐量呢?
多线程的应用场景?
示例说明:如何利用多线程来提升 CPU 和 I/O 设备的利用率?


在单核时代,多线程主要就是用来平衡CPU和I/O设备的。如果程序只有CPU计算,而没有I/O 操作的话,多线程不但不会提升性能,还会使性能变得更差,原因是增加了线程切换的成本。但是在多核时代,这种纯计算型的程序也可以利用多线程来提升性能。为什么呢?

I/O 密集型计算、CPU 密集型计算?
创建多少线程合适?
讨论题:
有些同学对于最佳线程数的设置积累了一些经验值,认为对于 I/O密集型应用,最佳线程数应该为:2 * CPU 的核数 + 1,你觉得这个经验值合理吗?
11|Java线程(下):为什么局部变量是线程安全的?
Java 语言里,是不是所有变量都是共享变量呢?
Java 方法里面的局部变量是否存在并发问题呢?
下面我们就先结合一个例子剖析下这个问题。
比如,下面代码里的 fibonacci() 这个方法,会根据传入的参数 n ,返回 1 到 n 的斐波那契数
列,斐波那契数列类似这样: 1、1、2、3、5、8、13、21、34……第 1 项和第 2 项是 1,从第
3 项开始,每一项都等于前两项之和。在这个方法里面,有个局部变量:数组 r 用来保存数列的
结果,每次计算完一项,都会更新数组 r 对应位置中的值。你可以思考这样一个问题,当多个线
程调用 fibonacci() 这个方法的时候,数组 r 是否存在数据竞争(Data Race)呢?
Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
Java 语言本身提供的 synchronized 也是管程的一种实现,既然 Java 从语言层面已经实现了管程了,那为什么还要在SDK里提供另外一种实现呢?
对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
如果我们重新设计一把互斥锁去解决这个问题,那该怎么设计呢?
// 支持中断的 APIvoid lockInterruptibly()throws InterruptedException;// 支持超时的 APIboolean tryLock(long time, TimeUnit unit)throws InterruptedException;// 支持非阻塞获取锁的 APIboolean tryLock();
如何保证可见性?
class X {private final Lock rtl = new ReentrantLock();int value;public void addOne() {// 获取锁rtl.lock();try {value+=1;} finally {// 保证锁能释放rtl.unlock();}}}
答案必须是肯定的。Java SDK里面锁原理简述:利用了 volatile 相关的 Happens-Before 规则。Java SDK 里面的ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state的值(简化后的代码如下面所示)。也就是说,在执行 value+=1
之前,程序先读写了一次 volatile 变量 state,在执行 value+=1 之后,又读写了一次 volatile变量 state。根据相关的 Happens-Before 规则:
1)顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
2)volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作Happens-Before 线程 T2 的 lock() 操作;
3)传递性规则:线程 T2 的 lock() 操作 Happens-Before 线程 T1 的 value+=1 。
class SampleLock {volatile int state;// 加锁lock() {// 省略代码无数state = 1;}// 解锁unlock() {// 省略代码无数state = 0;}}
所以说,后续线程 T2 能够看到 value 的正确结果
什么是可重入锁?
class X {private final Lock rtl = new ReentrantLock();int value;public int get() {// 获取锁rtl.lock(); ②try {return value;} finally {// 保证锁能释放rtl.unlock();}}public void addOne() {// 获取锁rtl.lock();try {value = 1 + get(); ①} finally {// 保证锁能释放rtl.unlock();}}}
可重入函数?
公平锁与非公平锁?
// 无参构造函数:默认非公平锁public ReentrantLock() {sync = new NonfairSync();}// 根据公平策略参数创建锁public ReentrantLock(boolean fair){sync = fair ? new FairSync() : new NonfairSync();}
在入口等待队列,锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。
-- 如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;
-- 如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。
这三条规则,前两条估计你一定会认同,最后一条你可能会觉得过于严苛。但是我还是倾向于你去遵守,因为调用其他对象的方法,实在是太不安全了,也许“其他”方法里面有线程sleep()的调用,也可能会有奇慢无比的 I/O 操作,这些都会严重影响性能。更可怕的是,“其他”类的方法可能也会加锁,然后双重加锁就可能导致死锁。
并发问题,本来就难以诊断,所以你一定要让你的代码尽量安全,尽量简单,哪怕有一点可能会出问题,都要努力避免。
总结
Java SDK 并发包里的 Lock 接口里面的每个方法,你可以感受到,都是经过深思熟虑的。除了支持类似 synchronized 隐式加锁的 lock() 方法外,还支持超时、非阻塞、可中断的方式获取锁,这三种方式为我们编写更加安全、健壮的并发程序提供了很大的便利。希望你以后在使用锁的时候,一定要仔细斟酌。
除了并发大师 Doug Lea推荐的三个最佳实践外,你也可以参考一些诸如:减少锁的持有时间、减小锁的粒度等业界广为人知的规则,其实本质上它们都是相通的,不过是在该加锁的地方加锁而已。
课后思考?
你已经知道 tryLock()支持非阻塞方式获取锁,下面这段关于转账的程序就使用到了 tryLock(),是否存在死锁问题呢?
class Account {private int balance;private final Lock lock = new ReentrantLock();// 转账void transfer(Account tar, int amt){while (true) {if(this.lock.tryLock()) {try {if (tar.lock.tryLock()) {try {this.balance -= amt;tar.balance += amt;} finally {tar.lock.unlock();}}//if} finally {this.lock.unlock();}}//if}//while}//transfer}
修复后的代码如下所示,我仅仅修改了两个地方,一处是转账成功之后break,另一处是在while循环体结束前增加了Thread.sleep(随机时间)。
class Account {private int balance;private final Lock lock = new ReentrantLock();// 转账void transfer(Account tar, int amt) {while (true) {if (this.lock.tryLock()) {try {if (tar.lock.tryLock()) {try {this.balance -= amt;tar.balance += amt;//新增:退出循环break;} finally {tar.lock.unlock();}}//if} finally {this.lock.unlock();}}//if//新增:sleep⼀个随机时间避免活锁Thread.sleep(随机时间);}//while}//transfer}
Java SDK 并发包里的Lock有别于synchronized隐式锁的三个特性:能够响应中断、支持超时和非阻塞地获取锁。
Java SDK 并发包里的 Condition
如何利用两个条件变量快速实现阻塞队列呢?
一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队)。相
关的代码,重新列出。
public class BlockedQueue<T>{final Lock lock = new ReentrantLock();// 条件变量:队列不满final Condition notFull = lock.newCondition();// 条件变量:队列不空final Condition notEmpty = lock.newCondition();// 入队void enq(T x) {lock.lock();try {while (队列已满){// 等待队列不满notFull.await();}// 省略入队操作...// 入队后, 通知可出队notEmpty.signal();}finally {lock.unlock();}}// 出队void deq(){lock.lock();try {while (队列已空){// 等待队列不空notEmpty.await();}// 省略出队操作...// 出队后,通知可入队notFull.signal();}finally {lock.unlock();}}}
同步和异步?
// 计算圆周率小说点后 100 万位String pai1M() {// 省略代码无数}pai1M()printf("hello world")
异步实现的两种方式?
在项目Dubbo中,Lock和Condition是怎么用的?
RPC框架Dubbo内部做了异步转同步,如何实现?分析一下相关源码。
对于下面一个简单的RPC调用,默认情况下sayHello()方法,是个同步方法,也就是说,执行service.sayHello(“dubbo”)的时候,线程会停下来等结果。
DemoService service = 初始化部分省略String message =service.sayHello("dubbo");System.out.println(message);
如果此时你将调用线程dump出来的话,会是下图这个样子,你会发现调用线程阻塞了,线程状态是TIMED_WAITING。本来发送请求是异步的,但是调用线程却阻塞了,说明Dubbo帮我们做了异步转同步的事情。通过调用栈,你能看到线程是阻塞在DefaultFuture.get()方法上,所以可以推断:Dubbo 异步转同步的功能应该是通过 DefaultFuture这个类实现的。

不过为了理清前后关系,还是有必要分析一下调用DefaultFuture.get() 之前发生了什么。DubboInvoker 的108行调用了DefaultFuture.get(),这一行很关键,我稍微修改了一下列在了下面。这一行先调用了 request(inv, timeout)方法,这个方法其实就是发送 RPC 请求,之后通过调用 get() 方法等待 RPC 返回结果。
public class DubboInvoker{Result doInvoke(Invocation inv){// 下面这行就是源码中 108 行// 为了便于展示,做了修改return currentClient.request(inv, timeout).get();}}
DefaultFuture 这个类是很关键,我把相关的代码精简之后,列到了下面。不过在看代码之前,你还是有必要重复一下我们的需求:当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。不知道你有没有似曾相识的感觉,这不就是经典的等待-通知机制吗?这个时候想必你的脑海里应该能够浮现出管程的解决方案了。有了自己的方案之后,我们再来看看 Dubbo 是怎么实现的。
// 创建锁与条件变量private final Lock lock= new ReentrantLock();private final Condition done= lock.newCondition();// 调用方通过该方法等待结果Object get(int timeout){long start = System.nanoTime();lock.lock();try {while (!isDone()) {done.await(timeout);long cur=System.nanoTime();if (isDone() ||cur-start > timeout){break;}}} finally {lock.unlock();}if (!isDone()) {throw new TimeoutException();}return returnFromResponse();}// RPC 结果是否已经返回boolean isDone() {return response != null;}// RPC 结果返回时调用该方法private void doReceived(Response res) {lock.lock();try {response = res;if (done != null) {done.signal();}} finally {lock.unlock();}}
调用线程通过调用 get() 方法等待 RPC返回结果,这个方法里面,你看到的都是熟悉的“面孔”:调用 lock() 获取锁,在 finally 里面调用unlock()释放锁;获取锁后,通过经典的在循环中调用 await() 方法来实现等待。当 RPC 结果返回时,会调用 doReceived() 方法,这个方法里面,调用 lock() 获取锁,在finally 里面调用 unlock() 释放锁,获取锁后通过调用 signal()来通知调用线程,结果已经返回,不用继续等待了。
课后思考:efaultFuture 里面唤醒等待的线程,用的是 signal(),而不是 signalAll(),你来分析一下,这样做是否合理呢?
private void doReceived(Response res) {lock.lock();try {response = res;done.signalAll();} finally {lock.unlock();}}
Semaphore简介?
信号量模型?

信号量模型的代码化?
class Semaphore{// 计数器int count;// 等待队列Queue queue;// 初始化操作Semaphore(int c){this.count=c;}//void down(){this.count--;if(this.count<0){// 将当前线程插入等待队列// 阻塞当前线程}}void up(){this.count++;if(this.count<=0) {// 移除等待队列中的某个线程 T// 唤醒线程 T}}}
PV原语?
如何使用信号量?
累加器中的互斥?
如下,acquire() 就是信号量里的down()操作,release() 就是信号量里的 up() 操作。
static int count;// 初始化信号量static final Semaphore s = new Semaphore(1);// 用信号量保证互斥static void addOne() {s.acquire();try {count+=1;} finally {s.release();}}
信号量的计数器,设置成了1,这个1表示只允许一个线程进入临界区。
分析信号量是如何保证互斥的?
既然Java SDK提供了Lock,为啥还要提供一个Semaphore?
限流器的应用?
快速实现一个限流器?
对象池的示例代码:
class ObjPool<T, R> {final List<T> pool;// 用信号量实现限流器final Semaphore sem;// 构造函数ObjPool(int size, T t){pool = new Vector<T>(){};for(int i=0; i<size; i++){pool.add(t);}sem = new Semaphore(size);}// 利用对象池的对象,调用 funcR exec(Function<T,R> func) {T t = null;sem.acquire();try {t = pool.remove(0);return func.apply(t);} finally {pool.add(t);sem.release();}}}// 创建对象池ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);// 通过对象池获取 t,之后执行pool.exec(t -> {System.out.println(t);return t.toString();});
用一个 List来保存对象实例,用 Semaphore 实现限流器。关键的代码是 ObjPool 里面的
exec() 方法,这个方法里面实现了限流的功能。在这个方法里面,首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在 acquire()方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0)实现的),分配完之后会执行一个回调函数func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释
放工作是通过 pool.add(t)实现的),同时调用release()方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。
思考题:在上面对象池的例子中,对象保存在了Vector中,Vector是Java提供的线程安全的容器,如果我们把Vector换成ArrayList,是否可以呢?
总结
Java SDK并发包里为什么还有很多其他的工具类呢?
并发场景:读多写少场景 的分析?
那什么是读写锁呢?
读写锁与互斥锁的重要区别?
如何快速实现一个缓存?
用 ReadWriteLock快速实现一个通用的缓存工具类:
class Cache<K,V> {final Map<K, V> m = new HashMap<>();final ReadWriteLock rwl = new ReentrantReadWriteLock();// 读锁final Lock r = rwl.readLock();// 写锁final Lock w = rwl.writeLock();// 读缓存V get(K key) {r.lock();try { return m.get(key); }finally { r.unlock(); }}// 写缓存V put(String key, Data v) {w.lock();try { return m.put(key, v); }finally { w.unlock(); }}}
处理缓存数据的初始化问题?


实现缓存的按需加载?
//按需加载的功能class Cache<K,V> {final Map<K, V> m = new HashMap<>();final ReadWriteLock rwl = new ReentrantReadWriteLock();final Lock r = rwl.readLock();final Lock w = rwl.writeLock();V get(K key) {V v = null;// 读缓存r.lock(); //①try {v = m.get(key); //②} finally{r.unlock(); //③}// 缓存中存在,返回if(v != null) { //④return v;}// 缓存中不存在,查询数据库w.lock(); //⑤try {// 再次验证// 其他线程可能已经查询过数据库v = m.get(key); //⑥if(v == null){ //⑦// 查询数据库v= 省略代码⽆数m.put(key, v);}} finally{w.unlock();}return v;}}
为什么我们要再次验证呢?
上述缓存方案的问题?
读写锁的升级与降级?
上面按需加载的示例代码中,在①处获取读锁,在③处释放读锁,那是否可以在②处的下面增加验证缓存并更新缓存的逻辑呢?
// 读缓存r.lock(); ①try {v = m.get(key); ②if (v == null) {w.lock();try {// 再次验证并更新缓存// 省略详细代码} finally{w.unlock();}}} finally{r.unlock(); ③}
锁降级?
class CachedData {Object data;volatile boolean cacheValid;final ReadWriteLock rwl = new ReentrantReadWriteLock();// 读锁final Lock r = rwl.readLock();// 写锁final Lock w = rwl.writeLock();void processCachedData() {// 获取读锁r.lock();if (!cacheValid) {// 释放读锁,因为不允许读锁的升级r.unlock();// 获取写锁w.lock();try {// 再次检查状态if (!cacheValid) {data = ...cacheValid = true;}// 释放写锁前,降级为读锁// 降级是可以的r.lock(); // ①} finally {// 释放写锁w.unlock();}}// 此处仍然持有读锁try {use(data);}finally {r.unlock();}}}
课后思考
有同学反映线上系统停止响应了,CPU 利用率很低,你怀疑有同学一不小心写出了读锁升级写锁的方案,那你该如何验证自己的怀疑呢?
总结
StampedLock 和ReadWriteLock 有哪些区别?
final StampedLock sl =new StampedLock();// 获取 / 释放悲观读锁⽰意代码long stamp = sl.readLock();try {// 省略业务相关代码} finally {sl.unlockRead(stamp);}// 获取 / 释放写锁⽰意代码long stamp = sl.writeLock();try {// 省略业务相关代码} finally {sl.unlockWrite(stamp);}
StampedLock 的性能优于ReadWriteLock的原因?
乐观读悲观读的使用示例?
distanceFromOrigin()方法中,首先通过调用tryOptimisticRead()获取了一个 stamp,这里的tryOptimisticRead()就是乐观读。之后将共享变量x和y读入方法的局部变量中,不过需要注意的是,由于tryOptimisticRead()是无锁的,所以共享变量x和y读入方法局部变量时,x和y有可能被其他线程修改了。因此最后读完之后,还需要再次验证一下是否存在写操作,验证操作通过调用validate(stamp)来实现的。
class Point {private int x, y;final StampedLock sl = new StampedLock();// 计算到原点的距离int distanceFromOrigin() {// 乐观读long stamp = sl.tryOptimisticRead();// 读⼊局部变量,读的过程数据可能被修改int curX = x, curY = y;// 判断执⾏读操作期间,是否存在写操作,如果存在,则sl.validate返回falseif (!sl.validate(stamp)){// 升级为悲观读锁stamp = sl.readLock();try {curX = x;curY = y;} finally {// 释放悲观读锁sl.unlockRead(stamp);}}return Math.sqrt(curX * curX + curY * curY);}}
代码中,如果执行乐观读操作的期间,存在写操作,会把乐观读升级为悲观读锁。
进一步理解乐观读--介绍一下数据库里的乐观锁?
select id,... ,versionfrom product_docwhere id=777
-- 用户在生产订单UI执行保存操作的时候,后台利用下面的SQL语句更新生产订单,此处我们假设该条生产订单的version=9。
update product_docset version=version+1,...where id=777 and version=9
-- 如果这条 SQL 语句执行成功并且返回的条数等于1,那么说明从生产订单 UI 执行查询操作到执行保存操作期间,没有其他人修改过这条数据。因为如果这期间其他人修改过这条数据,那么版本号字段一定会大于 9。
进一步理解乐观读--StampedLock的乐观读?
StampedLock 使用注意事项?
final StampedLock lock = new StampedLock();Thread T1 = new Thread(() -> {// 获取写锁lock.writeLock();// 永远阻塞在此处,不释放写锁LockSupport.park();});T1.start();// 保证 T1 获取写锁Thread.sleep(100);Thread T2 = new Thread(() ->// 阻塞在悲观读锁lock.readLock());T2.start();// 保证 T2 阻塞在读锁Thread.sleep(100);// 中断线程 T2// 会导致线程 T2 所在 CPU 飙升T2.interrupt();T2.join();
工程使用?
StampedLock读模板:
final StampedLock sl = new StampedLock();// 乐观读long stamp = sl.tryOptimisticRead();// 读⼊⽅法局部变量......// 校验 stampif (!sl.validate(stamp)){// 升级为悲观读锁stamp = sl.readLock();try {// 读⼊⽅法局部变量.....} finally {// 释放悲观读锁sl.unlockRead(stamp);}}// 使⽤⽅法局部变量执⾏业务操作......
StampedLock 写模板:
long stamp = sl.writeLock();try {// 写共享变量......} finally {sl.unlockWrite(stamp);}
锁升级降级?
StampedLock 支持锁的降级(通过 tryConvertToReadLock() 方法实现)和升级(通过tryConvertToWriteLock()方法实现),但是建议你要慎重使用。下面的代码也源自Java的官方示例,仅仅做了一点修改,但隐藏了一个 Bug。
private double x, y;final StampedLock sl = new StampedLock();// 存在问题的⽅法void moveIfAtOrigin(double newX, double newY){long stamp = sl.readLock();try {while(x == 0.0 && y == 0.0){long ws = sl.tryConvertToWriteLock(stamp);if (ws != 0L) {x = newX;y = newY;break;} else {sl.unlockRead(stamp);stamp = sl.writeLock();}}} finally {sl.unlock(stamp);}}
Bug出在没有正确地释放锁。
private double x, y;final StampedLock sl = new StampedLock();// 存在问题的⽅法void moveIfAtOrigin(double newX, double newY){long stamp = sl.readLock();try {while(x == 0.0 && y == 0.0){long ws = sl.tryConvertToWriteLock(stamp);if (ws != 0L) {stamp = ws; //① 问题在于没对stamp重新赋值,增加赋值x = newX;y = newY;break;} else {sl.unlockRead(stamp);stamp = sl.writeLock();}}} finally {sl.unlock(stamp);s1.unlock(stamp);//②此处unlock的是stamp}}
对账系统问题?

while(存在未对账订单){// 查询未对账订单pos = getPOrders();// 查询派送单dos = getDOrders();// 执⾏对账操作diff = check(pos, dos);// 差异写⼊差异库save(diff);}
优化一:利用并行优化对账系统?


while(存在未对账订单){// 查询未对账订单Thread T1 = new Thread(()->{pos = getPOrders();});T1.start();// 查询派送单Thread T2 = new Thread(()->{dos = getDOrders();});T2.start();// 等待 T1、T2 结束T1.join();T2.join();// 执⾏对账操作diff = check(pos, dos);// 差异写⼊差异库save(diff);}
优化二:用 CountDownLatch实现线程等待?
// 创建 2 个线程的线程池Executor executor = Executors.newFixedThreadPool(2);while(存在未对账订单){// 查询未对账订单executor.execute(()-> {pos = getPOrders();});// 查询派送单executor.execute(()-> {dos = getDOrders();});/* ??如何实现等待??*/// 执⾏对账操作diff = check(pos, dos);// 差异写⼊差异库save(diff);}
优化二:解决通知的问题?
// 创建 2 个线程的线程池Executor executor = Executors.newFixedThreadPool(2);while(存在未对账订单){// 计数器初始化为 2CountDownLatch latch = new CountDownLatch(2);// 查询未对账订单executor.execute(()-> {pos = getPOrders();latch.countDown();});// 查询派送单executor.execute(()-> {dos = getDOrders();latch.countDown();});// 等待两个查询操作结束latch.await();// 执⾏对账操作diff = check(pos, dos);// 差异写⼊差异库save(diff);}
优化三:生产者-消费者+队列?


优化三:如何用双队列来实现完全的并行?

// 订单队列Vector<P> pos;// 派送单队列Vector<D> dos;// 执⾏回调的线程池Executor executor = Executors.newFixedThreadPool(1);final CyclicBarrier barrier = new CyclicBarrier(2, ()->{executor.execute(()->check());});void check(){P p = pos.remove(0);D d = dos.remove(0);// 执⾏对账操作diff = check(p, d);// 差异写⼊差异库save(diff);}void checkAll(){// 循环查询订单库Thread T1 = new Thread(()->{while(存在未对账订单){// 查询订单库pos.add(getPOrders());// 等待barrier.await();}}T1.start();// 循环查询运单库Thread T2 = new Thread(()->{while(存在未对账订单){// 查询运单库dos.add(getDOrders());// 等待barrier.await();}}T2.start();}
同步容器及其注意事项
如何将非线程安全的容器变成线程安全的容器?
ArrayList变成线程安全的?
SafeArrayList<T>{// 封装ArrayListList<T> c=new ArrayList<>();// 控制访问路径synchronized T get(int idx){return c.get(idx);}synchronized void add(int idx, T t) {c.add(idx, t);}synchronized boolean addIfNotExist(T t){if(!c.contains(t)) {c.add(t);return true;}return false;}}
List list = Collections.synchronizedList(new ArrayList());Set set = Collections.synchronizedSet(new HashSet());Map map = Collections.synchronizedMap(new HashMap());
List list = Collections.synchronizedList(new ArrayList());Iterator i = list.iterator();while (i.hasNext())foo(i.next());
Listlist = Collections.synchronizedList(new ArrayList());synchronized(list){Iterator i = list.iterator();while(i.hasNext())foo(i.next());}
这些经过包装后线程安全容器,都是基于synchronized这个同步关键字实现的,所以也被称为同步容器。Java提供的同步容器还有Vector、Stack和Hashtable,这三个容器不是基于包装类实现的,但同样是基于 synchronized实现的,对这三个容器的遍历,同样要加锁保证互斥。
并发容器:
Java1.5及之后版本提供了性能更高的容器,我们一般称为并发容器。
并发容器?
并发容器虽然数量非常多,但依然是前面我们提到的四大类:List、Map、Set和Queue,下面的并发容器关系图,基本上把我们经常用的容器都覆盖到了。

(一)List?
(二)Map
。。。。
JUC提供的原子类?

类别一:原子化的基本数据类型?
getAndIncrement() // 原子化 i++getAndDecrement() // 原子化 i--incrementAndGet() // 原子化 ++idecrementAndGet() // 原子化 --i//当前值+=delta,返回+=前的值getAndAdd(delta)//当前值+=delta,返回+=后的值addAndGet(delta)//CAS操作,返回是否成功compareAndSet(expect,update)// 以下四个方法// 新值可以通过传入func函数来计算getAndUpdate(func)updateAndGet(func)getAndAccumulate(x,func)accumulateAndGet(x,func)
类别二: 原子化的对象引用类型?
AtomicStampedReference 实现的 CAS 方法就增加了版本号参数,方法签名如下:
boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp)
AtomicMarkableReference 的实现机制则更简单,将版本号简化成了一个 Boolean 值,方法签名如下
boolean compareAndSet(V expectedReference,V newReference,boolean expectedMark,boolean newMark)
分类三:原子化数组?
分类四:原子化对象属性更新器?
相关实现有 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater,利用它们可以原子化地更新对象的属性,这三个方法都是利用反射机制实现的,创建更新器的方法如下:
public static <U> AtomicXXXFieldUpdater<U> newUpdater(Class<U> tclass,String fieldName)
注意,对象属性必须是volatile类型的,只有这样才能保证可见性;如果对象属性不是volatile 类型的,newUpdater()方法会抛出IllegalArgumentException这个运行时异常。
boolean compareAndSet(T obj,int expect,int update)
分类五:原子化的累加器?
累加器的例子?
public class Test {long count = 0;void add10K() {int idx = 0;while(idx++ < 10000){count += 1;}}}
原子性问题的其他解法?
利用原子类解决累加器问题?
如下,两处简单的改动就能使add10K()方法变成线程安全的。
public class Test {// ① long型变量count替换为了原子类AtomicLongAtomicLong count = new AtomicLong(0);void add10K() {int idx = 0;while(idx++ < 10000) {// ② count+=1替换成了count.getAndIncrement()count.getAndIncrement();}}}
无锁方案的实现原理?
通过CAS指令的模拟代码来理解CAS的工作原理?
class SimulatedCAS{int count;synchronized int cas(int expect, int newValue){// 读目前 count 的值int curValue = count;// 比较目前 count 值是否 == 期望值if(curValue == expect){// 如果是,则更新 count 的值count = newValue;}// 返回写⼊前的值return curValue;}}
累加器中的cas语义?
实现线程安全的累加器-CAS+自旋方案?
class SimulatedCAS{volatile int count;// 实现count+=1addOne() {do {newValue = count+1; //①}while(count != cas(count,newValue) //②}// 模拟实现CAS,仅用来帮助理解synchronized int cas(int expect, int newValue){//读目前count的值int curValue = count;//比较目前count值是否==期望值if(curValue == expect){//如果是,则更新count的值count = newValue;}// 返回写当前的值return curValue;}}
CAS无锁方案的优缺点?
什么情况下关注ABA问题?
Java 如何实现原子化的count += 1?
Java 是如何使用 CAS 来实现原子化的count += 1的?
-- 在Java1.8中,getAndIncrement() 方法会转调 unsafe.getAndAddLong()方法;
-- 这里 this 和valueOffset 两个参数可以唯一确定共享变量的内存地址。
final long getAndIncrement() {return unsafe.getAndAddLong(this, valueOffset, 1L);}
unsafe.getAndAddLong()方法的源码如下
-- 该方法首先会在内存中读取共享变量的值,之后循环调用 compareAndSwapLong() 方法来尝试设置共享变量的值,直到成功为止。
-- compareAndSwapLong() 是一个 native 方法,只有当内存中共享变量的值等于 expected 时,才会将共享变量的值更新为 x,并且返回 true;否则返回fasle。
-- compareAndSwapLong 的语义和CAS 指令的语义的差别仅仅是返回值不同而已。
public final long getAndAddLong(Object o, long offset, long delta){long v;do {// 读取内存中的值v = getLongVolatile(o, offset);} while (!compareAndSwapLong(o, offset, v, v + delta));return v;}// 原子性地将变量更新为x的条件是内存中的值等于 expected// 更新成功则返回 truenative boolean compareAndSwapLong(Object o, long offset,long expected,long x);
CAS应用的抽象代码逻辑?
do {// 获取当前值oldV = xxxx;// 根据当前值计算新值newV = ...oldV...}while(!compareAndSet(oldV,newV);
思考题:隐藏的while(true)问题?
下面的示例代码是合理库存的原子化实现,仅实现了设置库存上限setUpper()方法,你觉得setUpper()方法的实现是否正确呢?
public class SafeWM {class WMRange{final int upper;final int lower;WMRange(int upper,int lower){// 省略构造函数实现}}final AtomicReference<WMRange>rf = new AtomicReference<>(new WMRange(0,0));// 设置库存上限void setUpper(int v){WMRange nr;WMRange or = rf.get();do{// 检查参数合法性if(v < or.lower){throw new IllegalArgumentException();}nr = newWMRange(v, or.lower);}while(!rf.compareAndSet(or, nr));}}
解题:
看上去 while(!rf.compareAndSet(or, nr))是有终止条件的,而且跑单线程测试一直都没有问题。实际上却存在严重的并发问题,问题就出在对or的赋值在while循环之外,这样每次循环or的值都不会发生变化,所以一旦有一次循环rf.compareAndSet(or,nr)的值等于false,那之后无论循环多少次,都会等于false。也就是说在特定场景下,变成了while(true)问题。既然找到了原因,修改就很简单了,只要把对or的赋值移到while循环之内就可以了,修改后的代码如下所示:
public class SafeWM {class WMRange{final int upper;final int lower;WMRange(int upper,int lower){//省略构造函数实现}}final AtomicReference<WMRange> rf = new AtomicReference<>(new WMRange(0,0));// 设置库存上限void setUpper(int v){WMRange nr;WMRange or;//原代码在这⾥//WMRange or=rf.get();do{//移动到此处//每个回合都需要重新获取旧值or = rf.get();// 检查参数合法性if(v < or.lower){throw new IllegalArgumentException();}nr = new WMRange(v, or.lower);}while(!rf.compareAndSet(or, nr));}}
为什么使用线程池?
一般意义上的池化资源?
class XXXPool{// 获取池化资源XXX acquire(){}// 释放池化资源void release(XXX x){}}
为什么线程池没有采用一般意义上池化资源的设计方法呢?
如果线程池采用一般意义上池化资源的设计方法,应该是下面示例代码这样。你可以来思考一下,假设我们获取到一个空闲线程T1,然后该如何使用T1呢?你期望的可能是这样:通过调用T1的execute()方法,传入一个Runnable对象来执行具体业务逻辑,就像通过构造函数Thread(Runnable target)创建线程一样。可惜的是,你翻遍Thread对象的所有方法,都不存在类似execute(Runnable target)这样的公共方法。
//采一般般意义上池化资源的设计方法class ThreadPool{// 获取空闲线程Thread acquire() {}// 释放线程void release(Thread t){}}//期望的使用ThreadPool pool;Thread T1=pool.acquire();//传入Runnable对象T1.execute(()->{//具体业务逻辑......});
所以,线程池的设计,没有办法直接采用一般意义上池化资源的设计方法。
线程池如何设计呢?
通过创建简单的线程池MyThreadPool,理解线程池的工作原理?
//简化的线程池,仅用来说明工作原理class MyThreadPool{//利用阻塞队列实现生产者-消费者模式BlockingQueue<Runnable> workQueue;//保存内部工作线程List<WorkerThread> threads = new ArrayList<>();// 构造方法MyThreadPool(int poolSize,BlockingQueue<Runnable> workQueue){this.workQueue = workQueue;// 创建工作线程for(int idx=0; idx<poolSize; idx++){WorkerThread work = new WorkerThread();work.start();threads.add(work);}}// 提交任务void execute(Runnable command){workQueue.put(command);}// 工作线程负责消费任务,并执行任务class WorkerThread extends Thread{public void run() {//循环取任务并执行while(true){ ①Runnable task = workQueue.take();task.run();}}}}/** 下面是使用示例 **/// 创建有界阻塞队列BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);// 创建线程池MyThreadPool pool = new MyThreadPool(10, workQueue);// 提交任务pool.execute(()->{System.out.println("hello");});
如何使用Java中的线程池?
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
线程池的参数介绍?
使用线程池要注意些什么?
try {//业务逻辑} catch (RuntimeException x) {//按需处理} catch (Throwable x) {//按需处理}
课后思考:
使用线程池,默认情况下创建的线程名字都类似pool-1-thread-2这样,没有业务含义。而很多情况下为了便于诊断问题,都需要给线程赋予一个有意义的名字,那你知道有哪些办法可以给线程池里的线程指定名字吗?
问题:
在上一篇文章介绍了如何创建正确的线程池,那创建完线程池,我们该如何使用呢?
ThreadPoolExecutor提交和获取任务结果?
// 提交Runnable任务Future<?> submit(Runnable task);// 提交Callable任务<T> Future<T> submit(Callable<T> task);// 提交Runnable任务及结果引⽤<T> Future<T> submit(Runnable task, T result);
Future接口的5个方法?
// 取消任务boolean cancel(boolean mayInterruptIfRunning);// 判断任务是否已取消boolean isCancelled();// 判断任务是否已结束boolean isDone();// 获得任务执行结果get();// 获得任务执行结果,支持超时get(long timeout,TimeUnit unit);
3个submit()方法之间的区别在于方法参数不同,分别介绍?
方法③submit(Runnable task, T result)的用法?
ExecutorService executor = Executors.newFixedThreadPool(1);// 创建Result对象rResult r = new Result();r.setAAA(a);// 提交任务Future<Result> future = executor.submit(new Task(r), r);Result fr = future.get();// 下面等式成立fr === r;fr.getAAA() === a;fr.getXXX() === xclass Task implements Runnable{Result r;//通过构造函数传入resultTask(Result r){this.r = r;}void run() {//可以操作resulta = r.getAAA();r.setXXX(x);}}
介绍FutureTask工具类?
FutureTask(Callable<V> callable);FutureTask(Runnable runnable, V result);
如何使用FutureTask?
示例代码①: 将FutureTask对象提交给ThreadPoolExecutor去执行。
// 创建FutureTaskFutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);// 创建线程池ExecutorService es = Executors.newCachedThreadPool();// 提交FutureTaskes.submit(futureTask);// 获取计算结果Integer result = futureTask.get();
示例代码②:FutureTask对象直接被Thread执行的示例代码如下所示。
可以看出:利用FutureTask对象可以很容易获取子线程的执行结果。
// 创建FutureTaskFutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);// 创建并启动线程Thread T1 = new Thread(futureTask);T1.start();// 获取计算结果Integer result = futureTask.get();
实现最优的“烧水泡茶”程序?


// 创建任务T2的FutureTaskFutureTask<String> ft2 = new FutureTask<>(new T2Task());// 创建任务T1的FutureTaskFutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));// 线程T1执行任务ft1Thread T1 = new Thread(ft1);T1.start();// 线程T2执行任务ft2Thread T2 = new Thread(ft2);T2.start();// 等待线程T1执行结果System.out.println(ft1.get());// T1Task需要执行的任务:// 洗水壶、烧开水、泡茶class T1Task implements Callable<String>{FutureTask<String> ft2;// T1任务需要T2任务的FutureTaskT1Task(FutureTask<String> ft2){this.ft2 = ft2;}@OverrideString call() throws Exception {System.out.println("T1:洗水壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T1:烧开水...");TimeUnit.SECONDS.sleep(15);String tf = ft2.get(); // ★ 获取T2线程的茶叶System.out.println("T1:拿到茶叶:"+tf);System.out.println("T1:泡茶...");return "上茶:" + tf;}}// T2Task需要执行的任务:// 洗茶壶、洗茶杯、拿茶叶class T2Task implements Callable<String>{@OverrideString call() throws Exception{System.out.println("T2:洗茶壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T2:洗茶杯...");TimeUnit.SECONDS.sleep(2);System.out.println("T2:拿茶叶...");TimeUnit.SECONDS.sleep(1);return "龙井";}}// 一次执行结果:T1:洗水壶...T2:洗茶壶...T1:烧开水...T2:洗茶杯...T2:拿茶叶...T1:拿到茶叶:龙井T1:泡茶...上茶:龙井
总结
课后思考
不久前听说小明要做一个询价应用,这个应用需要从三个电商询价,然后保存在自己的数据库里。核心示例代码如下所示,由于是串行的,所以性能很慢,你来试着优化一下吧。
// 向电商S1询价,并保存r1 = getPriceByS1();save(r1);// 向电商S2询价,并保存r2 = getPriceByS2();// 向电商S3询价,并保存r3 = getPriceByS3();
异步代码?
例子:
如下代码,是串行的,为了提升性能,需要并行化,那具体实施起来该怎么做呢?
//以下两个方法都是耗时操作doBizA();doBizB()
实现:
如下代码,创建两个子线程去执行。如下并行方案,主线程无需等待doBizA()和doBizB()的执行结果,即doBizA()和doBizB()两个操作已经被异步化。
new Thread(()->doBizA()).start();new Thread(()->doBizB()).start();
异步化?
异步化,是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。看到这里,相信你应该就能理解异步编程最近几年为什么会大火了,因为优化性能是互联网大厂的一个核心需求啊。Java1.8提供了CompletableFuture来支持异步编程,CompletableFuture有可能是你见过的最复杂的工具类了,不过功能也着实让人感到震撼。
CompletableFuture的核心优势?
1)无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
2)语义更清晰。
例如下代码中f3=f1.thenCombine(f2,()->{})能够清晰地表述“任务3要等待任务1和任务2都完成后才能开始”;
3)代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
使用CompletableFuture重新实现前面曾提及的烧水泡茶程序?
首先还是需要先完成分工方案,在下面的程序中,我们分了3个任务:任务1负责洗水壶、烧开水,任务2负责洗茶壶、洗茶杯和拿茶叶,任务3负责泡茶。其中任务3要等待任务1和任务2都完成后才能开始。这个分工如下图所示。

//任务1:洗水壶->烧开水CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{System.out.println("T1:洗水壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T1:烧开水...");sleep(15, TimeUnit.SECONDS);});//任务2:洗茶壶->洗茶杯->拿茶叶CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{System.out.println("T2:洗茶壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T2:洗茶杯...");sleep(2, TimeUnit.SECONDS);System.out.println("T2:拿茶叶...");sleep(1, TimeUnit.SECONDS);return "龙井";});//任务3:任务1和任务2完成后执⾏:泡茶CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{System.out.println("T1:拿到茶叶:" + tf);System.out.println("T1:泡茶...");return "上茶:" + tf;});//等待任务3执行结果System.out.println(f3.join());void sleep(int t,TimeUnit u){try {u.sleep(t);}catch(InterruptedException e){}}// 一次执行结果:T1:洗水壶...T2:洗茶壶...T1:烧开水...T2:洗茶杯...T2:拿茶叶...T1:拿到茶叶:龙井T1:泡茶...上茶:龙井
介绍CompletableFuture的使用
如何创建CompletableFuture对象?
创建CompletableFuture对象主要靠下面代码中展示的这4个静态方法:
//使用默认线程池static CompletableFuture<Void> runAsync(Runnable runnable)static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)//可以指定线程池static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
前两个方法:
-- 上面例子中使用了runAsync(Runnable runnable)和supplyAsync(Supplier supplier)
-- 区别是:Runnable接口的run()方法没有返回值,而Supplier接口的get()方法是有返回值的。
异步操作需要关注的两个问题?
如何理解CompletionStage接口?

CompletionStage接口如何描述串行关系、AND聚合关系、OR聚合关系以及异常处理?
CompletionStage接口--描述串行关系?
CompletionStage<R> thenApply(fn);CompletionStage<R> thenApplyAsync(fn);CompletionStage<Void> thenAccept(consumer);CompletionStage<Void> thenAcceptAsync(consumer);CompletionStage<Void> thenRun(action);CompletionStage<Void> thenRunAsync(action);CompletionStage<R> thenCompose(fn);CompletionStage<R> thenComposeAsync(fn);
thenApply()方法是如何使用的?
CompletableFuture<String> f0 = CompletableFuture.supplyAsync(()->"Hello World") //①.thenApply(s->s +"QQ") //②.thenApply(String::toUpperCase);//③System.out.println(f0.join());//输出结果HELLO WORLD QQ
CompletionStage接口--描述AND汇聚关系?
CompletionStage<R> thenCombine(other, fn);CompletionStage<R> thenCombineAsync(other, fn);CompletionStage<Void> thenAcceptBoth(other, consumer);CompletionStage<Void> thenAcceptBothAsync(other, consumer);CompletionStage<Void> runAfterBoth(other, action);CompletionStage<Void> runAfterBothAsync(other, action);
CompletionStage接口--描述OR汇聚关系?
CompletionStage applyToEither(other, fn);CompletionStage applyToEitherAsync(other, fn);CompletionStage acceptEither(other, consumer);CompletionStage acceptEitherAsync(other, consumer);CompletionStage runAfterEither(other, action);CompletionStage runAfterEitherAsync(other, action);
如何使用applyToEither()方法来描述一个OR汇聚关系?
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{int t = getRandom(5, 10);sleep(t, TimeUnit.SECONDS);return String.valueOf(t);});CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{int t = getRandom(5, 10);sleep(t, TimeUnit.SECONDS);return String.valueOf(t);});CompletableFuture<String> f3 = f1.applyToEither(f2,s -> s);System.out.println(f3.join());
CompletionStage接口--异常处理?
虽然上面我们提到的fn、consumer、action它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行7/0就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?
CompletableFuture<Integer> f0 = CompletableFuture..supplyAsync(()->(7/0)).thenApply(r->r*10);System.out.println(f0.join());
CompletionStage接口给我们提供的方案非常简单,比try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。
CompletionStage exceptionally(fn);CompletionStage<R> whenComplete(consumer);CompletionStage<R> whenCompleteAsync(consumer);CompletionStage<R> handle(fn);CompletionStage<R> handleAsync(fn);
如何使用exceptionally()方法来处理异常?
CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(()->7/0)).thenApply(r->r*10).exceptionally(e->0);System.out.println(f0.join())
总结
思考题
问题:
创建采购订单的时候,需要校验一些规则,例如最大金额是和采购员级别相关的。有同学利用CompletableFuture实现了这个校验的功能,逻辑很简单,首先是从数据库中把相关规则查出来,然后执行规则校验。你觉得他的实现是否有问题呢?
//采购订单PurchersOrder po;CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(()->{//在数据库中查询规则return findRuleByJdbc();}).thenApply(r -> {//规则校验return check(po, r);});Boolean isOk = cf.join();
这段代码的问题,例如没有异常处理、逻辑不严谨等等,不过我更想让你关注的是:findRuleByJdbc()这个方法隐藏着一个阻塞式I/O,这意味着会阻塞调用线程。默认情况下所有的CompletableFuture共享一个ForkJoinPool,当有阻塞式I/O时,可能导致所有的ForkJoinPool线程都阻塞,进而影响整个系统的性能。
//采购订单PurchersOrder po;CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(()->{//在数据库中查询规则return findRuleByJdbc();}).thenApply(r ->{//规则校验return check(po,r);});Boolean isOk = cf.join();
如何优化一个询价应用的核心代码?
如果采用“ThreadPoolExecutor+Future”的方案,你的优化结果很可能是下面示例代码这样:用三个线程异步执行询价,通过三次调用Future的get()方法获取询价结果,之后将询价结果保存在数据库中。
// 创建线程池ExecutorService executor = Executors.newFixedThreadPool(3);// 异步向电商S1询价Future<Integer> f1 = executor.submit(()->getPriceByS1());// 异步向电商S2询价Future<Integer> f2 = executor.submit(()->getPriceByS2());// 异步向电商S3询价Future<Integer> f3 = executor.submit(()->getPriceByS3());// 获取电商S1报价并异步保存executor.execute(()->save(f1.get()));// 获取电商S2报价并异步保存executor.execute(()->save(f2.get())// 获取电商S3报价并异步保存executor.execute(()->save(f3.get())
上面的这个方案本身没有太大问题,但是有个地方的处理需要你注意,那就是如果获取电商S1报价的耗时很长,那么即便获取电商S2报价的耗时很短,也无法让保存S2报价的操作先执行,因为这个主线程都阻塞在了f1.get()操作上。这点小瑕疵你该如何解决呢?
// 创建阻塞队列BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();//电商S1报价异步进入阻塞队列executor.execute(()->bq.put(f1.get()));//电商S2报价异步进入阻塞队列executor.execute(()->bq.put(f2.get()));//电商S3报价异步进入阻塞队列executor.execute(()->bq.put(f3.get()));//异步保存所有报价for (int i=0; i<3; i++) {Integer r = bq.take();executor.execute(()->save(r));}
利用CompletionService实现询价系统?
如何创建CompletionService呢?
CompletionService接口的实现类是ExecutorCompletionService,这个实现类的构造方法有两个,分别是:
1. ExecutorCompletionService(Executor executor);2. ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>>completionQueue)。
下面的示例代码完整地展示了如何利用CompletionService来实现高性能的询价系统。
-- 其中,没有指定completionQueue,因此默认使用无界的LinkedBlockingQueue。
-- 之后通过CompletionService接口提供的submit()方法提交了三个询价操作,这三个询价操作将会被CompletionService异步执行。
-- 最后,我们通过CompletionService接口提供的take()方法获取一个Future对象(前面我们提到过,加入到阻塞队列中的是任务执行结果的Future对象),调用Future对象的get()方法就能返回询价操作的执行结果了。
// 创建线程池ExecutorService executor = Executors.newFixedThreadPool(3);// 创建CompletionServiceCompletionService<Integer> cs = new ExecutorCompletionService<>(executor);// 异步向电商S1询价cs.submit(()->getPriceByS1());// 异步向电商S2询价cs.submit(()->getPriceByS2());// 异步向电商S3询价cs.submit(()->getPriceByS3());// 将询价结果异步保存到数据库for (int i=0; i<3; i++) {Integer r = cs.take().get();executor.execute(()->save(r));}
CompletionService接口说明?
介绍一下CompletionService接口提供的方法,CompletionService接口提供的方法有5个,这5个方法的方法签名如下所示。
Future<V> submit(Callable<V> task);Future<V> submit(Runnable task, V result);Future<V> take() throws InterruptedException;Future<V> poll();Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
其中,submit()相关的方法有两个。
① 一个方法参数是Callable task,前面利用CompletionService实现询价系统的示例代码中,我们提交任务就是用的它。
② 另外一个方法有两个参数,分别是Runnable task和V result,这个方法类似于ThreadPoolExecutor的 Future submit(Runnable task,T result) ,不再赘述。
利用CompletionService实现Dubbo中的Forking Cluster?
geocoder(addr) {//并行执行以下3个查询服务,r1=geocoderByS1(addr);r2=geocoderByS2(addr);r3=geocoderByS3(addr);//只要r1,r2,r3有一个返回,则返回return r1|r2|r3;}
具体是如何实现Forking这种集群模式?
// 创建线程池ExecutorService executor = Executors.newFixedThreadPool(3);// 创建CompletionServiceCompletionService<Integer> cs = new ExecutorCompletionService<>(executor);// 用于保存Future对象List<Future<Integer>> futures = new ArrayList<>(3);//提交异步任务,并保存future到futuresfutures.add(cs.submit(()->geocoderByS1()));futures.add(cs.submit(()->geocoderByS2()));futures.add(cs.submit(()->geocoderByS3()));// 获取最快返回的任务执行结果Integer r = 0;try {// 只要有一个成功返回,则breakfor (int i = 0; i < 3; ++i) {r = cs.take().get();//简单地通过判空来检查是否成功返回if (r != null) {break;}}} finally {//取消所有任务for(Future<Integer> f : futures)f.cancel(true);}// 返回结果return r;
总结
思考题:
// 创建线程池ExecutorService executor =Executors.newFixedThreadPool(3);// 创建CompletionServiceCompletionService<Integer> cs = newExecutorCompletionService<>(executor);// 异步向电商S1询价cs.submit(()->getPriceByS1());// 异步向电商S2询价cs.submit(()->getPriceByS2());// 异步向电商S3询价cs.submit(()->getPriceByS3());// 将询价结果异步保存到数据库// 并计算最低报价AtomicReference<Integer> m =new AtomicReference<>(Integer.MAX_VALUE);for (int i=0; i<3; i++) {executor.execute(()->{Integer r = null;try {r = cs.take().get();} catch (Exception e) {}save(r);m.set(Integer.min(m.get(), r));});}return m;
并发编程的任务角度和协作细节角度?

分治任务?
分治任务模型?

Fork/Join的介绍?
Fork/Join的使用?
static void main(String[] args){//创建分治任务线程池ForkJoinPool fjp = newForkJoinPool(4);//创建分治任务Fibonacci fib = new Fibonacci(30);//启动分治任务Integer result = fjp.invoke(fib);//输出结果System.out.println(result);}//递归任务static class Fibonacci extends RecursiveTask<Integer>{final int n;Fibonacci(int n){this.n = n;}protected Integer compute(){if(n <= 1)return n;Fibonacci f1 = new Fibonacci(n - 1);//创建子任务f1.fork();Fibonacci f2 =new Fibonacci(n - 2);//等待子任务结果,并合并结果return f2.compute() + f1.join();}}
ForkJoinPool工作原理?

模拟MapReduce统计单词数量?
static void main(String[] args){String[] fc = {"hello world","hello me","hello fork","hello join","fork join in world"};//创建ForkJoin线程池ForkJoinPool fjp =new ForkJoinPool(3);//创建任务MR mr = new MR(fc, 0, fc.length);//启动任务Map<String, Long> result =fjp.invoke(mr);//输出结果result.forEach((k, v)->System.out.println(k+":"+v));}//MR模拟类static class MR extendsRecursiveTask<Map<String, Long>> {private String[] fc;private int start, end;//构造函数MR(String[] fc, int fr, int to){this.fc = fc;this.start = fr;this.end = to;}@Override protectedMap<String, Long> compute(){if (end - start == 1) {return calc(fc[start]);} else {int mid = (start+end)/2;MR mr1 = new MR(fc, start, mid);mr1.fork();MR mr2 = new MR(fc, mid, end);//计算⼦任务,并返回合并的结果return merge(mr2.compute(),mr1.join());}}//合并结果private Map<String, Long> merge(Map<String, Long> r1,Map<String, Long> r2) {Map<String, Long> result =new HashMap<>();result.putAll(r1);//合并结果r2.forEach((k, v) -> {Long c = result.get(k);if (c != null)result.put(k, c+v);elseresult.put(k, v);});return result;}//统计单词数量private Map<String, Long>calc(String line) {Map<String, Long> result =new HashMap<>();//分割单词String [] words =line.split("\\s+");//统计单词数量for (String w : words) {Long v = result.get(w);if (v != null)result.put(w, v+1);elseresult.put(w, 1L);}return result;}
总结
Fork/Join并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小
的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的
MapReduce,所以你可以把Fork/Join看作单机版的MapReduce。
Fork/Join并行计算框架的核心组件是ForkJoinPool。ForkJoinPool支持任务窃取机制,能够让所有线程的工
作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8提供的Stream
API里面并行流也是以ForkJoinPool为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享
一个ForkJoinPool,这个共享的ForkJoinPool默认的线程数是CPU的核数;如果所有的并行流计算都是CPU密
集型计算的话,完全没有问题,但是如果存在I/O密集型的并行流计算,那么很可能会因为一个很慢的I/O计
算而拖慢整个系统的性能。所以建议用不同的ForkJoinPool执行不同类型的计算任务。
如果你对ForkJoinPool详细的实现细节感兴趣,也可以参考Doug Lea的论文。http://gee.cs.oswego.edu/dl/papers/fj.pdf
思考题:
对于一个CPU密集型计算程序,在单核CPU上,使用Fork/Join并行计算框架是否能够提高性能呢?
异步刷盘方式
一个自实现的日志组件,写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,往往采用异步刷盘的方式。
class Logger {//任务队列final BlockingQueue<LogMsg> bq = new BlockingQueue<>();//flush批量static final int batchSize = 500;//只需要⼀个线程写⽇志ExecutorService es = Executors.newFixedThreadPool(1);//启动写⽇志线程void start() {File file = File.createTempFile("foo", ".log");final FileWriter writer = new FileWriter(file);this.es.execute(() -> {try {//未刷盘⽇志数量int curIdx = 0;long preFT = System.currentTimeMillis();while (true) {LogMsg log = bq.poll(5, TimeUnit.SECONDS);//写⽇志if (log != null) {writer.write(log.toString());++curIdx;}//如果不存在未刷盘数据,则⽆需刷盘if (curIdx <= 0) {continue;}//根据规则刷盘if (log != null && log.level == LEVEL.ERROR || curIdx == batchSize ||System.currentTimeMillis() - preFT > 5000) {writer.flush();curIdx = 0;preFT = System.currentTimeMillis();}}} catch (Exception e) {e.printStackTrace();} finally {try {writer.flush();writer.close();} catch (IOException e) {e.printStackTrace();}}});}//写INFO级别⽇志void info(String msg) {bq.put(new LogMsg(LEVEL.INFO, msg));}//写ERROR级别⽇志void error(String msg) {bq.put(new LogMsg(LEVEL.ERROR, msg));}}//⽇志级别enum LEVEL {INFO, ERROR}class LogMsg {LEVEL level;String msg;//省略构造函数实现LogMsg(LEVEL lvl, String msg) {}//省略toString()实现String toString() {}}