@songhanshi
2021-01-14T11:26:28.000000Z
字数 97166
阅读 653
Java学习
笔记内容来源:
1. jk:《Java并发编程实战》-王宝令(极客)
2. xn:《Java性能调优实战》-多线程性能调优-刘超(极客)
https://blog.csdn.net/PROGRAM_anywhere/article/details/83552126
为什么要用到并发
并发编程有哪些缺点
减少上下文切换
通常减少上下文切换可以采用无锁并发编程,CAS算法,使用最少的线程和使用协程。
--------------------分割线------------
时间片?
上下文切换是什么?
多核下的上下文切换?
多线程上下文切换的原因?
线程运行时,各状态之间转换的原因?
自发性上下文切换?
非自发性上下文切换?
上下文切换性能问题?
串行和并行代码的比较:
如何监测到上下文切换?
上下文切换到底开销在哪些环节?
在并发量比较大的情况下,什么时候用单线程,什么时候用多线程呢?
在多线程中使用Synchronized还会发生进程间的上下文切换吗?具体又会发生在哪些环节呢?
jk |
---|
参考:
本节说的可能并不好。该篇我看了三遍也没能完全看懂,于是自己搜索java管程相关的技术文章,才大致对管程有了个认知,总结如下:
1.管程是一种概念,任何语言都可以通用。
2.在java中,每个加锁的对象都绑定着一个管程(监视器)
3.线程访问加锁对象,就是去拥有一个监视器的过程。如一个病人去门诊室看医生,医生是共享资源,门锁锁定医生,病人去看医生,就是访问医生这个共享资源,门诊室其实是监视器(管程)。
4.所有线程访问共享资源,都需要先拥有监视器。就像所有病人看病都需要先拥有进入门诊室的资格。
5.监视器至少有两个等待队列。一个是进入监视器的等待队列一个是条件变量对应的等待队列。后者可以有多个。就像一个病人进入门诊室诊断后,需要去验血,那么它需要去抽血室排队等待。另外一个病人心脏不舒服,需要去拍胸片,去拍摄室等待。
6.监视器要求的条件满足后,位于条件变量下等待的线程需要重新在门诊室门外排队,等待进入监视器。就像抽血的那位,抽完后,拿到了化验单,然后,重新回到门诊室等待,然后进入看病,然后退出,医生通知下一位进入。
总结起来就是,管程就是一个对象监视器。任何线程想要访问该资源,就要排队进入监控范围。进入之后,接受检查,不符合条件,则要继续等待,直到被通知,然后继续进入监视器。
管程的发展?
为什么Java在1.5之前仅仅提供了synchronized关键字及wait()、notify()、notifyAll()这三个看似从天而降的方法?
在刚接触Java的时候,我以为它会提供信号量这种编程原语,因为操作系统原理课程告诉我,用信号量能解决所有并发问题,结果我发现不是。
什么是管程?
管程如何管理?
MESA模型介绍?
代码再次说明MESA模型?
代码实现一个阻塞队列,阻塞队列有两个操作分别是入队和出队,这两个方法都是先获取互斥锁,类比管程模型中的入口。
1) 对于入队操作,如果队列已满,就需要等待直到队列不满,所以这里用了notFull.await();
2) 对于出队操作,如果队列为空,就需要等待直到队列不空,所以就用了notEmpty.await();
3) 如果入队成功,那么队列就不空了,就需要通知条件变量:队列不空notEmpty对应的等待队列;
4) 如果出队成功,那就队列就不满了,就需要通知条件变量:队列不满notFull对应的等待队列。
public class BlockedQueue<T> {
final Lock lock = new ReentrantLock();
// 条件变量:队列不满
final Condition notFull = lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty = lock.newCondition();
// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满)
// 等待队列不满
notFull.await();
// 省略入队操作...
// 入队后, 通知可出队
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 出队
void deq() {
lock.lock();
try {
while (队列已空)
// 等待队列不空
notEmpty.await();
// 省略出队操作...
// 出队后,通知可入队
notFull.signal();
} finally {
lock.unlock();
}
}
}
示例中,用了Java并发包里面的Lock和Condition。
注意:await()和前面我们提到的wait()语义是一样的;signal()和前面我们提到的notify()语义是一样的。
wait() 的正确姿势?
对于MESA管程来说,有一个编程范式,就是需要在一个while循环里面调用wait()。这个是MESA管程特有的。
while(条件不满足) {
wait();
}
Hasen模型、Hoare模型和MESA模型的一个核心区别就是当条件满足后,如何通知相关线程。
notify()何时可以使用?
while (队列已满)
// 等待队列不满
notFull.await()
所有等待线程被唤醒后执行的操作也是相同的,都是下面这几行:
// 省略入队操作...
// 入队后, 通知可出队
notEmpty.signal();
同时也满足第 3 条,只需要唤醒一个线程。所以上面阻塞队列的代码,使用 signal() 是可以的。
Java内置的管程?
主要内容?
“并发编程中共享变量的一致性”。
CPU缓存导致的数据不一致、重排序
结合Happens-before规则,可以将一致性分为以下几个级别:
Happens-before 规则?
多线程操作共享变量可能出现的问题?
假设有两个线程(线程 1 和线程 2)分别执行下面的方法,x 是共享变量:
// 代码 1
public class Example {
int x = 0;
public void count() {
x++; //1
System.out.println(x)//2
}
}
两个线程同时运行,线程的变量的值可能的结果:
线程1调用count | 线程2调用count |
---|---|
x++; | x++; |
结果:
结果1 | 结果2 | 结果3 |
---|---|---|
1,1 | 2,1 | 1,2 |
CPU缓存-理解"1,1"的情况?
CPU缓存-理解"1,1"的情况?
1,1 的运行结果:
重排序问题?
// 代码 1
public class Example {
int x = 0;
boolean flag = false;
public void writer() {
x = 1; //1
flag = true; //2
}
public void reader() {
if (flag) { //3
int r1 = x; //4
System.out.println(r1==x)
}
}
}
重排序-问题解析?
int x = 1;//步骤1:加载x变量的内存地址到寄存器中,加载1到寄存器中,CPU通过mov指令把1写入到寄存器指定的内存中
boolean flag = true; //步骤2 加载flag变量的内存地址到寄存器中,加载true到寄存器中,CPU通过mov指令把1写入到寄存器指定的内存中
int y = x + 1;//步骤3 重新加载x变量的内存地址到寄存器中,加载1到寄存器中,CPU通过mov指令把1写入到寄存器指定的内存中
临界区?
管程?
synchronized (this)
{ // 此处自动加锁
// x 是共享变量, 初始值 =10
if (this.x < 12) {
this.x = 12;
}
} // 此处自动解锁
什么是线程安全呢?
如何写出线程安全的程序呢?
所有的代码都需要分析否存在这三个问题吗?
什么叫数据竞争?
public class Test {
private long count = 0;
void add10K() {
int idx = 0;
while(idx++ < 10000) {
count += 1;
}
}
}
竞态条件(Race Condition)?
if (状态变量 满足 执行条件) {
执行操作
}
-- 当某个线程发现状态变量满足执行条件后,开始执行操作;可是就在这个线程执行操作的时候,其他线程同时修改了状态变量导致状态变量不满足执行条件了。当然很多场景下,这个条件不是显式的。
例子2-1:例子1加锁
-- 在访问数据的地方加个锁保护,如下:
-- 所有访问共享变量value的地方,都增加了互斥锁,此时是不存在数据
竞争的。但add10K()并不是线程安全的。
public class Test {
private long count = 0;
// synchronized
synchronized long get(){
return count;
}
// synchronized
synchronized void set(long v){
count = v;
}
void add10K() {
int idx = 0;
while(idx++ < 10000) {
set(get()+1)
}
}
}
问题:竞态条件
-- 假设count=0,A、B两个线程同时执行get()方法时,get()方法会返回相同的值0,A、B执行get()+1操作,结果都是1,之后A、B再将结果1写入了内存。期望是 2,而结果却是1。
-- A、B完全同时执行,结果是 1;A、B前后执行,结果是2。
class Account {
private int balance;
// 转账
void transfer(Account target, int amt){
if (this.balance > amt) { // 6
this.balance -= amt; // A-150
target.balance += amt;//B+150
}
}
}
数据竞争、竞态条件如何保证线程安全?
活跃性问题概念?
死锁、活锁、饥饿?
性能问题?
串行对性能的影响是怎么样的呢?
假设串行百分比是 5%,用多核多线程相比单核单线程能提速多少呢?
怎么避免锁带来的性能问题呢?
方案层面:
性能方面的三个重要度量指标?
可见性概念
一个线程对共享变量的修改,另外一个线程能够立刻看到,我们称为可见性。
缓存导致的可见性问题?
验证多核场景下的可见性问题
public class Test {
private static long count =0;
private void add10K(){
int idx=0;
while (idx++<100000){
count+=1;
}
}
private static long calc(){
final Test test =new Test();
//创建两个线程,执行 add() 操作
Thread thread1=new Thread(()->{
test.add10K();
});
Thread thread2=new Thread(()->{
test.add10K();
});
//启动两个线程
thread1.start();
thread2.start();
//等待两个线程执行结束
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count);
return count;
}
public static void main(String[] args) {
calc();
}
}
时间片
原子性概念
线程切换
任务切换->线程切换发展
count+=1
1)CPU指令
2)执行顺序
指令 1:把变量 count 从内存加载到 CPU 的寄存器;
指令 2:在寄存器中执行 +1 操作;
指令 3:将结果写入内存(缓存机制导致可能写入的是 CPU 缓存而不是内存)。
有序性
例子
双重检查单例模式(DCL)
public class SingletonLazy {
/**懒汉模式-双重检验锁-线程安全且并行效率高*/
private static SingletonLazy instance;
public static SingletonLazy getInstance() {
// 先判断实例是否存在,若不存在再对类对象进行加锁处理
if (instance == null) { //Single Checked
synchronized (SingletonLazy.class) {
if (instance == null) { //Double Checked
instance = new SingletonLazy();
}
}
}
return instance;
}
}
DCL 问题:出在 new 操作上
DCL 问题:导致问题-空指针
概述
例子
class VolatileDemo {
int x = 0;
volatile boolean v = false;
public void writer() {
x = 42;
v = true;
}
public void reader() {
if (v == true) {
// 这里 x 会是多少呢?
}
}
}
Java5前的问题
概念
Happens-Before 六规则
程序前面对某个变量的修改一定是对后续操作可见的。
// “x = 42;” Happens-Before “v = true;”
x = 42;
v = true;
volatile 变量规则
管程中的锁在 Java 里是隐式实现的
synchronized (this) { // 此处自动加锁
// x 是共享变量, 初始值 =10
if (this.x < 12) {
this.x = 12;
}
} // 此处自动解锁
结合规则 4——管程中锁的规则理解:假设 x 的初始值是 10,线程 A 执行完代码块后 x 的值会变成12(执行完自动释放锁),线程 B 进入代码块时,能够看到线程 A 对 x 的写操作,也就是线程 B 能够看到x==12。
指主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作。
Thread B = new Thread(() -> {
// 主线程调用 B.start() 之前
// 所有对共享变量的修改,此处皆可见
// 此例中,var==77
});
// 此处对共享变量 var 修改
var =77;
// 主线程启动子线程
B.start();
线程的join()规则
换句话说就是,如果在线程 A 中,调用线程 B 的 join() 并成功返回,那么线程 B 中的任意操作 Happens-Before 于该 join() 操作的返回。
Thread B = new Thread(()->{
// 此处对共享变量 var 修改
var = 66;
});
// 例如此处对共享变量修改,
// 则这个修改结果对线程 B 可见
// 主线程启动子线程
B.start();
B.join();
// 子线程所有对共享变量的修改
// 在主线程调用 B.join() 之后皆可见
// 此例中,var==66
线程中断规则:
概念
逸出
final int x;
// 错误的构造函数
public FinalFieldExample() {
x = 3;
y = 4;
// 此处就是讲 this 逸出,
global.obj = this;
}
原子性概念?
32位CPU上执行long型变量的写操作?
synchronized的使用?
class X {
//1. 修饰非静态方法
synchronized void foo() {
// 临界区
}
//2. 修饰静态方法
synchronized static void bar() {
// 临界区
}
//3. 修饰代码块
Object obj = new Object();
void baz() {
synchronized (obj) {
// 临界区
}
}
}
修饰方法的时候锁定的是什么呢?
Class X {
//1. 修饰非静态方法
synchronized(this) void foo() {
// 临界区
}
//2. 修饰静态方法
synchronized(X.class) static void bar() {
// 临界区
}
}
用 synchronized 解决 count+=1 问题?
代码
class SafeCalc {
long value = 0L;
long get() {
return value;
}
synchronized void addOne() {
value += 1;
}
}
原子性:
addOne()方法,被synchronized修饰后,无论是单核CPU还是多核CPU,只有一个线程能够执行addOne()方法,所以一定能保证原子操作。
可见性-get():
-- 管程中锁的规则,是只保证后续对这个锁的加锁的可见性,而get()方法并没有加锁操作,可见性没法保证。
-- 加上syc关键字
synchronized long get() {
return value;
}
get()方法和addOne()方法都需要访问value这个受保护的资源,这个资源用this这把锁来保护。线程要进入临界区get()和addOne(),必须先获得this这把锁,这样get()和addOne()也是互斥的。
锁和受保护资源的关系?
加上static
class SafeCalc {
static long value = 0L; // +static
long get() {
return value;
}
synchronized static void addOne() { // +static
value += 1;
}
}
两个锁保护一个资源。这个受保护的资源就是静态变量 value,两个锁分别是 this 和 SafeCalc.class。我们可以用下面这幅图来形象描述这个关系。由于临界区 get() 和 addOne() 是用两个锁保护的,因此这两个临界区没有互斥关系,临界区 addOne() 对 value 的修改对临界区 get() 也没有可见性保证,这就导致并发问题。
两种加锁对比?
问题类型?
class Account {
// 锁:保护账户余额
private final Object balLock = new Object();
// 账户余额
private Integer balance;
// 锁:保护账户密码
private final Object pwLock = new Object();
// 账户密码
private String password;
// 取款
void withdraw(Integer amt) {
synchronized (balLock) {
if (this.balance > amt) {
this.balance -= amt;
}
}
}
// 查看余额
Integer getBalance() {
synchronized (balLock) {
return balance;
}
}
// 更改密码
void updatePassword(String pw) {
synchronized (pwLock) {
this.password = pw;
}
}
// 查看密码
String getPassword() {
synchronized (pwLock) {
return password;
}
}
}
一把互斥锁来保护多个资源的问题?
如何保护有关联关系的多个资源?
class Account {
private int balance;
// 转账
void transfer(Account target, int amt) {
synchronized (Account.class) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}
死锁的一个比较专业的定义是:一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。
死锁的原因?
如何预防死锁?
循环等待问题?
核心代码:
// 一次性申请转出账户和转入账户,直到成功
while(!actr.apply(this, target))
;
如果apply()操作耗时非常短,而且并发冲突量也不大时,循环上几次或者几十次就能一次性获取转出账户和转入账户了。但如果apply()操作耗时长,或者并发冲突量大的时候,可能要循环上万次才能获取到锁太消耗CPU,循环等待方案就不适用了。
循环等待问题的解决方案?
Java语言是如何支持等待-通知机制
类比于现实的等待 - 通知机制的就医流程?
1)患者先去挂号,然后到就诊门口分诊,等待叫号;
2)当叫到自己的号时,患者就可以找大夫就诊了;
3)就诊过程中,大夫可能会让患者去做检查,同时叫下一位患者;
4)当患者做完检查后,拿检测报告重新分诊,等待叫号;
5)当大夫再次叫到自己的号时,患者再去找大夫就诊。
如何用synchronized实现等待-通知机制?
如何用synchronized实现互斥锁?
使用wait()、notify()、notifyAll()方法?
一个更好地资源分配器:如何解决一次性申请转出账户和转入账户的问题?
while(条件不满足) {
wait();
}
```
* 利用这种范式可以解决上面提到的条件曾经满足过这个问题。因为当 wait() 返回时,有可能条件已经发生变化了,曾经条件满足,但是现在已经不满足了,所以要重新检验条件是否满足。范式,意味着是经典做法,所以没有特殊理由不要尝试换个写法。
``` java
class Allocator {
private List<Object> als;
// 一次性申请所有资源
synchronized void apply(
Object from, Object to){
// 经典写法
while(als.contains(from) ||
als.contains(to)){
try{
wait();
}catch(Exception e){
}
}
als.add(from);
als.add(to);
}
// 归还资源
synchronized void free(
Object from, Object to){
als.remove(from);
als.remove(to);
notifyAll();
}
}
尽量使用 notifyAll()?
总结
通用 | Java状态 | 状态转换 | interrupt |
---|
1. 通用的线程生命周期?
通用的线程生命周期基本上可以用下图这个“五态模型”来描述。这五态分别是:初始状态、可运行状态、运行状态、休眠状态和终止状态。
这“五态模型”的详细情况如下所示?
1)初始状态,指的是线程已经被创建,但是还不允许分配CPU执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建。
2)可运行状态,指的是线程可以分配CPU执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配CPU执行。
3)当有空闲的CPU时,操作系统会将其分配给一个处于可运行状态的线程,被分配到CPU的线程的状态就转换成了运行状态。
4)运行状态的线程如果调用一个阻塞的API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放CPU使用权,休眠状态的线程永远没有机会获得CPU使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
5)线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。
五种状态的定义?
这五种状态在不同编程语言里会有简化合并?
Java中线程的生命周期?
具体是哪些情形会导致线程从RUNNABLE状态转换到这三种状态呢?而这三种状态又是何时转换回RUNNABLE的呢?以及 NEW、TERMINATED 和 RUNNABLE 状态是如何转换的?
RUNNABLE与BLOCKED的状态转换?
RUNNABLE与WAITING的状态转换?
RUNNABLE与TIMED_WAITING的状态转换?
有五种场景会触发这种转换:
1)调用带超时参数的 Thread.sleep(long millis) 方法;
2)获得synchronized 隐式锁的线程,调用带超时参数的 Object.wait(long timeout) 方法;
3)调用带超时参数的 Thread.join(long millis) 方法;
4)调用带超时参数的 LockSupport.parkNanos(Object blocker, long deadline) 方法;
5)调用带超时参数的 LockSupport.parkUntil(long deadline) 方法。
NEW到RUNNABLE状态?
MyThread myThread = new MyThread();
// 从 NEW 状态转换到 RUNNABLE 状态
myThread.start();
从RUNNABLE到TERMINATED状态?
stop() 和 interrupt() 方法的主要区别是什么呢?
被interrupt的线程,是怎么收到通知的呢?
总结
思考题
下面代码的本意是当前线程被中断之后,退出while(true),你觉得这段代码是否正确呢?
Thread th = Thread.currentThread();
while(true) {
if(th.isInterrupted()) {
break;
}
// 省略业务代码无数
try {
Thread.sleep(100);
}catch (InterruptedException e){
e.printStackTrace();
}
}
目的:注意InterruptedException 的处理方式。
try {
Thread.sleep(100);
}catch(InterruptedException e){
// 重新设置中断标志位
th.interrupt();
}
1. 各种线程池的线程数量调整成多少是合适的?/Tomcat的线程数、Jdbc连接池的连接数是多少?等等。
2. 如何设置合适的线程数呢?
2. 分析多线程的应用场景有哪些?
如何度量性能?
分析为什么要使用多线程?
怎么降低延迟,提高吞吐量呢?
多线程的应用场景?
示例说明:如何利用多线程来提升 CPU 和 I/O 设备的利用率?
在单核时代,多线程主要就是用来平衡CPU和I/O设备的。如果程序只有CPU计算,而没有I/O 操作的话,多线程不但不会提升性能,还会使性能变得更差,原因是增加了线程切换的成本。但是在多核时代,这种纯计算型的程序也可以利用多线程来提升性能。为什么呢?
I/O 密集型计算、CPU 密集型计算?
创建多少线程合适?
讨论题:
有些同学对于最佳线程数的设置积累了一些经验值,认为对于 I/O密集型应用,最佳线程数应该为:2 * CPU 的核数 + 1,你觉得这个经验值合理吗?
11|Java线程(下):为什么局部变量是线程安全的?
Java 语言里,是不是所有变量都是共享变量呢?
Java 方法里面的局部变量是否存在并发问题呢?
下面我们就先结合一个例子剖析下这个问题。
比如,下面代码里的 fibonacci() 这个方法,会根据传入的参数 n ,返回 1 到 n 的斐波那契数
列,斐波那契数列类似这样: 1、1、2、3、5、8、13、21、34……第 1 项和第 2 项是 1,从第
3 项开始,每一项都等于前两项之和。在这个方法里面,有个局部变量:数组 r 用来保存数列的
结果,每次计算完一项,都会更新数组 r 对应位置中的值。你可以思考这样一个问题,当多个线
程调用 fibonacci() 这个方法的时候,数组 r 是否存在数据竞争(Data Race)呢?
Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
Java 语言本身提供的 synchronized 也是管程的一种实现,既然 Java 从语言层面已经实现了管程了,那为什么还要在SDK里提供另外一种实现呢?
对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申
请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
如果我们重新设计一把互斥锁去解决这个问题,那该怎么设计呢?
// 支持中断的 API
void lockInterruptibly()
throws InterruptedException;
// 支持超时的 API
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// 支持非阻塞获取锁的 API
boolean tryLock();
如何保证可见性?
class X {
private final Lock rtl = new ReentrantLock();
int value;
public void addOne() {
// 获取锁
rtl.lock();
try {
value+=1;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}
答案必须是肯定的。Java SDK里面锁原理简述:利用了 volatile 相关的 Happens-Before 规则。Java SDK 里面的ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state的值(简化后的代码如下面所示)。也就是说,在执行 value+=1
之前,程序先读写了一次 volatile 变量 state,在执行 value+=1 之后,又读写了一次 volatile变量 state。根据相关的 Happens-Before 规则:
1)顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
2)volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作Happens-Before 线程 T2 的 lock() 操作;
3)传递性规则:线程 T2 的 lock() 操作 Happens-Before 线程 T1 的 value+=1 。
class SampleLock {
volatile int state;
// 加锁
lock() {
// 省略代码无数
state = 1;
}
// 解锁
unlock() {
// 省略代码无数
state = 0;
}
}
所以说,后续线程 T2 能够看到 value 的正确结果
什么是可重入锁?
class X {
private final Lock rtl = new ReentrantLock();
int value;
public int get() {
// 获取锁
rtl.lock(); ②
try {
return value;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
public void addOne() {
// 获取锁
rtl.lock();
try {
value = 1 + get(); ①
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}
可重入函数?
公平锁与非公平锁?
// 无参构造函数:默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 根据公平策略参数创建锁
public ReentrantLock(boolean fair){
sync = fair ? new FairSync() : new NonfairSync();
}
在入口等待队列,锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。
-- 如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;
-- 如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。
这三条规则,前两条估计你一定会认同,最后一条你可能会觉得过于严苛。但是我还是倾向于你去遵守,因为调用其他对象的方法,实在是太不安全了,也许“其他”方法里面有线程sleep()的调用,也可能会有奇慢无比的 I/O 操作,这些都会严重影响性能。更可怕的是,“其他”类的方法可能也会加锁,然后双重加锁就可能导致死锁。
并发问题,本来就难以诊断,所以你一定要让你的代码尽量安全,尽量简单,哪怕有一点可能会出问题,都要努力避免。
总结
Java SDK 并发包里的 Lock 接口里面的每个方法,你可以感受到,都是经过深思熟虑的。除了支持类似 synchronized 隐式加锁的 lock() 方法外,还支持超时、非阻塞、可中断的方式获取锁,这三种方式为我们编写更加安全、健壮的并发程序提供了很大的便利。希望你以后在使用锁的时候,一定要仔细斟酌。
除了并发大师 Doug Lea推荐的三个最佳实践外,你也可以参考一些诸如:减少锁的持有时间、减小锁的粒度等业界广为人知的规则,其实本质上它们都是相通的,不过是在该加锁的地方加锁而已。
课后思考?
你已经知道 tryLock()支持非阻塞方式获取锁,下面这段关于转账的程序就使用到了 tryLock(),是否存在死锁问题呢?
class Account {
private int balance;
private final Lock lock = new ReentrantLock();
// 转账
void transfer(Account tar, int amt){
while (true) {
if(this.lock.tryLock()) {
try {
if (tar.lock.tryLock()) {
try {
this.balance -= amt;
tar.balance += amt;
} finally {
tar.lock.unlock();
}
}//if
} finally {
this.lock.unlock();
}
}//if
}//while
}//transfer
}
修复后的代码如下所示,我仅仅修改了两个地方,一处是转账成功之后break,另一处是在while循环体结束前增加了Thread.sleep(随机时间)。
class Account {
private int balance;
private final Lock lock = new ReentrantLock();
// 转账
void transfer(Account tar, int amt) {
while (true) {
if (this.lock.tryLock()) {
try {
if (tar.lock.tryLock()) {
try {
this.balance -= amt;
tar.balance += amt;
//新增:退出循环
break;
} finally {
tar.lock.unlock();
}
}//if
} finally {
this.lock.unlock();
}
}//if
//新增:sleep⼀个随机时间避免活锁
Thread.sleep(随机时间);
}//while
}//transfer
}
Java SDK 并发包里的Lock有别于synchronized隐式锁的三个特性:能够响应中断、支持超时和非阻塞地获取锁。
Java SDK 并发包里的 Condition
如何利用两个条件变量快速实现阻塞队列呢?
一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队)。相
关的代码,重新列出。
public class BlockedQueue<T>{
final Lock lock = new ReentrantLock();
// 条件变量:队列不满
final Condition notFull = lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty = lock.newCondition();
// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
// 入队后, 通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
// 出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}
同步和异步?
// 计算圆周率小说点后 100 万位
String pai1M() {
// 省略代码无数
}
pai1M()
printf("hello world")
异步实现的两种方式?
在项目Dubbo中,Lock和Condition是怎么用的?
RPC框架Dubbo内部做了异步转同步,如何实现?分析一下相关源码。
对于下面一个简单的RPC调用,默认情况下sayHello()方法,是个同步方法,也就是说,执行service.sayHello(“dubbo”)的时候,线程会停下来等结果。
DemoService service = 初始化部分省略
String message =
service.sayHello("dubbo");
System.out.println(message);
如果此时你将调用线程dump出来的话,会是下图这个样子,你会发现调用线程阻塞了,线程状态是TIMED_WAITING。本来发送请求是异步的,但是调用线程却阻塞了,说明Dubbo帮我们做了异步转同步的事情。通过调用栈,你能看到线程是阻塞在DefaultFuture.get()方法上,所以可以推断:Dubbo 异步转同步的功能应该是通过 DefaultFuture这个类实现的。
不过为了理清前后关系,还是有必要分析一下调用DefaultFuture.get() 之前发生了什么。DubboInvoker 的108行调用了DefaultFuture.get(),这一行很关键,我稍微修改了一下列在了下面。这一行先调用了 request(inv, timeout)方法,这个方法其实就是发送 RPC 请求,之后通过调用 get() 方法等待 RPC 返回结果。
public class DubboInvoker{
Result doInvoke(Invocation inv){
// 下面这行就是源码中 108 行
// 为了便于展示,做了修改
return currentClient
.request(inv, timeout)
.get();
}
}
DefaultFuture 这个类是很关键,我把相关的代码精简之后,列到了下面。不过在看代码之前,你还是有必要重复一下我们的需求:当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。不知道你有没有似曾相识的感觉,这不就是经典的等待-通知机制吗?这个时候想必你的脑海里应该能够浮现出管程的解决方案了。有了自己的方案之后,我们再来看看 Dubbo 是怎么实现的。
// 创建锁与条件变量
private final Lock lock
= new ReentrantLock();
private final Condition done
= lock.newCondition();
// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() ||
cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC 结果是否已经返回
boolean isDone() {
return response != null;
}
// RPC 结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}
调用线程通过调用 get() 方法等待 RPC返回结果,这个方法里面,你看到的都是熟悉的“面孔”:调用 lock() 获取锁,在 finally 里面调用unlock()释放锁;获取锁后,通过经典的在循环中调用 await() 方法来实现等待。当 RPC 结果返回时,会调用 doReceived() 方法,这个方法里面,调用 lock() 获取锁,在finally 里面调用 unlock() 释放锁,获取锁后通过调用 signal()来通知调用线程,结果已经返回,不用继续等待了。
课后思考:efaultFuture 里面唤醒等待的线程,用的是 signal(),而不是 signalAll(),你来分析一下,这样做是否合理呢?
private void doReceived(Response res) {
lock.lock();
try {
response = res;
done.signalAll();
} finally {
lock.unlock();
}
}
Semaphore简介?
信号量模型?
信号量模型的代码化?
class Semaphore{
// 计数器
int count;
// 等待队列
Queue queue;
// 初始化操作
Semaphore(int c){
this.count=c;
}
//
void down(){
this.count--;
if(this.count<0){
// 将当前线程插入等待队列
// 阻塞当前线程
}
}
void up(){
this.count++;
if(this.count<=0) {
// 移除等待队列中的某个线程 T
// 唤醒线程 T
}
}
}
PV原语?
如何使用信号量?
累加器中的互斥?
如下,acquire() 就是信号量里的down()操作,release() 就是信号量里的 up() 操作。
static int count;
// 初始化信号量
static final Semaphore s = new Semaphore(1);
// 用信号量保证互斥
static void addOne() {
s.acquire();
try {
count+=1;
} finally {
s.release();
}
}
信号量的计数器,设置成了1,这个1表示只允许一个线程进入临界区。
分析信号量是如何保证互斥的?
既然Java SDK提供了Lock,为啥还要提供一个Semaphore?
限流器的应用?
快速实现一个限流器?
对象池的示例代码:
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用 func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 创建对象池
ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
// 通过对象池获取 t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
用一个 List来保存对象实例,用 Semaphore 实现限流器。关键的代码是 ObjPool 里面的
exec() 方法,这个方法里面实现了限流的功能。在这个方法里面,首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在 acquire()方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0)实现的),分配完之后会执行一个回调函数func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释
放工作是通过 pool.add(t)实现的),同时调用release()方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。
思考题:在上面对象池的例子中,对象保存在了Vector中,Vector是Java提供的线程安全的容器,如果我们把Vector换成ArrayList,是否可以呢?
总结
Java SDK并发包里为什么还有很多其他的工具类呢?
并发场景:读多写少场景 的分析?
那什么是读写锁呢?
读写锁与互斥锁的重要区别?
如何快速实现一个缓存?
用 ReadWriteLock快速实现一个通用的缓存工具类:
class Cache<K,V> {
final Map<K, V> m = new HashMap<>();
final ReadWriteLock rwl = new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
// 读缓存
V get(K key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
// 写缓存
V put(String key, Data v) {
w.lock();
try { return m.put(key, v); }
finally { w.unlock(); }
}
}
处理缓存数据的初始化问题?
实现缓存的按需加载?
//按需加载的功能
class Cache<K,V> {
final Map<K, V> m = new HashMap<>();
final ReadWriteLock rwl = new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();
V get(K key) {
V v = null;
// 读缓存
r.lock(); //①
try {
v = m.get(key); //②
} finally{
r.unlock(); //③
}
// 缓存中存在,返回
if(v != null) { //④
return v;
}
// 缓存中不存在,查询数据库
w.lock(); //⑤
try {
// 再次验证
// 其他线程可能已经查询过数据库
v = m.get(key); //⑥
if(v == null){ //⑦
// 查询数据库
v= 省略代码⽆数
m.put(key, v);
}
} finally{
w.unlock();
}
return v;
}
}
为什么我们要再次验证呢?
上述缓存方案的问题?
读写锁的升级与降级?
上面按需加载的示例代码中,在①处获取读锁,在③处释放读锁,那是否可以在②处的下面增加验证缓存并更新缓存的逻辑呢?
// 读缓存
r.lock(); ①
try {
v = m.get(key); ②
if (v == null) {
w.lock();
try {
// 再次验证并更新缓存
// 省略详细代码
} finally{
w.unlock();
}
}
} finally{
r.unlock(); ③
}
锁降级?
class CachedData {
Object data;
volatile boolean cacheValid;
final ReadWriteLock rwl = new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
void processCachedData() {
// 获取读锁
r.lock();
if (!cacheValid) {
// 释放读锁,因为不允许读锁的升级
r.unlock();
// 获取写锁
w.lock();
try {
// 再次检查状态
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 释放写锁前,降级为读锁
// 降级是可以的
r.lock(); // ①
} finally {
// 释放写锁
w.unlock();
}
}
// 此处仍然持有读锁
try {use(data);}
finally {r.unlock();}
}
}
课后思考
有同学反映线上系统停止响应了,CPU 利用率很低,你怀疑有同学一不小心写出了读锁升级写锁的方案,那你该如何验证自己的怀疑呢?
总结
StampedLock 和ReadWriteLock 有哪些区别?
final StampedLock sl =new StampedLock();
// 获取 / 释放悲观读锁⽰意代码
long stamp = sl.readLock();
try {
// 省略业务相关代码
} finally {
sl.unlockRead(stamp);
}
// 获取 / 释放写锁⽰意代码
long stamp = sl.writeLock();
try {
// 省略业务相关代码
} finally {
sl.unlockWrite(stamp);
}
StampedLock 的性能优于ReadWriteLock的原因?
乐观读悲观读的使用示例?
distanceFromOrigin()方法中,首先通过调用tryOptimisticRead()获取了一个 stamp,这里的tryOptimisticRead()就是乐观读。之后将共享变量x和y读入方法的局部变量中,不过需要注意的是,由于tryOptimisticRead()是无锁的,所以共享变量x和y读入方法局部变量时,x和y有可能被其他线程修改了。因此最后读完之后,还需要再次验证一下是否存在写操作,验证操作通过调用validate(stamp)来实现的。
class Point {
private int x, y;
final StampedLock sl = new StampedLock();
// 计算到原点的距离
int distanceFromOrigin() {
// 乐观读
long stamp = sl.tryOptimisticRead();
// 读⼊局部变量,读的过程数据可能被修改
int curX = x, curY = y;
// 判断执⾏读操作期间,是否存在写操作,如果存在,则sl.validate返回false
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
curX = x;
curY = y;
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
return Math.sqrt(curX * curX + curY * curY);
}
}
代码中,如果执行乐观读操作的期间,存在写操作,会把乐观读升级为悲观读锁。
进一步理解乐观读--介绍一下数据库里的乐观锁?
select id,... ,version
from product_doc
where id=777
-- 用户在生产订单UI执行保存操作的时候,后台利用下面的SQL语句更新生产订单,此处我们假设该条生产订单的version=9。
update product_doc
set version=version+1,...
where id=777 and version=9
-- 如果这条 SQL 语句执行成功并且返回的条数等于1,那么说明从生产订单 UI 执行查询操作到执行保存操作期间,没有其他人修改过这条数据。因为如果这期间其他人修改过这条数据,那么版本号字段一定会大于 9。
进一步理解乐观读--StampedLock的乐观读?
StampedLock 使用注意事项?
final StampedLock lock = new StampedLock();
Thread T1 = new Thread(() -> {
// 获取写锁
lock.writeLock();
// 永远阻塞在此处,不释放写锁
LockSupport.park();
});
T1.start();
// 保证 T1 获取写锁
Thread.sleep(100);
Thread T2 = new Thread(() ->
// 阻塞在悲观读锁
lock.readLock()
);
T2.start();
// 保证 T2 阻塞在读锁
Thread.sleep(100);
// 中断线程 T2
// 会导致线程 T2 所在 CPU 飙升
T2.interrupt();
T2.join();
工程使用?
StampedLock读模板:
final StampedLock sl = new StampedLock();
// 乐观读
long stamp = sl.tryOptimisticRead();
// 读⼊⽅法局部变量
......
// 校验 stamp
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读⼊⽅法局部变量
.....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
// 使⽤⽅法局部变量执⾏业务操作
......
StampedLock 写模板:
long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}
锁升级降级?
StampedLock 支持锁的降级(通过 tryConvertToReadLock() 方法实现)和升级(通过tryConvertToWriteLock()方法实现),但是建议你要慎重使用。下面的代码也源自Java的官方示例,仅仅做了一点修改,但隐藏了一个 Bug。
private double x, y;
final StampedLock sl = new StampedLock();
// 存在问题的⽅法
void moveIfAtOrigin(double newX, double newY){
long stamp = sl.readLock();
try {
while(x == 0.0 && y == 0.0){
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
x = newX;
y = newY;
break;
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
Bug出在没有正确地释放锁。
private double x, y;
final StampedLock sl = new StampedLock();
// 存在问题的⽅法
void moveIfAtOrigin(double newX, double newY){
long stamp = sl.readLock();
try {
while(x == 0.0 && y == 0.0){
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws; //① 问题在于没对stamp重新赋值,增加赋值
x = newX;
y = newY;
break;
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
s1.unlock(stamp);//②此处unlock的是stamp
}
}
对账系统问题?
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执⾏对账操作
diff = check(pos, dos);
// 差异写⼊差异库
save(diff);
}
优化一:利用并行优化对账系统?
while(存在未对账订单){
// 查询未对账订单
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待 T1、T2 结束
T1.join();
T2.join();
// 执⾏对账操作
diff = check(pos, dos);
// 差异写⼊差异库
save(diff);
}
优化二:用 CountDownLatch实现线程等待?
// 创建 2 个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
});
/* ??如何实现等待??*/
// 执⾏对账操作
diff = check(pos, dos);
// 差异写⼊差异库
save(diff);
}
优化二:解决通知的问题?
// 创建 2 个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为 2
CountDownLatch latch = new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执⾏对账操作
diff = check(pos, dos);
// 差异写⼊差异库
save(diff);
}
优化三:生产者-消费者+队列?
优化三:如何用双队列来实现完全的并行?
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执⾏回调的线程池
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执⾏对账操作
diff = check(p, d);
// 差异写⼊差异库
save(diff);
}
void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
}
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
}
T2.start();
}
同步容器及其注意事项
如何将非线程安全的容器变成线程安全的容器?
ArrayList变成线程安全的?
SafeArrayList<T>{
// 封装ArrayList
List<T> c=new ArrayList<>();
// 控制访问路径
synchronized T get(int idx){
return c.get(idx);
}
synchronized void add(int idx, T t) {
c.add(idx, t);
}
synchronized boolean addIfNotExist(T t){
if(!c.contains(t)) {
c.add(t);
return true;
}
return false;
}
}
List list = Collections.synchronizedList(new ArrayList());
Set set = Collections.synchronizedSet(new HashSet());
Map map = Collections.synchronizedMap(new HashMap());
List list = Collections.synchronizedList(new ArrayList());
Iterator i = list.iterator();
while (i.hasNext())
foo(i.next());
Listlist = Collections.synchronizedList(new ArrayList());
synchronized(list){
Iterator i = list.iterator();
while(i.hasNext())
foo(i.next());
}
这些经过包装后线程安全容器,都是基于synchronized这个同步关键字实现的,所以也被称为同步容器。Java提供的同步容器还有Vector、Stack和Hashtable,这三个容器不是基于包装类实现的,但同样是基于 synchronized实现的,对这三个容器的遍历,同样要加锁保证互斥。
并发容器:
Java1.5及之后版本提供了性能更高的容器,我们一般称为并发容器。
并发容器?
并发容器虽然数量非常多,但依然是前面我们提到的四大类:List、Map、Set和Queue,下面的并发容器关系图,基本上把我们经常用的容器都覆盖到了。
(一)List?
(二)Map
。。。。
JUC提供的原子类?
类别一:原子化的基本数据类型?
getAndIncrement() // 原子化 i++
getAndDecrement() // 原子化 i--
incrementAndGet() // 原子化 ++i
decrementAndGet() // 原子化 --i
//当前值+=delta,返回+=前的值
getAndAdd(delta)
//当前值+=delta,返回+=后的值
addAndGet(delta)
//CAS操作,返回是否成功
compareAndSet(expect,update)
// 以下四个方法
// 新值可以通过传入func函数来计算
getAndUpdate(func)
updateAndGet(func)
getAndAccumulate(x,func)
accumulateAndGet(x,func)
类别二: 原子化的对象引用类型?
AtomicStampedReference 实现的 CAS 方法就增加了版本号参数,方法签名如下:
boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp)
AtomicMarkableReference 的实现机制则更简单,将版本号简化成了一个 Boolean 值,方法签名如下
boolean compareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark)
分类三:原子化数组?
分类四:原子化对象属性更新器?
相关实现有 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater,利用它们可以原子化地更新对象的属性,这三个方法都是利用反射机制实现的,创建更新器的方法如下:
public static <U> AtomicXXXFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName)
注意,对象属性必须是volatile类型的,只有这样才能保证可见性;如果对象属性不是volatile 类型的,newUpdater()方法会抛出IllegalArgumentException这个运行时异常。
boolean compareAndSet(T obj,
int expect,
int update)
分类五:原子化的累加器?
累加器的例子?
public class Test {
long count = 0;
void add10K() {
int idx = 0;
while(idx++ < 10000){
count += 1;
}
}
}
原子性问题的其他解法?
利用原子类解决累加器问题?
如下,两处简单的改动就能使add10K()方法变成线程安全的。
public class Test {
// ① long型变量count替换为了原子类AtomicLong
AtomicLong count = new AtomicLong(0);
void add10K() {
int idx = 0;
while(idx++ < 10000) {
// ② count+=1替换成了count.getAndIncrement()
count.getAndIncrement();
}
}
}
无锁方案的实现原理?
通过CAS指令的模拟代码来理解CAS的工作原理?
class SimulatedCAS{
int count;
synchronized int cas(int expect, int newValue){
// 读目前 count 的值
int curValue = count;
// 比较目前 count 值是否 == 期望值
if(curValue == expect){
// 如果是,则更新 count 的值
count = newValue;
}
// 返回写⼊前的值
return curValue;
}
}
累加器中的cas语义?
实现线程安全的累加器-CAS+自旋方案?
class SimulatedCAS{
volatile int count;
// 实现count+=1
addOne() {
do {
newValue = count+1; //①
}while(count != cas(count,newValue) //②
}
// 模拟实现CAS,仅用来帮助理解
synchronized int cas(int expect, int newValue){
//读目前count的值
int curValue = count;
//比较目前count值是否==期望值
if(curValue == expect){
//如果是,则更新count的值
count = newValue;
}
// 返回写当前的值
return curValue;
}
}
CAS无锁方案的优缺点?
什么情况下关注ABA问题?
Java 如何实现原子化的count += 1?
Java 是如何使用 CAS 来实现原子化的count += 1的?
-- 在Java1.8中,getAndIncrement() 方法会转调 unsafe.getAndAddLong()方法;
-- 这里 this 和valueOffset 两个参数可以唯一确定共享变量的内存地址。
final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}
unsafe.getAndAddLong()方法的源码如下
-- 该方法首先会在内存中读取共享变量的值,之后循环调用 compareAndSwapLong() 方法来尝试设置共享变量的值,直到成功为止。
-- compareAndSwapLong() 是一个 native 方法,只有当内存中共享变量的值等于 expected 时,才会将共享变量的值更新为 x,并且返回 true;否则返回fasle。
-- compareAndSwapLong 的语义和CAS 指令的语义的差别仅仅是返回值不同而已。
public final long getAndAddLong(Object o, long offset, long delta){
long v;
do {
// 读取内存中的值
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(o, offset, v, v + delta));
return v;
}
// 原子性地将变量更新为x的条件是内存中的值等于 expected
// 更新成功则返回 true
native boolean compareAndSwapLong(Object o, long offset,long expected,long x);
CAS应用的抽象代码逻辑?
do {
// 获取当前值
oldV = xxxx;
// 根据当前值计算新值
newV = ...oldV...
}while(!compareAndSet(oldV,newV);
思考题:隐藏的while(true)问题?
下面的示例代码是合理库存的原子化实现,仅实现了设置库存上限setUpper()方法,你觉得setUpper()方法的实现是否正确呢?
public class SafeWM {
class WMRange{
final int upper;
final int lower;
WMRange(int upper,int lower){
// 省略构造函数实现
}
}
final AtomicReference<WMRange>
rf = new AtomicReference<>(
new WMRange(0,0)
);
// 设置库存上限
void setUpper(int v){
WMRange nr;
WMRange or = rf.get();
do{
// 检查参数合法性
if(v < or.lower){
throw new IllegalArgumentException();
}
nr = new
WMRange(v, or.lower);
}while(!rf.compareAndSet(or, nr));
}
}
解题:
看上去 while(!rf.compareAndSet(or, nr))是有终止条件的,而且跑单线程测试一直都没有问题。实际上却存在严重的并发问题,问题就出在对or的赋值在while循环之外,这样每次循环or的值都不会发生变化,所以一旦有一次循环rf.compareAndSet(or,nr)的值等于false,那之后无论循环多少次,都会等于false。也就是说在特定场景下,变成了while(true)问题。既然找到了原因,修改就很简单了,只要把对or的赋值移到while循环之内就可以了,修改后的代码如下所示:
public class SafeWM {
class WMRange{
final int upper;
final int lower;
WMRange(int upper,int lower){
//省略构造函数实现
}
}
final AtomicReference<WMRange> rf = new AtomicReference<>(new WMRange(0,0));
// 设置库存上限
void setUpper(int v){
WMRange nr;
WMRange or;
//原代码在这⾥
//WMRange or=rf.get();
do{
//移动到此处
//每个回合都需要重新获取旧值
or = rf.get();
// 检查参数合法性
if(v < or.lower){
throw new IllegalArgumentException();
}
nr = new WMRange(v, or.lower);
}while(!rf.compareAndSet(or, nr));
}
}
为什么使用线程池?
一般意义上的池化资源?
class XXXPool{
// 获取池化资源
XXX acquire(){
}
// 释放池化资源
void release(XXX x){
}
}
为什么线程池没有采用一般意义上池化资源的设计方法呢?
如果线程池采用一般意义上池化资源的设计方法,应该是下面示例代码这样。你可以来思考一下,假设我们获取到一个空闲线程T1,然后该如何使用T1呢?你期望的可能是这样:通过调用T1的execute()方法,传入一个Runnable对象来执行具体业务逻辑,就像通过构造函数Thread(Runnable target)创建线程一样。可惜的是,你翻遍Thread对象的所有方法,都不存在类似execute(Runnable target)这样的公共方法。
//采一般般意义上池化资源的设计方法
class ThreadPool{
// 获取空闲线程
Thread acquire() {
}
// 释放线程
void release(Thread t){
}
}
//期望的使用
ThreadPool pool;
Thread T1=pool.acquire();
//传入Runnable对象
T1.execute(()->{
//具体业务逻辑
......
});
所以,线程池的设计,没有办法直接采用一般意义上池化资源的设计方法。
线程池如何设计呢?
通过创建简单的线程池MyThreadPool,理解线程池的工作原理?
//简化的线程池,仅用来说明工作原理
class MyThreadPool{
//利用阻塞队列实现生产者-消费者模式
BlockingQueue<Runnable> workQueue;
//保存内部工作线程
List<WorkerThread> threads = new ArrayList<>();
// 构造方法
MyThreadPool(int poolSize,BlockingQueue<Runnable> workQueue){
this.workQueue = workQueue;
// 创建工作线程
for(int idx=0; idx<poolSize; idx++){
WorkerThread work = new WorkerThread();
work.start();
threads.add(work);
}
}
// 提交任务
void execute(Runnable command){
workQueue.put(command);
}
// 工作线程负责消费任务,并执行任务
class WorkerThread extends Thread{
public void run() {
//循环取任务并执行
while(true){ ①
Runnable task = workQueue.take();
task.run();
}
}
}
}
/** 下面是使用示例 **/
// 创建有界阻塞队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
// 创建线程池
MyThreadPool pool = new MyThreadPool(10, workQueue);
// 提交任务
pool.execute(()->{
System.out.println("hello");
});
如何使用Java中的线程池?
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
线程池的参数介绍?
使用线程池要注意些什么?
try {
//业务逻辑
} catch (RuntimeException x) {
//按需处理
} catch (Throwable x) {
//按需处理
}
课后思考:
使用线程池,默认情况下创建的线程名字都类似pool-1-thread-2这样,没有业务含义。而很多情况下为了便于诊断问题,都需要给线程赋予一个有意义的名字,那你知道有哪些办法可以给线程池里的线程指定名字吗?
问题:
在上一篇文章介绍了如何创建正确的线程池,那创建完线程池,我们该如何使用呢?
ThreadPoolExecutor提交和获取任务结果?
// 提交Runnable任务
Future<?> submit(Runnable task);
// 提交Callable任务
<T> Future<T> submit(Callable<T> task);
// 提交Runnable任务及结果引⽤
<T> Future<T> submit(Runnable task, T result);
Future接口的5个方法?
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout,TimeUnit unit);
3个submit()方法之间的区别在于方法参数不同,分别介绍?
方法③submit(Runnable task, T result)的用法?
ExecutorService executor = Executors.newFixedThreadPool(1);
// 创建Result对象r
Result r = new Result();
r.setAAA(a);
// 提交任务
Future<Result> future = executor.submit(new Task(r), r);
Result fr = future.get();
// 下面等式成立
fr === r;
fr.getAAA() === a;
fr.getXXX() === x
class Task implements Runnable{
Result r;
//通过构造函数传入result
Task(Result r){
this.r = r;
}
void run() {
//可以操作result
a = r.getAAA();
r.setXXX(x);
}
}
介绍FutureTask工具类?
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);
如何使用FutureTask?
示例代码①: 将FutureTask对象提交给ThreadPoolExecutor去执行。
// 创建FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es = Executors.newCachedThreadPool();
// 提交FutureTask
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();
示例代码②:FutureTask对象直接被Thread执行的示例代码如下所示。
可以看出:利用FutureTask对象可以很容易获取子线程的执行结果。
// 创建FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();
实现最优的“烧水泡茶”程序?
// 创建任务T2的FutureTask
FutureTask<String> ft2 = new FutureTask<>(new T2Task());
// 创建任务T1的FutureTask
FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));
// 线程T1执行任务ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程T2执行任务ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程T1执行结果
System.out.println(ft1.get());
// T1Task需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String>{
FutureTask<String> ft2;
// T1任务需要T2任务的FutureTask
T1Task(FutureTask<String> ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
System.out.println("T1:洗水壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T1:烧开水...");
TimeUnit.SECONDS.sleep(15);
String tf = ft2.get(); // ★ 获取T2线程的茶叶
System.out.println("T1:拿到茶叶:"+tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
}
}
// T2Task需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String>{
@Override
String call() throws Exception{
System.out.println("T2:洗茶壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T2:洗茶杯...");
TimeUnit.SECONDS.sleep(2);
System.out.println("T2:拿茶叶...");
TimeUnit.SECONDS.sleep(1);
return "龙井";
}
}
// 一次执行结果:
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井
总结
课后思考
不久前听说小明要做一个询价应用,这个应用需要从三个电商询价,然后保存在自己的数据库里。核心示例代码如下所示,由于是串行的,所以性能很慢,你来试着优化一下吧。
// 向电商S1询价,并保存
r1 = getPriceByS1();
save(r1);
// 向电商S2询价,并保存
r2 = getPriceByS2();
// 向电商S3询价,并保存
r3 = getPriceByS3();
异步代码?
例子:
如下代码,是串行的,为了提升性能,需要并行化,那具体实施起来该怎么做呢?
//以下两个方法都是耗时操作
doBizA();
doBizB()
实现:
如下代码,创建两个子线程去执行。如下并行方案,主线程无需等待doBizA()和doBizB()的执行结果,即doBizA()和doBizB()两个操作已经被异步化。
new Thread(()->doBizA())
.start();
new Thread(()->doBizB())
.start();
异步化?
异步化,是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。看到这里,相信你应该就能理解异步编程最近几年为什么会大火了,因为优化性能是互联网大厂的一个核心需求啊。Java1.8提供了CompletableFuture来支持异步编程,CompletableFuture有可能是你见过的最复杂的工具类了,不过功能也着实让人感到震撼。
CompletableFuture的核心优势?
1)无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
2)语义更清晰。
例如下代码中f3=f1.thenCombine(f2,()->{})能够清晰地表述“任务3要等待任务1和任务2都完成后才能开始”;
3)代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
使用CompletableFuture重新实现前面曾提及的烧水泡茶程序?
首先还是需要先完成分工方案,在下面的程序中,我们分了3个任务:任务1负责洗水壶、烧开水,任务2负责洗茶壶、洗茶杯和拿茶叶,任务3负责泡茶。其中任务3要等待任务1和任务2都完成后才能开始。这个分工如下图所示。
//任务1:洗水壶->烧开水
CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{
System.out.println("T1:洗水壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T1:烧开水...");
sleep(15, TimeUnit.SECONDS);
});
//任务2:洗茶壶->洗茶杯->拿茶叶
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
System.out.println("T2:洗茶壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T2:洗茶杯...");
sleep(2, TimeUnit.SECONDS);
System.out.println("T2:拿茶叶...");
sleep(1, TimeUnit.SECONDS);
return "龙井";
});
//任务3:任务1和任务2完成后执⾏:泡茶
CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{
System.out.println("T1:拿到茶叶:" + tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
});
//等待任务3执行结果
System.out.println(f3.join());
void sleep(int t,TimeUnit u){
try {
u.sleep(t);
}catch(InterruptedException e){}
}
// 一次执行结果:
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井
介绍CompletableFuture的使用
如何创建CompletableFuture对象?
创建CompletableFuture对象主要靠下面代码中展示的这4个静态方法:
//使用默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//可以指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
前两个方法:
-- 上面例子中使用了runAsync(Runnable runnable)和supplyAsync(Supplier supplier)
-- 区别是:Runnable接口的run()方法没有返回值,而Supplier接口的get()方法是有返回值的。
异步操作需要关注的两个问题?
如何理解CompletionStage接口?
CompletionStage接口如何描述串行关系、AND聚合关系、OR聚合关系以及异常处理?
CompletionStage接口--描述串行关系?
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
thenApply()方法是如何使用的?
CompletableFuture<String> f0 = CompletableFuture.supplyAsync(
()->"Hello World") //①
.thenApply(s->s +"QQ") //②
.thenApply(String::toUpperCase);//③
System.out.println(f0.join());
//输出结果
HELLO WORLD QQ
CompletionStage接口--描述AND汇聚关系?
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
CompletionStage接口--描述OR汇聚关系?
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
如何使用applyToEither()方法来描述一个OR汇聚关系?
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
CompletableFuture<String> f3 = f1.applyToEither(f2,s -> s);
System.out.println(f3.join());
CompletionStage接口--异常处理?
虽然上面我们提到的fn、consumer、action它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行7/0就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?
CompletableFuture<Integer> f0 = CompletableFuture.
.supplyAsync(()->(7/0))
.thenApply(r->r*10);
System.out.println(f0.join());
CompletionStage接口给我们提供的方案非常简单,比try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
如何使用exceptionally()方法来处理异常?
CompletableFuture<Integer> f0 = CompletableFuture
.supplyAsync(()->7/0))
.thenApply(r->r*10)
.exceptionally(e->0);
System.out.println(f0.join())
总结
思考题
问题:
创建采购订单的时候,需要校验一些规则,例如最大金额是和采购员级别相关的。有同学利用CompletableFuture实现了这个校验的功能,逻辑很简单,首先是从数据库中把相关规则查出来,然后执行规则校验。你觉得他的实现是否有问题呢?
//采购订单
PurchersOrder po;
CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(()->{
//在数据库中查询规则
return findRuleByJdbc();
}).thenApply(r -> {
//规则校验
return check(po, r);
});
Boolean isOk = cf.join();
这段代码的问题,例如没有异常处理、逻辑不严谨等等,不过我更想让你关注的是:findRuleByJdbc()这个方法隐藏着一个阻塞式I/O,这意味着会阻塞调用线程。默认情况下所有的CompletableFuture共享一个ForkJoinPool,当有阻塞式I/O时,可能导致所有的ForkJoinPool线程都阻塞,进而影响整个系统的性能。
//采购订单
PurchersOrder po;
CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(()->{
//在数据库中查询规则
return findRuleByJdbc();
}).thenApply(r ->{
//规则校验
return check(po,r);
});
Boolean isOk = cf.join();
如何优化一个询价应用的核心代码?
如果采用“ThreadPoolExecutor+Future”的方案,你的优化结果很可能是下面示例代码这样:用三个线程异步执行询价,通过三次调用Future的get()方法获取询价结果,之后将询价结果保存在数据库中。
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 异步向电商S1询价
Future<Integer> f1 = executor.submit(()->getPriceByS1());
// 异步向电商S2询价
Future<Integer> f2 = executor.submit(()->getPriceByS2());
// 异步向电商S3询价
Future<Integer> f3 = executor.submit(()->getPriceByS3());
// 获取电商S1报价并异步保存
executor.execute(()->save(f1.get()));
// 获取电商S2报价并异步保存
executor.execute(()->save(f2.get())
// 获取电商S3报价并异步保存
executor.execute(()->save(f3.get())
上面的这个方案本身没有太大问题,但是有个地方的处理需要你注意,那就是如果获取电商S1报价的耗时很长,那么即便获取电商S2报价的耗时很短,也无法让保存S2报价的操作先执行,因为这个主线程都阻塞在了f1.get()操作上。这点小瑕疵你该如何解决呢?
// 创建阻塞队列
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
//电商S1报价异步进入阻塞队列
executor.execute(()->bq.put(f1.get()));
//电商S2报价异步进入阻塞队列
executor.execute(()->bq.put(f2.get()));
//电商S3报价异步进入阻塞队列
executor.execute(()->bq.put(f3.get()));
//异步保存所有报价
for (int i=0; i<3; i++) {
Integer r = bq.take();
executor.execute(()->save(r));
}
利用CompletionService实现询价系统?
如何创建CompletionService呢?
CompletionService接口的实现类是ExecutorCompletionService,这个实现类的构造方法有两个,分别是:
1. ExecutorCompletionService(Executor executor);
2. ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>>
completionQueue)。
下面的示例代码完整地展示了如何利用CompletionService来实现高性能的询价系统。
-- 其中,没有指定completionQueue,因此默认使用无界的LinkedBlockingQueue。
-- 之后通过CompletionService接口提供的submit()方法提交了三个询价操作,这三个询价操作将会被CompletionService异步执行。
-- 最后,我们通过CompletionService接口提供的take()方法获取一个Future对象(前面我们提到过,加入到阻塞队列中的是任务执行结果的Future对象),调用Future对象的get()方法就能返回询价操作的执行结果了。
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 异步向电商S1询价
cs.submit(()->getPriceByS1());
// 异步向电商S2询价
cs.submit(()->getPriceByS2());
// 异步向电商S3询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}
CompletionService接口说明?
介绍一下CompletionService接口提供的方法,CompletionService接口提供的方法有5个,这5个方法的方法签名如下所示。
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
其中,submit()相关的方法有两个。
① 一个方法参数是Callable task,前面利用CompletionService实现询价系统的示例代码中,我们提交任务就是用的它。
② 另外一个方法有两个参数,分别是Runnable task和V result,这个方法类似于ThreadPoolExecutor的 Future submit(Runnable task,T result) ,不再赘述。
利用CompletionService实现Dubbo中的Forking Cluster?
geocoder(addr) {
//并行执行以下3个查询服务,
r1=geocoderByS1(addr);
r2=geocoderByS2(addr);
r3=geocoderByS3(addr);
//只要r1,r2,r3有一个返回,则返回
return r1|r2|r3;
}
具体是如何实现Forking这种集群模式?
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 用于保存Future对象
List<Future<Integer>> futures = new ArrayList<>(3);
//提交异步任务,并保存future到futures
futures.add(cs.submit(()->geocoderByS1()));
futures.add(cs.submit(()->geocoderByS2()));
futures.add(cs.submit(()->geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
// 只要有一个成功返回,则break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
//简单地通过判空来检查是否成功返回
if (r != null) {
break;
}
}
} finally {
//取消所有任务
for(Future<Integer> f : futures)
f.cancel(true);
}
// 返回结果
return r;
总结
思考题:
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new
ExecutorCompletionService<>(executor);
// 异步向电商S1询价
cs.submit(()->getPriceByS1());
// 异步向电商S2询价
cs.submit(()->getPriceByS2());
// 异步向电商S3询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
// 并计算最低报价
AtomicReference<Integer> m =
new AtomicReference<>(Integer.MAX_VALUE);
for (int i=0; i<3; i++) {
executor.execute(()->{
Integer r = null;
try {
r = cs.take().get();
} catch (Exception e) {}
save(r);
m.set(Integer.min(m.get(), r));
});
}
return m;
并发编程的任务角度和协作细节角度?
分治任务?
分治任务模型?
Fork/Join的介绍?
Fork/Join的使用?
static void main(String[] args){
//创建分治任务线程池
ForkJoinPool fjp = newForkJoinPool(4);
//创建分治任务
Fibonacci fib = new Fibonacci(30);
//启动分治任务
Integer result = fjp.invoke(fib);
//输出结果
System.out.println(result);
}
//递归任务
static class Fibonacci extends RecursiveTask<Integer>{
final int n;
Fibonacci(int n){this.n = n;}
protected Integer compute(){
if(n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
//创建子任务
f1.fork();
Fibonacci f2 =new Fibonacci(n - 2);
//等待子任务结果,并合并结果
return f2.compute() + f1.join();
}
}
ForkJoinPool工作原理?
模拟MapReduce统计单词数量?
static void main(String[] args){
String[] fc = {"hello world",
"hello me",
"hello fork",
"hello join",
"fork join in world"};
//创建ForkJoin线程池
ForkJoinPool fjp =
new ForkJoinPool(3);
//创建任务
MR mr = new MR(
fc, 0, fc.length);
//启动任务
Map<String, Long> result =
fjp.invoke(mr);
//输出结果
result.forEach((k, v)->
System.out.println(k+":"+v));
}
//MR模拟类
static class MR extends
RecursiveTask<Map<String, Long>> {
private String[] fc;
private int start, end;
//构造函数
MR(String[] fc, int fr, int to){
this.fc = fc;
this.start = fr;
this.end = to;
}
@Override protected
Map<String, Long> compute(){
if (end - start == 1) {
return calc(fc[start]);
} else {
int mid = (start+end)/2;
MR mr1 = new MR(
fc, start, mid);
mr1.fork();
MR mr2 = new MR(
fc, mid, end);
//计算⼦任务,并返回合并的结果
return merge(mr2.compute(),
mr1.join());
}
}
//合并结果
private Map<String, Long> merge(
Map<String, Long> r1,
Map<String, Long> r2) {
Map<String, Long> result =
new HashMap<>();
result.putAll(r1);
//合并结果
r2.forEach((k, v) -> {
Long c = result.get(k);
if (c != null)
result.put(k, c+v);
else
result.put(k, v);
});
return result;
}
//统计单词数量
private Map<String, Long>
calc(String line) {
Map<String, Long> result =
new HashMap<>();
//分割单词
String [] words =
line.split("\\s+");
//统计单词数量
for (String w : words) {
Long v = result.get(w);
if (v != null)
result.put(w, v+1);
else
result.put(w, 1L);
}
return result;
}
总结
Fork/Join并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小
的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的
MapReduce,所以你可以把Fork/Join看作单机版的MapReduce。
Fork/Join并行计算框架的核心组件是ForkJoinPool。ForkJoinPool支持任务窃取机制,能够让所有线程的工
作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8提供的Stream
API里面并行流也是以ForkJoinPool为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享
一个ForkJoinPool,这个共享的ForkJoinPool默认的线程数是CPU的核数;如果所有的并行流计算都是CPU密
集型计算的话,完全没有问题,但是如果存在I/O密集型的并行流计算,那么很可能会因为一个很慢的I/O计
算而拖慢整个系统的性能。所以建议用不同的ForkJoinPool执行不同类型的计算任务。
如果你对ForkJoinPool详细的实现细节感兴趣,也可以参考Doug Lea的论文。http://gee.cs.oswego.edu/dl/papers/fj.pdf
思考题:
对于一个CPU密集型计算程序,在单核CPU上,使用Fork/Join并行计算框架是否能够提高性能呢?
异步刷盘方式
一个自实现的日志组件,写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,往往采用异步刷盘的方式。
class Logger {
//任务队列
final BlockingQueue<LogMsg> bq = new BlockingQueue<>();
//flush批量
static final int batchSize = 500;
//只需要⼀个线程写⽇志
ExecutorService es = Executors.newFixedThreadPool(1);
//启动写⽇志线程
void start() {
File file = File.createTempFile("foo", ".log");
final FileWriter writer = new FileWriter(file);
this.es.execute(() -> {
try {
//未刷盘⽇志数量
int curIdx = 0;
long preFT = System.currentTimeMillis();
while (true) {
LogMsg log = bq.poll(5, TimeUnit.SECONDS);
//写⽇志
if (log != null) {
writer.write(log.toString());
++curIdx;
}
//如果不存在未刷盘数据,则⽆需刷盘
if (curIdx <= 0) {
continue;
}
//根据规则刷盘
if (log != null && log.level == LEVEL.ERROR || curIdx == batchSize ||
System.currentTimeMillis() - preFT > 5000) {
writer.flush();
curIdx = 0;
preFT = System.currentTimeMillis();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
writer.flush();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
//写INFO级别⽇志
void info(String msg) {
bq.put(new LogMsg(LEVEL.INFO, msg));
}
//写ERROR级别⽇志
void error(String msg) {
bq.put(new LogMsg(LEVEL.ERROR, msg));
}
}
//⽇志级别
enum LEVEL {
INFO, ERROR
}
class LogMsg {
LEVEL level;
String msg;
//省略构造函数实现
LogMsg(LEVEL lvl, String msg) {
}
//省略toString()实现
String toString() {
}
}