[关闭]
@boothsun 2018-03-11T03:54:18.000000Z 字数 8220 阅读 1306

Executor框架学习笔记

Java多线程


《Java并发编程的艺术》

线程池的实现原理

当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?

从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:

Executor框架的两级调度模型

在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。

在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。这种两级调度模型的示意图如下:

image.png-136.8kB

从上图中可以看出,应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。

为什么需要Executor框架

线程是一个操作系统概念。操作系统负责这个线程的创建、挂起、运行、阻塞和终结操作。而操作系统创建线程、切换线程状态、终结线程都要进行CPU调度——这是一个耗费时间和系统资源的事情。

另一方面,大多数实际场景中是这样的:处理某一次请求的时间是非常短暂的,但是请求数量是巨大的。这种技术背景下,如果我们为每一个请求都单独创建一个线程,那么物理机的所有资源基本上都被操作系统创建线程、切换线程状态、销毁线程这些操作所占用,用于业务请求处理的资源反而减少了。所以最理想的处理方式是,将处理请求的线程数量控制在一个范围,既保证后续的请求不会等待太长时间,又保证物理机将足够的资源用于请求处理本身。

另外,一些操作系统是有最大线程数量限制的。当运行的线程数量逼近这个值的时候,操作系统会变得不稳定。这也是我们要限制线程数量的原因。

网上看到的一句话: T1 表示线程创建时间 T2表示线程处理业务时间 T3表示线程销毁时间。如果T1+T3 远大于 T2,就可以考虑使用

还有其他原因:就是开发过程中,处于响应速度等其他性能考虑,我们经常将非核心且处理速度比较慢的流程,异步话的执行。出以这种需求,我们经常需要使用到线程。Executor就是我们这方面的好帮手。

Executor框架结构

image.png-174.1kB

Executor框架主要由3大部分组成:

Executor接口

  1. public interface Executor {
  2. void execute(Runnable command);
  3. }

Executor 接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法,它没有直接实现类只有另一个重要的子接口ExecutorService。它最重要的作用就是将任务的提交和任务的执行分离开来。

ExecutorService接口

  1. //继承自Executor接口
  2. public interface ExecutorService extends Executor {
  3. /**
  4. * 关闭方法,调用后执行之前提交的任务,不再接受新的任务
  5. */
  6. void shutdown();
  7. /**
  8. * 从语义上可以看出是立即停止的意思,将暂停所有等待处理的任务并返回这些任务的列表
  9. */
  10. List<Runnable> shutdownNow();
  11. /**
  12. * 判断执行器是否已经关闭
  13. */
  14. boolean isShutdown();
  15. /**
  16. * 关闭后所有任务是否都已完成
  17. */
  18. boolean isTerminated();
  19. /**
  20. * 中断
  21. */
  22. boolean awaitTermination(long timeout, TimeUnit unit)
  23. throws InterruptedException;
  24. /**
  25. * 提交一个Callable任务
  26. */
  27. <T> Future<T> submit(Callable<T> task);
  28. /**
  29. * 提交一个Runable任务,result要返回的结果
  30. */
  31. <T> Future<T> submit(Runnable task, T result);
  32. /**
  33. * 提交一个任务
  34. */
  35. Future<?> submit(Runnable task);
  36. /**
  37. * 执行所有给定的任务,当所有任务完成,返回保持任务状态和结果的Future列表
  38. */
  39. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  40. throws InterruptedException;
  41. /**
  42. * 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
  43. */
  44. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  45. long timeout, TimeUnit unit)
  46. throws InterruptedException;
  47. /**
  48. * 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
  49. */
  50. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  51. throws InterruptedException, ExecutionException;
  52. /**
  53. * 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。
  54. */
  55. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  56. long timeout, TimeUnit unit)
  57. throws InterruptedException, ExecutionException, TimeoutException;
  58. }

ExecutorService接口继承自Executor接口,它提供了更丰富的实现多线程的方法,比如,ExecutorService提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以调用ExecutorService的shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程

ExecutorService的生命周期粗粒度可以分为三个状态:运行、关闭(shut down)、终止(terminated)。当ExecutorService具体类创建后便进入运行状态,当调用了shutdown()方法时,便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交的任务,当所有已经提交了的任务运行执行结束后,便到达终止状态。

AbstractExecutorService

实现了ExecutorService接口,提供了部分方法的默认实现。

ThreadPoolExecutor

Executor框架最核心的类是ThreadPoolExecutor,它是线程池的具体实现类,主要由下列4个组件构成。

  1. ThreadPoolExecutor executorPoolService = new ThreadPoolExecutor(2,30,60, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(20));
  2. for(int i = 0 ; i < 20 ; i++) {
  3. int count = i;
  4. executorPoolService.execute(() -> {
  5. System.out.println(count);
  6. });
  7. }
  8. executorPoolService.shutdown();
  9. }

Executors

Executors是工具类,它提供了一系列工厂方法用于创建线程池,返回的线程池都是实现了ExecutorService的子类对象。

常见的创建ThreadPoolExecutor线程池的方法:

  1. // 创建固定数目线程的线程池。
  2. public static ExecutorService newFixedThreadPool(int nThreads) ;
  3. /**
  4. * 创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,
  5. * 则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
  6. */
  7. public static ExecutorService newCachedThreadPool();
  8. // 创建一个单线程化的Executor。
  9. public static ExecutorService newSingleThreadExecutor();
  10. //创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
  11. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
方法 解释
newFixedThreadPool() 适用于为了满足资源管理的需求,而需要实现当前线程数量的应用场景,它适用于负载比较重的服务器
newSingleThreadExecutor() 适用于需要保证顺序地执行各个任务;并且在任意时间节点,不会有多个线程时获得的应用场景
newCachedThreadPool() CachedThreadPool是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。

优秀的网文:

hedTheadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,因此它是合理的Executor的首选,只有当这种方式会引发问题时(比如需要大量长时间面向连接的线程时),才需要考虑用FixedThreadPool。(该段话摘自《Thinking in Java》第四版)

常见的创建ScheduledThreadPoolExecutor线程池的方法:

  1. // 只包含一个线程的ScheduledExecutorService
  2. public static ScheduledExecutorService newSingleThreadScheduledExecutor() ;
  3. public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory);
  4. // 包含若干个线程的ScheduledThreadPoolExecutor
  5. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
  6. public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) ;
方法 解析
newScheduledThreadPool(int corePoolSize) ScheduledThreadPoolExecutor适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。下面是Executors提供的,创建单个线程的SingleThreadScheduledExecutor的API。
newSingleThreadScheduledExecutor() SingleThreadScheduledExecutor适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景

ScheduledThreadPoolExecutor和ScheduledExecutorService

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但ScheduledThreadPoolExecutor功能更强大、更灵活。Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

  1. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(20);
  2. System.out.println("startTime:" + System.currentTimeMillis());
  3. scheduledExecutorService.schedule(() -> {
  4. System.out.println("我等了2秒钟");
  5. System.out.println("end:" + System.currentTimeMillis());
  6. }, 2000,TimeUnit.MILLISECONDS);
  7. Thread.sleep(2*2000L);

线程池的使用

线程池的创建

我们可以通过ThreadPoolExecutor来创建一个线程池:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)

创建一个线程池时需要输入几个参数,如下:

向线程池提交任务

可以使用两个方法向线程池提交任务,分别为execute()submit()方法。

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。通过以下代码可知execute()方法输入的任务是一个Runnable类的实例。

  1. threadsPool.execute(new Runnable() {
  2. @Override
  3. public void run() {
  4. // TODO Auto-generated method stub
  5. }
  6. });

submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

关闭线程池

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注