简单谈一下对并发处理包中Executor和ExecutorService(线程池)的理解,多是翻译的Java API,然后写了一个实际用例,方便理解。

Executors类

常用方法

  1. newFixedThreadPool

    1
    public static ExecutorService newFixedThreadPool(int nThreads)

    参数:nThreads:线程数
    返回:新创建的线程池
    抛出:IllegalArgumentException: 当nThreads <= 0时,抛出
    说明:创建一个可重用的线程数量固定的线程池,去操作一个共享无界队列。

  2. newSingleThreadExecutor

    1
    public static ExecutorService newSingleThreadExecutor()

    返回:新创建的单线程Executor
    说明:创建一个单线程的Executor,操作一个无界队列。与newFixedThreadPool(1)不同,返回的执行器不能重新配置使用额外的线程。

  3. newCachedThreadPool

    1
    public static ExecutorService newCachedThreadPool()

    返回:新创建的线程池
    说明:创建一个根据需要创建新线程的线程池,但会在以前构造的线程可用时重用它。 这些池通常会提高执行许多短暂异步任务的程序的性能。 调用执行将重用以前构造的线程(如果可用)。 如果没有现有线程可用,将创建一个新线程并将其添加到池中。 未使用60秒的线程将被终止并从缓存中删除。 因此,保持空闲足够长时间的池将不消耗任何资源。 注意,可以使用ThreadPoolExecutor构造函数创建具有相似属性但不同细节(例如,超时参数)的池。

  4. 以上3个方法都可以增加一个参数:ThreadFactory threadFactory

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

    参数:threadFactory: 用于创建新线程的工厂类
    抛出:NullPointerException:当threadFactory为null时

ExecutorService类

类描述

1
public interface ExecutorService extends Executor

父类:Executor
子类:ScheduledExecutorService
实现类:AbstractExecutorService, ForkJoinPool, ScheduledThreadPoolExecutor, ThreadPoolExecutor

常用方法

  1. execute (父类方法)

    1
    void execute(Runnable command)

    参数:command: 线程任务
    抛出:RejectedExecutionException - 任务无法接受执行
    NullPointerException - command为null
    说明:执行任务

  2. submit

    1
    <T> Future<T> submit(Callable<T> task)

    参数:task - 要提交的任务
    返回:Future
    抛出:RejectedExecutionException - 任务无法接受执行
    NullPointerException - command为null
    说明:Future的get方法将在成功完成后返回任务的结果。

    1
    <T> Future<T> submit(Runnable task, T result)

    参数:task - 要提交的任务
    result - 返回的结果
    返回:
    抛出:RejectedExecutionException - 任务无法接受执行
    NullPointerException - command为null
    说明:

    1
    Future<?> submit(Runnable task)

    参数:task - 要提交的任务
    返回:
    抛出:RejectedExecutionException - 任务无法接受执行
    NullPointerException - command为null
    说明:

  3. shutdown

    1
    void shutdown()

    说明:启动有序关闭,其中执行先前提交的任务,但不会接受新任务。 如果已关闭,调用没有其他效果。
    此方法不等待先前提交的任务完成执行。 使用awaitTermination来做到这一点。

  4. shutdownNow

    1
    List<Runnable> shutdownNow()

    返回: 没有开始执行的任务的List集合
    说明: 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务的List集合。

  5. isShutdown

    1
    boolean isShutdown()

    返回:如果executor已经关闭就返回true

  6. isTerminated

    1
    boolean isTerminated()

    返回: 如果所有任务在关闭后完成,则返回true。 注意,除非先调用shutdown或shutdownNow,否则isTerminated永远不为true。

  7. awaitTermination

    1
    2
    3
    boolean awaitTermination(long timeout,
    TimeUnit unit)
    throws InterruptedException

    参数: timeout - 等待的最大时间
    unit - timeout参数的时间单位
    说明:如果此执行程序终止,则为true,如果超时在终止之前已过,则为false

用例

  1. 新建线程池,调用execute()或submit()方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    private void batchExecuteAllOrder() {
    ExecutorService threadPool = null;
    final int THREAD_COUNT = 10; // 线程数量
    try {
    threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    for (OrderBean order : orders) {
    threadPool.execute(new MatchWorkFlowRuleRunner(order, matchWorkFlowRuleService));
    }
    while(!threadPool.isTerminated()) {
    // 等待所有子线程结束,才退出主线程
    threadPool.shutdown();
    }
    } catch (Exception e) {
    //....
    }
    }
  2. 线程实现类,及所要执行的任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    package com.ailk.jobs.order.workflow;
    import com.ailk.common.order.bean.OrderBean;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.List;
    public class MatchWorkFlowRuleRunner implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MatchWorkFlowRuleRunner.class);
    private List<OrderBean> orders;
    private final OrderBean order;
    private final MatchWorkFlowRuleService matchWorkFlowRuleService;
    public MatchWorkFlowRuleRunner(OrderBean order, MatchWorkFlowRuleService matchWorkFlowRuleService) {
    this.order = order;
    this.matchWorkFlowRuleService = matchWorkFlowRuleService;
    }
    @Override
    public void run() {
    try {
    //线程所需要操作的过程
    //matchWorkFlowRuleService.processSingleOrder(order);
    } catch (Exception e) {
    Thread.currentThread().interrupt();
    }
    }
    }
  3. 多线程、多任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    package com.ai.acodm.controller.order.exhanded;
    import com.google.common.collect.Sets;
    import org.phw.core.lang.Collections;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.*;
    /**
    * Created by unisk1123 on 2017/5/10.
    */
    class payProcess implements Callable{
    @Override
    public Object call() throws Exception {
    System.out.println("payProcess");
    return null;
    }
    }
    class notifyProcess implements Callable{
    @Override
    public Object call() throws Exception {
    System.out.println("notifyProcess");
    return null;
    }
    }
    class releasePreNumProcess implements Callable{
    @Override
    public Object call() throws Exception {
    System.out.println("releasePreNumProcess");
    return null;
    }
    }
    public class CallableTest {
    public static void main(String[] args) {
    Callable payProcess = new payProcess();
    Callable notifyProcess = new notifyProcess();
    Callable releasePreNumProcess = new releasePreNumProcess();
    Collection callableTasks = Sets.newHashSet(payProcess, notifyProcess, releasePreNumProcess);
    ExecutorService executorService = null;
    List<Map<String, Object>> rts = new ArrayList<Map<String, Object>>();
    try {
    executorService = Executors.newFixedThreadPool(callableTasks.size());
    List<Future<Map<String, Object>>> futures = executorService.invokeAll(callableTasks);
    if (!Collections.isEmpty(futures)){
    for (Future<Map<String, Object>> future : futures) {
    try {
    rts.add(future.get());
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    }
    }
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    executorService.shutdown();
    }
    }
    }