@zsh-o
2018-11-05T07:53:33.000000Z
字数 18417
阅读 1351
Java
首先给出一个同步问题的定义
这里就要提到python的动态类型的tuple的好处,可以实现统一形式的多输入和多输出
接下来从最简单的开始一步一步进行优化
首先从最基本的顺序结构执行,要保证结果的正确性,需要按照上述有向图的一种拓扑排序顺序来组织代码
顺序结构我们只能按照其一种拓扑排序的方法组织其运行,这里按照[A,B,C,D,E,F]顺序运行即可
package com.zsh_o.future;/*** 顺序结构执行* */public class SequentialBase {public static void main(String[] args) {SequentialBase app = new SequentialBase();try {app.run();} catch (InterruptedException e) {e.printStackTrace();}}void run() throws InterruptedException {long startTime = System.currentTimeMillis();int a = getA();int b = getB(a);int c = getC();int d = getD();int e = getE(c, d);int f = getF(b, e);long endTime = System.currentTimeMillis();System.out.printf("Final Result: %d\n", f);System.out.printf("Total Time: %d\n", endTime - startTime);}/*** Define Functions A -> F* */int getA() throws InterruptedException {System.out.println("A: Running");Thread.sleep(1000);System.out.println("A: Returned");return 1;}int getB(int a) throws InterruptedException {System.out.println("B: Running");Thread.sleep(1000);System.out.println("B: Returned");return a + 1;}int getC() throws InterruptedException {System.out.println("C: Running");Thread.sleep(1000);System.out.println("C: Returned");return 10;}int getD() throws InterruptedException {System.out.println("D: Running");Thread.sleep(1000);System.out.println("D: Returned");return 20;}int getE(int c, int d) throws InterruptedException {System.out.println("E: Running");Thread.sleep(1000);System.out.println("E: Returned");return c + d;}int getF(int b, int e) throws InterruptedException {System.out.println("F: Running");Thread.sleep(1000);System.out.println("F: Returned");return b * e;}}
执行结果:
A: RunningA: ReturnedB: RunningB: ReturnedC: RunningC: ReturnedD: RunningD: ReturnedE: RunningE: ReturnedF: RunningF: ReturnedFinal Result: 60Total Time: 6003
可以看到执行效果还是非常不错的,加上输入输出也只多了3ms,再复杂的代码都是由最基础的代码一步一步按照场景优化来的,所以先来优化这个顺序结构
先从小功能开始,记录时间的代码太长了,如果要计时的太多会产生很多的计时变量,把其封装一下:
package com.zsh_o.util;/*** 用以记录程序运行时间* */public class CountTimer {private long startTime;private long endTime;private long time;public CountTimer() {startTime = 0;endTime = 0;time = 0;}public void start() {startTime = System.currentTimeMillis();}public void end() {endTime = System.currentTimeMillis();time = endTime - startTime;}public long getTime() {return time;}}
第二个是,为了查看效果我们强行家了sleep一秒,但sleep是要catch异常的,所以每sleep一次都要处理下异常,在这个里面是完全没有必要的,我们单拿出来并处理下异常
package com.zsh_o.util;public class Util {public static void sleep(long ms) {try {Thread.sleep(ms);} catch (InterruptedException e) {e.printStackTrace();}}}
接下来我们考虑如何对代码进行复用,首先发现可以抽象出函数的调用,这样用户只需要关心函数体的内容就可以了,所以抽象出函数的调用,增加Callable接口
package com.zsh_o.future;public interface Callable<T> {T call();}
这个地方可以看到我们定义的接口没有传递参数,是由于我们不知道上层用户想要传递几个参数,也不知道每个参数是什么类型,所以最好的办法就是不写参数。。。
python里面传递参数相当于直接传递tuple和dict,然后在函数里面检验参数正确性,但对于Java这种静态编译语言这种方法太影响性能,直接用Object[]在函数里面类型转换,这样对函数参数的约束太弱,很容易崩溃。Java也用泛型实现Tuple,但无法保证长度,所以这个地方直接不加参数,用闭包实现功能。
package com.zsh_o.future;import com.zsh_o.util.CountTimer;import com.zsh_o.util.Util;import java.util.ArrayList;public class SequentialCase1 {int a, b, c, d, e, f;public static void main(String[] args) {SequentialCase1 app = new SequentialCase1();app.run();}void run() {CountTimer timer = new CountTimer();timer.start();ArrayList<Callable> array = new ArrayList<>();array.add(() -> {System.out.println("A: Running"); Util.sleep(1000); a = 1; System.out.println("A: Returned"); return a;});array.add(() -> {System.out.println("B: Running"); Util.sleep(1000); b = a + 1; System.out.println("B: Returned"); return b;});array.add(() -> {System.out.println("C: Running"); Util.sleep(1000); c = 10; System.out.println("C: Returned"); return c;});array.add(() -> {System.out.println("D: Running"); Util.sleep(1000); d = 20; System.out.println("D: Returned"); return d;});array.add(() -> {System.out.println("E: Running"); Util.sleep(1000); e = c + d; System.out.println("E: Returned"); return e;});array.add(() -> {System.out.println("F: Running"); Util.sleep(1000); f = b * e; System.out.println("F: Returned"); return f;});for(var e : array) {e.call();}System.out.printf("Final Result: %d\n", f);timer.end();System.out.printf("Total Time: %d\n", timer.getTime());}}
结果如下
A: RunningA: ReturnedB: RunningB: ReturnedC: RunningC: ReturnedD: RunningD: ReturnedE: RunningE: ReturnedF: RunningF: ReturnedFinal Result: 60Total Time: 6046
可以看到ArrayList还是蛮耗时的
接下来我们发现,为了查看方法的执行效果,输出了每个方法的运行状态,但该运行状态在每个方法上都是相似的,包括sleep一秒也是相似的,所以,上面两个部分相当于对想实现功能的结果的附加和扩展,所以这里我们用装饰器模式对其扩展附加功能,首先定义该装饰器
package com.zsh_o.future;/*** 采用装饰器对Call进行装饰,为Call增加附加功能* */public class CallDecorator<T> implements Callable<T> {Callable<T> callable;String name;public CallDecorator(String name, Callable<T> callable) {this.callable = callable;this.name = name;}@Overridepublic T call() {System.out.printf("%s: Running\n", name);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}T value = callable.call();System.out.printf("%s: Returned\n", name);return value;}}
这里有一个问题,为什么是装饰器而不是代理?代理强调的是对代理对象的访问和执行控制,而装饰器强调对行为的扩展,这个地方只是加了输出和暂停一秒,所以叫装饰器更为合适
package com.zsh_o.future;import com.zsh_o.util.CountTimer;import java.util.ArrayList;public class SequentialCase2{int a, b, c, d, e, f;public static void main(String[] args) {SequentialCase2 app = new SequentialCase2();try {app.run();} catch (InterruptedException e1) {e1.printStackTrace();}}void run() throws InterruptedException {CountTimer timer = new CountTimer();timer.start();ArrayList<CallDecorator> array = new ArrayList<>();array.add(new CallDecorator("A", () -> {a = 1; return a;}));array.add(new CallDecorator("B", () -> {b = a + 1; return b;}));array.add(new CallDecorator("C", () -> {c = 10; return c;}));array.add(new CallDecorator("D", () -> {d = 20; return d;}));array.add(new CallDecorator("E", () -> {e = c + d; return e;}));array.add(new CallDecorator("F", () -> {f = b * e; return f;}));for(var e : array) {e.call();}timer.end();System.out.printf("Final Result: %d\n", f);System.out.printf("Total Time: %d\n", timer.getTime());}}
结果:
A: RunningA: ReturnedB: RunningB: ReturnedC: RunningC: ReturnedD: RunningD: ReturnedE: RunningE: ReturnedF: RunningF: ReturnedFinal Result: 60Total Time: 6055
我们用并发的方式优化该代码,该程序理论上最少的运行时间是3s,每一列的代码可以一起执行。在最一开始我们采用了6把锁来完成这个同步问题
package com.zsh_o.future;import com.zsh_o.util.CountTimer;import com.zsh_o.util.Util;import java.util.ArrayList;import java.util.concurrent.CountDownLatch;import java.util.concurrent.locks.ReentrantLock;/*** 多线程解决,可能会出现的问题是还没来得及加锁,其他的线程已经开始运行* */public class ThreadBase1 {private int a, b, c, d, e, f;private ReentrantLock lockA, lockB, lockC, lockD, lockE, lockF;public ThreadBase1() {this.lockA = new ReentrantLock();this.lockB = new ReentrantLock();this.lockC = new ReentrantLock();this.lockD = new ReentrantLock();this.lockE = new ReentrantLock();this.lockF = new ReentrantLock();}void run() throws InterruptedException {CountTimer timer = new CountTimer();timer.start();// 用CountDownLatch确保主线程在所有子线程执行完再执行CountDownLatch latch = new CountDownLatch(6);ArrayList<Thread> array = new ArrayList<>();array.add(new Thread(() -> {lockA.lock();System.out.println("A: Running");Util.sleep(1000);a = 1;System.out.println("A: Returned");lockA.unlock();latch.countDown();}));array.add(new Thread(() -> {lockB.lock();lockA.lock();System.out.println("B: Running");Util.sleep(1000);b = a + 1;System.out.println("B: Returned");lockA.unlock();lockB.unlock();latch.countDown();}));array.add(new Thread(() -> {lockC.lock();System.out.println("C: Running");Util.sleep(1000);c = 10;System.out.println("C: Returned");lockC.unlock();latch.countDown();}));array.add(new Thread(() -> {lockD.lock();System.out.println("D: Running");Util.sleep(1000);d = 20;System.out.println("D: Returned");lockD.unlock();latch.countDown();}));array.add(new Thread(() -> {lockE.lock();lockC.lock();lockD.lock();System.out.println("E: Running");Util.sleep(1000);e = c + d;System.out.println("E: Returned");lockD.unlock();lockC.unlock();lockE.unlock();latch.countDown();}));array.add(new Thread(() -> {lockF.lock();lockB.lock();lockE.lock();System.out.println("F: Running");Util.sleep(1000);f = b * e;System.out.println("F: Returned");lockE.unlock();lockB.unlock();lockF.unlock();latch.countDown();}));for(var e: array) {e.start();}latch.await();timer.end();System.out.printf("Final Result: %d\n", f);System.out.printf("Total Time: %d\n", timer.getTime());}public static void main(String[] args) {ThreadBase1 app = new ThreadBase1();try {app.run();} catch (InterruptedException e1) {e1.printStackTrace();}}}
结果:
D: RunningA: RunningC: RunningD: ReturnedC: ReturnedA: ReturnedE: RunningB: RunningE: ReturnedB: ReturnedF: RunningF: ReturnedFinal Result: 60Total Time: 3030
这个可能会出现的问题是,可能会出现还没来得及加锁后面的就运行了,则会出现错误结果,所以我们用CountDownLatch来修改该代码
package com.zsh_o.future;import com.zsh_o.util.CountTimer;import com.zsh_o.util.Util;import java.util.ArrayList;/*** 多线程解决* 用CountDownLatch完善* */public class ThreadCase1 {private int a, b, c, d, e, f;private CountDownLatch latchB, latchE, latchF;public ThreadCase1() {latchB = new CountDownLatch(1);latchE = new CountDownLatch(2);latchF = new CountDownLatch(2);}void run() throws InterruptedException {CountTimer timer = new CountTimer();timer.start();CountDownLatch latchMain = new CountDownLatch(1);ArrayList<Thread> array = new ArrayList<>();array.add(new Thread(() -> {System.out.println("A: Running");Util.sleep(1000);a = 1;System.out.println("A: Returned");latchB.countDown();}));array.add(new Thread(() -> {try {latchB.await();System.out.println("B: Running");Util.sleep(1000);b = a + 1;System.out.println("B: Returned");latchF.countDown();} catch (InterruptedException e1) {e1.printStackTrace();}}));array.add(new Thread(() -> {System.out.println("C: Running");Util.sleep(1000);c = 10;System.out.println("C: Returned");latchE.countDown();}));array.add(new Thread(() -> {System.out.println("D: Running");Util.sleep(1000);d = 20;System.out.println("D: Returned");latchE.countDown();}));array.add(new Thread(() -> {try {latchE.await();System.out.println("E: Running");Util.sleep(1000);e = c + d;System.out.println("E: Returned");latchF.countDown();} catch (InterruptedException e1) {e1.printStackTrace();}}));array.add(new Thread(() -> {try {latchF.await();System.out.println("F: Running");Util.sleep(1000);f = b * e;System.out.println("F: Returned");latchMain.countDown();} catch (InterruptedException e1) {e1.printStackTrace();}}));for(var e: array) {e.start();}latchMain.await();System.out.printf("Final Result: %d\n", f);timer.end();System.out.printf("Total Time: %d\n", timer.getTime());}public static void main(String[] args) {ThreadCase1 app = new ThreadCase1();try {app.run();} catch (InterruptedException e1) {e1.printStackTrace();}}}
执行结果与上面相同,可以看到CountownLatch非常好用,而且原理非常简单,那么我们来实现个吧
原理:await()阻断线程,并且MaxCount个线程执行countDown()之后才打开阻断,需要是不同的线程,需要注意的是countDown()函数不截断执行的线程,只截断执行await()的线程,如果想同时截断需要配合使用
package com.zsh_o.future;import com.zsh_o.util.CountTimer;import com.zsh_o.util.Util;import javax.swing.event.MouseInputListener;import java.util.concurrent.locks.ReentrantLock;public class CountDownLatch {private int count;final private Object gobal;private ReentrantLock lockCount;public CountDownLatch(int count) {this.count = count;lockCount = new ReentrantLock();gobal = new Object();}public void countDown() {lockCount.lock();if(count > 0)count--;else return;if(count == 0)synchronized (gobal) {gobal.notifyAll();}lockCount.unlock();}public void await() throws InterruptedException {synchronized (gobal) {if (count > 0)gobal.wait();}}}
这里用了java的Object的wait和noifyAll()函数,用以任意数量的线程等待和恢复
上面的问题是需要用户自己来考虑同步机制,如何把同步方法也封装到代码里面,让用户只关心代码逻辑即可,这个地方用的是Future模式,顾名思义,Future表示的是未来的一种意愿,表示未来会发生的事情,这个地方表示该实体在未来会得到返回值,调用future.get的表示期望得到给结果,如果结果还没有准备好则等待结果,例如上面的E会执行int e = futureC.get() + futureD.get()则E会等待C和D执行完之后继续执行。这个地方涉及到两个点:一,如何进入等待,二,如何从等待中恢复;这里的原则是等待过程不能占用计算资源,也就是不能用while一直判断的方法实现,那么实现方法就应该是事件驱动,进入等待释放计算资源,从等待恢复再重新可计算。
这个地方需要用代理模式,用代理形成逻辑,最后具体的执行由代理控制
首先定义futurable接口
package com.zsh_o.future;public interface Futurable<T> {T get();}
下面是Future抽象类
package com.zsh_o.future;public abstract class Future<T> implements Futurable<T>, Runnable {protected Callable<T> callable;protected boolean finished;public Future() {this.callable = null;this.finished = false;}public void register(Callable<T> callable) {this.callable = callable;}}
接下来是以线程的方式实现Future,当然也可以实现分布式的方式,核心思想相似
package com.zsh_o.future;import com.zsh_o.util.CountTimer;import com.zsh_o.util.Util;public class ThreadFuture<T> extends Future<T> {private CountDownLatch latch;private T value;private boolean finished;private final Object lock;public ThreadFuture() {latch = new CountDownLatch(1);value = null;finished = false;lock = new Object();}@Overridepublic T get() {try {if (callable == null)throw new Exception("Unregistered Callable");if (!finished){run();latch.await();}} catch (Exception e) {e.printStackTrace();}return value;}@Overridepublic void run() {new Thread(()->{synchronized (lock) {if (!finished) {value = callable.call();finished = true;latch.countDown();}}}).start();}public void reset() {synchronized (lock) {finished = false;}}public boolean getState() {return finished;}}
实现的功能是,用register注册函数体逻辑Callable,需要注意的是Callable中需要调用代理的future.get()方法,以形成执行逻辑,get()执行await进入等待,等待其他线程执行完run(),并且保证只执行一次
package com.zsh_o.future;import com.zsh_o.util.CountTimer;import com.zsh_o.util.Util;public class ThreadCase2 {private ThreadFuture<Integer> futureA, futureB, futureC, futureD, futureE, futureF;public ThreadCase2() {this.futureA = new ThreadFuture<>();this.futureB = new ThreadFuture<>();this.futureC = new ThreadFuture<>();this.futureD = new ThreadFuture<>();this.futureE = new ThreadFuture<>();this.futureF = new ThreadFuture<>();}void run() {CountTimer timer = new CountTimer();timer.start();futureA.register(()->{System.out.println("A: Runing");int a = 1;Util.sleep(1000);System.out.println("A: Over");return a;});futureB.register(()->{System.out.println("B: Runing");int b = futureA.get()+1;Util.sleep(1000);System.out.println("B: Over");return b;});futureC.register(()->{System.out.println("C: Runing");int c = 10;Util.sleep(1000);System.out.println("C: Over");return c;});futureD.register(()->{System.out.println("D: Runing");int d = 20;Util.sleep(1000);System.out.println("D: Over");return d;});futureE.register(()->{System.out.println("E: Runing");int e = futureC.get()+futureD.get();Util.sleep(1000);System.out.println("E: Over");return e;});futureF.register(()->{System.out.println("F: Runing");int f = futureB.get()*futureE.get();Util.sleep(1000);System.out.println("F: Over");return f;});futureA.run();futureB.run();futureC.run();futureD.run();futureE.run();futureF.run();System.out.printf("Final Result: %d\n", futureF.get());timer.end();System.out.printf("Total Time: %d\n", timer.getTime());}public static void main(String[] args) {ThreadCase2 app = new ThreadCase2();app.run();}}
结果:
A: RuningB: RuningC: RuningD: RuningE: RuningF: RuningC: OverD: OverA: OverE: OverB: OverF: OverFinal Result: 60Total Time: 3051
最后,如果并发的非常多可能会崩掉,所以我们加了一个限制线程执行状态最大值的方法
package com.zsh_o.future;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentLinkedDeque;public class ThreadPoolFuture<T> {private class Item<D> extends Future<D> {Thread thread;D value;CountDownLatch latch;final Object lock;boolean finished;public Item() {this.value = null;this.latch = new CountDownLatch(1);this.lock = new Object();this.finished = false;}@Overridepublic D get() {return value;}@Overridepublic void run() {if (!thread.isAlive()) {thread.start();}}}private int maxPool;private final Object lockObject;private ConcurrentHashMap<String, Item<T>> pool;private ConcurrentLinkedDeque<Item<T>> poolList;private int currentRuning;private CountDownLatch lathGobal;public ThreadPoolFuture(int maxPool) {this.maxPool = maxPool;this.pool = new ConcurrentHashMap<>();poolList = new ConcurrentLinkedDeque<>();this.lockObject = new Object();this.currentRuning = 0;}public void register(String name, Callable<T> callable) {Item<T> item = new Item<>();item.thread = new Thread(()->{synchronized (item.lock) {if (!item.finished) {synchronized (lockObject) {while (currentRuning >= maxPool) {try {lockObject.wait();} catch (InterruptedException e) {e.printStackTrace();}}currentRuning++;System.out.println(name + " Run: " + currentRuning);}item.value = callable.call();synchronized (lockObject) {currentRuning--;lockObject.notifyAll();}item.latch.countDown();lathGobal.countDown();item.finished = true;}}});pool.put(name, item);poolList.push(item);}public T get(String name) {if (!pool.containsKey(name)) {try {throw new Exception("Unregister Callable");} catch (Exception e) {e.printStackTrace();}}Item<T> item = pool.get(name);if (!item.finished) {try {synchronized (lockObject) {lockObject.notifyAll();currentRuning--;System.out.println(name + " Get: " + currentRuning);}item.run();item.latch.await();synchronized (lockObject) {while (currentRuning >= maxPool) {try {lockObject.wait();} catch (InterruptedException e) {e.printStackTrace();}}currentRuning++;System.out.println(name + " Get: " + currentRuning);}} catch (InterruptedException e) {e.printStackTrace();return null;}}return item.value;}public void start() {lathGobal = new CountDownLatch(poolList.size());for(var e: poolList) {e.run();}}public void await() throws InterruptedException {if (lathGobal == null)lathGobal = new CountDownLatch(poolList.size());lathGobal.await();}}
基本思路是,由于上面我们用消息的方法执行等待和恢复执行,所以进入等待和恢复执行对于我们来说是可见的,所以只需要监控当前运行的最大线程即可,只是执行状态的数量不包括阻塞和等待
package com.zsh_o.future;import com.zsh_o.util.CountTimer;import com.zsh_o.util.Util;/*** Future模式解决问题,限制最大运行线程数* */public class ThreadCase3 {private ThreadPoolFuture<Integer> pool;public ThreadCase3() {this.pool = new ThreadPoolFuture<>(2);}void run() throws Exception {CountTimer timer = new CountTimer();timer.start();pool.register("A", ()->{System.out.println("A: Running");int a = 1;Util.sleep(1000);System.out.println("A: Over");return a;});pool.register("B", ()->{System.out.println("B: Running");int b = pool.get("A") + 1;Util.sleep(1000);System.out.println("B: Over");return b;});pool.register("C", ()->{System.out.println("C: Running");int c = 10;Util.sleep(1000);System.out.println("C: Over");return c;});pool.register("D", ()->{System.out.println("D: Running");Util.sleep(1000);System.out.println("D: Over");return 20;});pool.register("E",()->{System.out.println("E: Running");Util.sleep(1000);int e = pool.get("C") + pool.get("D");System.out.println("E: Over");return e;});pool.register("F", ()->{System.out.println("F: Running");int f = pool.get("B") * pool.get("E");Util.sleep(1000);System.out.println("F: Over");return f;});pool.start();pool.await();System.out.printf("Final Result: %d\n", pool.get("F"));timer.end();System.out.printf("Total Time: %d\n", timer.getTime());}public static void main(String[] args) {ThreadCase3 app = new ThreadCase3();try {app.run();} catch (Exception e) {e.printStackTrace();}}}
结果:
maxCount = 2F Run: 1F: RunningB Get: 0A Run: 1A: RunningB Run: 2B: RunningA Get: 1C Run: 2C: RunningA: OverC: OverE Run: 2E: RunningD Run: 2D: RunningD Get: 1D: OverA Get: 2D Get: 2E: OverB: OverB Get: 1F: OverFinal Result: 60Total Time: 4072------------------------------maxCount = 1F Run: 1F: RunningB Get: 0A Run: 1A: RunningA: OverE Run: 1E: RunningC Get: 0B Run: 1B: RunningB: OverD Run: 1D: RunningD: OverB Get: 1E Get: 0C Run: 1C: RunningC: OverC Get: 1E: OverE Get: 1F: OverFinal Result: 60Total Time: 6098---------------------maxCount = 3F Run: 1F: RunningA Run: 2A: RunningB Run: 3B: RunningA Get: 2E Run: 3E: RunningB Get: 2D Run: 3D: RunningA: OverC Run: 3D: OverC Get: 2C: RunningA Get: 3C: OverC Get: 2E: OverB: OverB Get: 1F: OverFinal Result: 60Total Time: 3088-------------------------maxCount = 4F Run: 1F: RunningA Run: 2A: RunningB Run: 3B: RunningC Run: 4C: RunningA Get: 3B Get: 2E Run: 3E: RunningD Run: 4D: RunningA: OverC: OverD: OverC Get: 2A Get: 2C Get: 2E: OverB: OverB Get: 1F: OverFinal Result: 60Total Time: 3078