Executor多线程框架

前言

在Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终奔溃。Java线程既是工作单元,也是执行单元。

new Thread()的缺点

  • 每次new Thread()耗费性能
  • 调用new Thread()创建的线程缺乏管理,被称之为野线程,而且可以无限制创建,之间相互竞争,会导致过多占用系统资源导致系统瘫痪
  • 不利于扩展,比如定时执行、定期执行、线程中断

采用线程池的优点

  • 重用存在的线程,减少对象创建、消亡的开销,性能低
  • 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞
  • 提供定时执行、定期执行、单线程、并发控制等功能

从JDK1.5开始,并发编程引入了一堆新的启动、调整和管理线程的API,把工作单元与执行单元机制分离开来,工作单元包括Runnable和Callable,而执行机制由Executor框架提供。

Executor内部使用了线程池机制,它在java.util.concurrent包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在Java1.5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,借阅开销)外,还有关键的一点:有助于避免this逃逸问题。

Executor框架包括:线程池、Executor、Executors、ExecutorSeivice、CompletionService、Future、Callable等。

Executor框架的两级调度模型

在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当Java线程终止后,这个操作系统线程也会被回收。操作系统会调用所有线程并将它们分配给可用的CPU。可以将此种模式分为两层,在上层,Java多线程程序通常把应用程序分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。

从图中可以看出,该框架用来控制应用程序的上层调度(下层调度有操作系统内核控制,不受应用程序的控制)。

Executor框架的结构

任务

包括被执行任务需要实现的接口:Runnable接口和Callable接口

  • Executor执行Runnable任务

    一旦Runnable任务传递到execute()方法,该方法便会自动在一个线程上执行,但是Runnable任务没有返回值。

  • Callable接口

    Callable任务的call()方法只能通过ExecutorService的submit(Callable task)方法来执行,并且返回一个Future,表示等待任务完成的Future。

任务的执行

包括任务执行机制的核心接口Executor,以及继承Executor的ExecutorService接口。

Executor框架有两个关键类实现了ExecutorService接口:ThreadPoolExecutor和ScheduledThreadPoolExecutor。

Executor框架的类与接口

  • Executor是一个接口,是Executor框架的基础,它将任务的提交与任务的执行分离
  • ThreadPoolExecutor是线程池的核心线程类,用来执行被提交的任务
  • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大
  • Future接口和它的实现类FutureTask类,代表异步计算的结果
  • Runnable和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行

Executor框架的使用

  • 主线程首先要创建实现Runnable接口或者实现Callable接口的任务对象
  • 然后可以把Runnable对象或者Callable对象直接交给ExecutorService执行
  • 最后,主线程可以执行Future.get()方法来等待任务执行完成,也可以执行Future.cancel()方法来取消此任务的执行

ThreadPoolExecutor

Executor框架最核心的类是ThreadPoolExecutor。

ThreadPoolExecutor的组件构成

  • corePool:核心线程池的大小
  • maximumPool:最大线程池的大小
  • BlockingQueue:用来暂时保存任务的工作队列
  • RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或已经饱和时,execute()方法将要调用的Handler

Executor可以创建3种类型的ThreadPoolExecutor线程池。

  • FixedThreadPool

    创建固定长度的线程池,每次提交任务创建一个线程,直到达到线程池的最大数量,线程池的大小不再变化。

    1
    2
    3
    4
    5
    6
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory);
    }
    • FixedThreadPool的corePoolSize和maxiumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads
    • 0L表示当线程池中的线程数量操作核心线程的数量时,多余的线程将被立即停止
    • 最后一个参数表示FixedThreadPool使用了无界队列LinkedBlockingQueue作为线程池的做工队列,由于是无界的,当线程池的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池的线程数量不会超过corePoolSize,同时maximumPoolSize也就变成了一个无效的参数,并且运行中的线程池并不会拒绝任务
    • 由于LinkedBlockingQueue是无界队列,所以可以一直添加新任务到线程池中

    FixedThreadPool执行过程如下

    • 如果当前工作中的线程数少于corePool的数量,就创建新的线程来执行任务
    • 当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue
    • 线程执行完任务后会从队列中取任务
  • SingleThreadExecutor

    SingleThreadExecutor是使用单个worker线程的Executor。

    1
    2
    3
    4
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
    }
    • SingleThreadExecutor的corePoolSize和maxiumPoolSize都被设置1,其他参数均与FixedThreadPool相同。
    • 由于在线程池中只有一个工作线程,所以任务可以按照添加顺序执行

    SingleThreadExecutor执行过程如下

    • 如果当前工作中的线程数量少于corePool的数量,就创建一个新的线程来执行任务
    • 当线程池中的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue
    • 线程执行完任务后会从队列中取任务
  • CachedThreadPool

    CachedThreadPool是一个“无限”容量的线程池,它会根据需要创建新线程执行任务,没有特定的corePool。

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
    • CachedThreadPool的corePoolSize被设置为0,即corePool为空,maximumPoolSize被设置为Integer.MAX_VALUE,即maximum是无界的。这里keepAliveTime设置为60秒,意味着空闲的线程最多可以等待任务60秒,否则将被回收
    • CachedThreadPool使用没有容量的SynchronousQueue作为主线程的工作队列,它是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应删除操作。这意味着,如果主线程提交任务的速度高于线程池中处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU资源。

    CachedThreadPool执行过程如下

    • 首先执行SynchronousQueue.offer(Runnable task)。如果是当前的线程池中有空闲的线程正在执行SynchronousQueue.poll(),那么主线程执行的offer操作与空线程执行的poll操作配对成功,主线程把任务交给空闲线程,execute()方法成功,否则执行下面的步骤2
    • 当线程池为空(初始maximumPool为空)或没有空闲线程时,配对失败,将没有线程执行SynchronousQueue.poll操作。这种情况下,线程池会创建一个新的线程执行任务
    • 在创建新的线程任务以后,将会执行poll操作。当步骤2的线程执行完后,将等待60秒,如果此时主线程提交了一个新任务,那么这个空闲线程将执行新任务,否则被回收。因此长时间不提交任务的CachedThreadPool不会占用系统资源