Executor, ExecutorService, Executors, ScheduledExecutorService和CompletionService

java.util.concurrent.Executor, java.util.concurrent.ExecutorService, java.util.concurrent.Executors 这三者均是 Java Executor 框架的一部分,使用Executor框架能够将任务的提交和任务的执行解耦,从单个线程的提交执行进化到通过线程池的形式进行管理。Java 1.5之后还提供了许多内置的线程池配置进一步简化线程池的创建和管理。


public interface Executor {
    void execute(Runnable command);


class ThreadPerTaskExecutor implements Executor{
    public void execute(Runnable r){
        new Thread(r).start();

class DirectExecutor implements Executor{
    public void execute(Runnable r){



public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
  1. submit为无返回值的runnable提供了返回值参数。
  2. invokeAll会阻塞直到所有的任务都完成或者超时或抛出exception,通过future.get()得到各个任务的运行结果或者exception信息。
  3. ==invokeAny当一个线程返回时,其他线程会被interrupt。==
  4. 调用shutdown/shutdownNow后,ExecutorService的状态变成SHUTDOWN,submit新任务会抛出RejectedExecutionException
  5. 优雅的关闭ExecutorService:
     catch(InterruptedException e){


public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    List<Callable<Long>> tasks = new LinkedList<>();
    tasks.add(new Callable<Long>() {
        public Long call() {
            log.info("start thread " + Thread.currentThread().getName());
            try {
                Thread.sleep(5 * 1000);
            catch (Exception e) {
            log.info("finish thread " + Thread.currentThread().getName());
            return Thread.currentThread().getId();

    tasks.add(new Callable<Long>() {
        public Long call() {
            log.info("start thread " + Thread.currentThread().getName());
            try {
                Thread.sleep(2 * 1000);
            catch (Exception e) {
            log.info("finish thread " + Thread.currentThread().getName());
            return Thread.currentThread().getId();

    try {
        log.info("Start tasks");
        Long ret = executorService.invokeAny(tasks);
        log.info("Finish tasks");
    } catch (Exception e) {

12:47:35.775 INFO  com.worksap.company.TempTest - Start tasks
12:47:35.835 INFO  com.worksap.company.TempTest - start thread pool-1-thread-1
12:47:35.839 INFO  com.worksap.company.TempTest - start thread pool-1-thread-2
12:47:37.840 INFO  com.worksap.company.TempTest - finish thread pool-1-thread-2
12:47:37.842 INFO  com.worksap.company.TempTest - Finish tasks
12:47:37.845 INFO  com.worksap.company.TempTest - finish thread pool-1-thread-1
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.worksap.company.TempTest$1.call(TempTest.java:21)
	at com.worksap.company.TempTest$1.call(TempTest.java:16)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)


public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    List<Callable<Long>> tasks = new LinkedList<>();
    tasks.add(new Callable<Long>() {
        public Long call() {
            log.info("start thread " + Thread.currentThread().getName());
            try {
                Thread.sleep(5 * 1000);
            catch (Exception e) {
            log.info("finish thread " + Thread.currentThread().getName());
            return Thread.currentThread().getId();

    tasks.add(new Callable<Long>() {
        public Long call() {
            log.info("start thread " + Thread.currentThread().getName());
            try {
                Thread.sleep(2 * 1000);
            catch (Exception e) {
            throw new RuntimeException("thread" + Thread.currentThread().getName() + " throws exception.");

    try {
        List<Future<Long>> futures = executorService.invokeAll(tasks);
        log.info("first future isDone " + futures.get(0).isDone());
        log.info("second future isDone " + futures.get(1).isDone());
        log.info("first future result " + futures.get(0).get());
        log.info("second future result " + futures.get(1).get());
    } catch (Exception e) {

    try {
        Thread.sleep(10 * 1000);
    } catch (Exception e) {


15:22:27.548 INFO  com.worksap.company.TempTest - start thread pool-1-thread-1
15:22:27.554 INFO  com.worksap.company.TempTest - start thread pool-1-thread-2
15:22:32.553 INFO  com.worksap.company.TempTest - finish thread pool-1-thread-1
15:22:32.555 INFO  com.worksap.company.TempTest - first future isDone true
15:22:32.555 INFO  com.worksap.company.TempTest - second future isDone true
15:22:32.555 INFO  com.worksap.company.TempTest - first future result 12
java.util.concurrent.ExecutionException: java.lang.RuntimeException: threadpool-1-thread-2 throws exception.
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.worksap.company.TempTest.main(TempTest.java:51)
Caused by: java.lang.RuntimeException: threadpool-1-thread-2 throws exception.
	at com.worksap.company.TempTest$2.call(TempTest.java:42)
	at com.worksap.company.TempTest$2.call(TempTest.java:32)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)



static ExecutorService	newCachedThreadPool();
static ExecutorService	newCachedThreadPool(ThreadFactory threadFactory);
static ExecutorService	newFixedThreadPool(int nThreads);
static ExecutorService	newFixedThreadPool(int nThreads, ThreadFactory threadFactory);
static ExecutorService  newSingleThreadExecutor(ThreadFactory threadFactory);

static ScheduledExecutorService	newScheduledThreadPool(int corePoolSize);
static ScheduledExecutorService	newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory);


public interface ThreadFactory{
    Thread newThread(Runnable r);


* The default thread factory
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
        if (t.isDaemon())
        if (t.getPriority() != Thread.NORM_PRIORITY)
        return t;


 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available, and uses the provided
 * ThreadFactory to create new threads when needed.
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created thread pool
 * @throws NullPointerException if threadFactory is null
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
  1. 缓存型线程池,先使用线程池中能够重用的线程,如果没有再创建线程。
  2. 线程池中线程有timeout,如果timeout后线程仍未被使用,线程将被终止并移除线程池,默认的timeout时间为60s。
  3. 由于CachedThreadPool能够自行终止不用线程,因此适用于生存期很短的异步任务


 * Creates a thread pool that reuses a fixed number of threads
 * operating off a shared unbounded queue, using the provided
 * ThreadFactory to create new threads when needed.  At any point,
 * at most {@code nThreads} threads will be active processing
 * tasks.  If additional tasks are submitted when all threads are
 * active, they will wait in the queue until a thread is
 * available.  If any thread terminates due to a failure during
 * execution prior to shutdown, a new one will take its place if
 * needed to execute subsequent tasks.  The threads in the pool will
 * exist until it is explicitly {@link ExecutorService#shutdown
 * shutdown}.
 * @param nThreads the number of threads in the pool
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created thread pool
 * @throws NullPointerException if threadFactory is null
 * @throws IllegalArgumentException if {@code nThreads <= 0}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
  1. 线程池的大小固定为nThreads。
  2. 当线程池中没有可用的线程时,新提交的任务将在queue中等待。
  3. 线程的timeout时间被设为0,也就是永远不会timeout。


 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue, and uses the provided ThreadFactory to
 * create a new thread when needed. Unlike the otherwise
 * equivalent {@code newFixedThreadPool(1, threadFactory)} the
 * returned executor is guaranteed not to be reconfigurable to use
 * additional threads.
 * @param threadFactory the factory to use when creating new
 * threads
 * @return the newly created single-threaded Executor
 * @throws NullPointerException if threadFactory is null
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),


CachedThreadPool, FixedThreadPool和SingleThreadPool低层的实现都是ThreadPoolExecutor,只是参数不同。 关于ThreadPoolExecutor请见ThreadPoolExecutor


 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @return a newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if threadFactory is null
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);

 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given initial parameters.
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);


public Interface ScheduledExecutorService{

    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);




public interface CompletionService<V> {
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit timeUnit) throws InterruptedException;   
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    Future<V> take() throws InterruptedException;


Timer -> ScheduledThreadPoolExecutor






