第三篇 Executor和ExecutorService
简单谈一下对并发处理包中Executor和ExecutorService(线程池)的理解,多是翻译的Java API,然后写了一个实际用例,方便理解。
Executors类
常用方法
newFixedThreadPool
1public static ExecutorService newFixedThreadPool(int nThreads)参数:nThreads:线程数
返回:新创建的线程池
抛出:IllegalArgumentException: 当nThreads <= 0时,抛出
说明:创建一个可重用的线程数量固定的线程池,去操作一个共享无界队列。newSingleThreadExecutor
1public static ExecutorService newSingleThreadExecutor()返回:新创建的单线程Executor
说明:创建一个单线程的Executor,操作一个无界队列。与newFixedThreadPool(1)不同,返回的执行器不能重新配置使用额外的线程。newCachedThreadPool
1public static ExecutorService newCachedThreadPool()返回:新创建的线程池
说明:创建一个根据需要创建新线程的线程池,但会在以前构造的线程可用时重用它。 这些池通常会提高执行许多短暂异步任务的程序的性能。 调用执行将重用以前构造的线程(如果可用)。 如果没有现有线程可用,将创建一个新线程并将其添加到池中。 未使用60秒的线程将被终止并从缓存中删除。 因此,保持空闲足够长时间的池将不消耗任何资源。 注意,可以使用ThreadPoolExecutor构造函数创建具有相似属性但不同细节(例如,超时参数)的池。以上3个方法都可以增加一个参数:ThreadFactory threadFactory
12345public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)参数:threadFactory: 用于创建新线程的工厂类
抛出:NullPointerException:当threadFactory为null时
ExecutorService类
类描述
|
|
父类:Executor
子类:ScheduledExecutorService
实现类:AbstractExecutorService, ForkJoinPool, ScheduledThreadPoolExecutor, ThreadPoolExecutor
常用方法
execute (父类方法)
1void execute(Runnable command)参数:command: 线程任务
抛出:RejectedExecutionException - 任务无法接受执行
NullPointerException - command为null
说明:执行任务submit
1<T> Future<T> submit(Callable<T> task)参数:task - 要提交的任务
返回:Future
抛出:RejectedExecutionException - 任务无法接受执行
NullPointerException - command为null
说明:Future的get方法将在成功完成后返回任务的结果。1<T> Future<T> submit(Runnable task, T result)参数:task - 要提交的任务
result - 返回的结果
返回:
抛出:RejectedExecutionException - 任务无法接受执行
NullPointerException - command为null
说明:1Future<?> submit(Runnable task)参数:task - 要提交的任务
返回:
抛出:RejectedExecutionException - 任务无法接受执行
NullPointerException - command为null
说明:shutdown
1void shutdown()说明:启动有序关闭,其中执行先前提交的任务,但不会接受新任务。 如果已关闭,调用没有其他效果。
此方法不等待先前提交的任务完成执行。 使用awaitTermination来做到这一点。shutdownNow
1List<Runnable> shutdownNow()返回: 没有开始执行的任务的List集合
说明: 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务的List集合。isShutdown
1boolean isShutdown()返回:如果executor已经关闭就返回true
isTerminated
1boolean isTerminated()返回: 如果所有任务在关闭后完成,则返回true。 注意,除非先调用shutdown或shutdownNow,否则isTerminated永远不为true。
awaitTermination
123boolean awaitTermination(long timeout,TimeUnit unit)throws InterruptedException参数: timeout - 等待的最大时间
unit - timeout参数的时间单位
说明:如果此执行程序终止,则为true,如果超时在终止之前已过,则为false
用例
新建线程池,调用execute()或submit()方法
1234567891011121314151617private void batchExecuteAllOrder() {ExecutorService threadPool = null;final int THREAD_COUNT = 10; // 线程数量try {threadPool = Executors.newFixedThreadPool(THREAD_COUNT);for (OrderBean order : orders) {threadPool.execute(new MatchWorkFlowRuleRunner(order, matchWorkFlowRuleService));}while(!threadPool.isTerminated()) {// 等待所有子线程结束,才退出主线程threadPool.shutdown();}} catch (Exception e) {//....}}线程实现类,及所要执行的任务
123456789101112131415161718192021222324252627282930package com.ailk.jobs.order.workflow;import com.ailk.common.order.bean.OrderBean;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.List;public class MatchWorkFlowRuleRunner implements Runnable {private static final Logger logger = LoggerFactory.getLogger(MatchWorkFlowRuleRunner.class);private List<OrderBean> orders;private final OrderBean order;private final MatchWorkFlowRuleService matchWorkFlowRuleService;public MatchWorkFlowRuleRunner(OrderBean order, MatchWorkFlowRuleService matchWorkFlowRuleService) {this.order = order;this.matchWorkFlowRuleService = matchWorkFlowRuleService;}@Overridepublic void run() {try {//线程所需要操作的过程//matchWorkFlowRuleService.processSingleOrder(order);} catch (Exception e) {Thread.currentThread().interrupt();}}}多线程、多任务
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970package com.ai.acodm.controller.order.exhanded;import com.google.common.collect.Sets;import org.phw.core.lang.Collections;import java.util.ArrayList;import java.util.Collection;import java.util.List;import java.util.Map;import java.util.concurrent.*;/*** Created by unisk1123 on 2017/5/10.*/class payProcess implements Callable{@Overridepublic Object call() throws Exception {System.out.println("payProcess");return null;}}class notifyProcess implements Callable{@Overridepublic Object call() throws Exception {System.out.println("notifyProcess");return null;}}class releasePreNumProcess implements Callable{@Overridepublic Object call() throws Exception {System.out.println("releasePreNumProcess");return null;}}public class CallableTest {public static void main(String[] args) {Callable payProcess = new payProcess();Callable notifyProcess = new notifyProcess();Callable releasePreNumProcess = new releasePreNumProcess();Collection callableTasks = Sets.newHashSet(payProcess, notifyProcess, releasePreNumProcess);ExecutorService executorService = null;List<Map<String, Object>> rts = new ArrayList<Map<String, Object>>();try {executorService = Executors.newFixedThreadPool(callableTasks.size());List<Future<Map<String, Object>>> futures = executorService.invokeAll(callableTasks);if (!Collections.isEmpty(futures)){for (Future<Map<String, Object>> future : futures) {try {rts.add(future.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}} catch (Exception e) {e.printStackTrace();} finally {executorService.shutdown();}}}