源码非常简单,只有一个execute(Runnable command)回调接口
public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the <tt>Executor</tt> implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution. * @throws NullPointerException if command is null */ void execute(Runnable command); }
执行已提交的 任务对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start():
Executor executor = anExecutor; executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2()); ...
不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用方的线程中立即运行已提交的任务:
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
更常见的是,任务是在某个不是调用方线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。
class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这说明了一个复合执行程序。
class SerialExecutor implements Executor { final Queuetasks = new LinkedBlockingQueue (); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }
2.ExcutorService接口
ExecutorService提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。 shutdown()方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务的启动并试图停止当前正在执行的任务。在终止后,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService以允许回收其资源。 通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法submit扩展了基本方法 Executor.execute(java.lang.Runnable)。 方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个, 或全部任务完成(可使用 ExecutorCompletionService类来编写这些方法的自定义变体)。 Executors类为创建ExecutorService提供了便捷的工厂方法。 注意1:它只有一个直接实现类ThreadPoolExecutor和间接实现类ScheduledThreadPoolExecutor。 关于ThreadPoolExecutor的更多内容请参考《》 关于ScheduledThreadPoolExecutor的更多内容请参考《》 用法示例 下面给出了一个网络服务的简单结构,这里线程池中的线程作为传入的请求。它使用了预先配置的 Executors.newFixedThreadPool(int) 工厂方法:
class NetworkService implements Runnable { private final ServerSocket serverSocket; private final ExecutorService pool; public NetworkService(int port, int poolSize) throws IOException { serverSocket = new ServerSocket(port); pool = Executors.newFixedThreadPool(poolSize); } public void run() { // run the service try { for (;;) { pool.execute(new Handler(serverSocket.accept())); } } catch (IOException ex) { pool.shutdown(); } } } class Handler implements Runnable { private final Socket socket; Handler(Socket socket) { this.socket = socket; } public void run() { // read and service request on socket } }
下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后等60秒后,任务还没执行完成,就调用 shutdownNow(如有必要)取消所有遗留的任务:
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } }内存一致性效果:线程中向 ExecutorService 提交 Runnable 或 Callable 任务之前的操作 happen-before 由该任务所提取的所有操作, 后者依次 happen-before 通过 Future.get() 获取的结果。 主要函数: void shutdown() 启动一个关闭命令,不再接受新任务,当所有已提交任务执行完后,就关闭。如果已经关闭,则调用没有其他作用。 抛出: SecurityException - 如果安全管理器存在并且关闭,此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有保持 RuntimePermission("modifyThread")),或者安全管理器的 checkAccess 方法拒绝访问。List<Runnable> shutdownNow() 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。 无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,通过 Thread.interrupt() 来取消典型的实现,所以任何任务无法响应中断都可能永远无法终止。 返回: 从未开始执行的任务的列表 抛出: SecurityException - 如果安全管理器存在并且关闭, 此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有保持 RuntimePermission("modifyThread")), 或者安全管理器的 checkAccess 方法拒绝访问。注意1: 它会返回等待执行的任务列表。注意2: 无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,通过 Thread.interrupt() 来取消,所以任何任务无法响应中断都可能永远无法终止。boolean isShutdown() 如果此执行程序已关闭,则返回 true。 返回: 如果此执行程序已关闭,则返回 trueboolean isTerminated() 如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。 返回: 如果关闭后所有任务都已完成,则返回 trueboolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException 等待(阻塞)直到关闭或最长等待时间或发生中断 参数: timeout - 最长等待时间 unit - timeout 参数的时间单位 返回: 如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false 抛出: InterruptedException - 如果等待时发生中断注意1:如果此执行程序终止(关闭),则返回 true;如果终止前超时期满,则返回 false<T> Future<T> submit(Callable<T> task) 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。 如果想立即阻塞任务的等待,则可以使用 result = exec.submit(aCallable).get(); 形式的构造。 注:Executors 类包括了一组方法,可以转换某些其他常见的类似于闭包的对象, 例如,将 PrivilegedAction 转换为 Callable 形式,这样就可以提交它们了。 参数: task - 要提交的任务 返回: 表示任务等待完成的 Future 抛出: RejectedExecutionException - 如果任务无法安排执行 NullPointerException - 如果该任务为 null注意:关于submit的使用和Callable可以参阅《》
每个 还维护着一些基本的统计数据,如完成的任务数。为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。
但是,强烈建议程序员使用较为方便的 Executors 工厂方法 (无界线程池,可以进行自动线程回收)、(固定大小线程池)和(单个后台线程),
它们均为大多数使用场景预定义了设置。否则,在手动配置和调整此类时,使用以下指导:
核心和最大池大小ThreadPoolExecutorcorePoolSize maximumPoolSize当新任务在方法中提交时,如果运行的线程少于 , 则创建新线程来处理请求,即使有线程是空闲的。
corePoolSize maximumPoolSizcorePoolSizemaximumPoolSizemaximumPoolSize Integer.MAX_VALUEsetCorePoolSize(int) setMaximumPoolSize(int)
按需构造 prestartCoreThread()prestartAllCoreThreads()
使用创建新线程。如果没有另外说明,则使用 创建线程,他们在同一个ThreadGroup中 并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。
通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。
如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
注意1:可以指定创建线程的ThreadFactory,默认的是使用Executors.defaultThreadFactory()来创建线程,所有的线程都在一个ThreadGroup中。
保持活动时间 如果池中当前有多于corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止 (参见 )。这提供了当池处于非活动状态时减少资源消耗的方法。 如果池后来变得更为活动,则可以创建新的线程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。
如果把值设为 的话,空闲线程不会被回收直到ThreadPoolExecutor为Terminate。
默认情况下,保持活动策略只在有多于corePoolSizeThreads 的线程时应用。
但是只要 keepAliveTime 值非 0,方法也可将此超时策略应用于核心线程。
注意1:setKeepAliveTime(long, java.util.concurrent.TimeUnit)用于设置空闲线程最长的活动时间,
即如果空闲时间超过设定值,就停掉该线程,对该线程进行回收。
该策略默认只对非内核线程有用(即当前线程数大于corePoolSize),
可以调用allowCoreThreadTimeOut(boolean)方法将此超时策略扩大到核心线程
注意2:如果把值设为Long.MAX_VALUE TimeUnit.NANOSECONDS的话,空闲线程不会被回收直到ThreadPoolExecutor为Terminate。
排队BlockingQueue。
排队有三种通用策略:
:此策略允许线程无界的增长。
3. 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。
队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,
但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。
使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
被拒绝的任务RejectedExecutionHandlerRejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) ThreadPoolExecutor.AbortPolicy ThreadPoolExecutor.CallerRunsPolicyThreadPoolExecutor.DiscardPolicyThreadPoolExecutor.DiscardOldestPolicy java.lang.Thread, java.lang.Runnable)
和 (java.lang.Runnable, java.lang.Throwable) 方法,这两种方法分别在执行每个任务之前和之后调用。
它们可用于操纵执行环境;例如,重新初始化 ThreadLocal、搜集统计信息或添加日志条目。
此外,还可以重写方法 来执行 Executor 完全终止后需要完成的所有特殊处理。
removepurge() removepurge()
设置适当保持活动时间,使用0核心线程的下边界和/或设置 allowCoreThreadTimeOut(boolean)。
扩展示例。此类的大多数扩展可以重写一个或多个受保护的钩子 (hook) 方法。例如,下面是一个添加了简单的暂停/恢复功能的子类:
class PausableThreadPoolExecutor extends ThreadPoolExecutor { private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); public PausableThreadPoolExecutor(...) { super(...); protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch (InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } public void pause() { pauseLock.lock(); try { isPaused = true; } finally { pauseLock.unlock(); } } public void resume() { pauseLock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); } } }}关于它的使用请参考《class
RejectedExecutionException. | |||||||||||||
execute method, unless the executor has been shut down, in which case the task is discarded. | |||||||||||||
ThreadPoolExecutor.DiscardOldestPolicy | execute, unless the executor is shut down, in which case the task is discarded. | ||||||||||||
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 用给定的初始参数和默认的线程工厂及被拒绝的执行处理程序创建新的 ThreadPoolExecutor。 使用 Executors 工厂方法之一比使用此通用构造方法方便得多。 参数: corePoolSize - 池中所保存的线程数,包括空闲线程。 maximumPoolSize - 池中允许的最大线程数。 keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 unit - keepAliveTime 参数的时间单位。 workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 抛出: IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0, 或者 corePoolSize 大于 maximumPoolSize。 NullPointerException - 如果 workQueue 为 nullpublic ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) 用给定的初始参数和默认被拒绝的执行处理程序创建新的 ThreadPoolExecutor。 参数: corePoolSize - 池中所保存的线程数,包括空闲线程。 maximumPoolSize - 池中允许的最大线程数。 keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 unit - keepAliveTime 参数的时间单位。 workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 threadFactory - 执行程序创建新线程时使用的工厂。 抛出: IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,或者 corePoolSize 大于 maximumPoolSize。 NullPointerException - 如果 workQueue 或 threadFactory 为 null。public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。 参数: corePoolSize - 池中所保存的线程数,包括空闲线程。 maximumPoolSize - 池中允许的最大线程数。 keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 unit - keepAliveTime 参数的时间单位。 workQueue - 执行前用于保持任务的队列。此队列仅由保持 execute 方法提交的 Runnable 任务。 handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。 抛出: IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0, 或者 corePoolSize 大于 maximumPoolSize。 execute(Runnable command) 在将来某个时间执行给定任务。可以在新线程中或者在现有池线程中执行该任务。 如果无法将任务提交执行,或者因为此执行程序已关闭,或者因为已达到其容量,则该任务由当前 RejectedExecutionHandler 处理。 参数: command - 要执行的任务。 抛出: RejectedExecutionException - 如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出 RejectedExecutionException NullPointerException - 如果命令为 null public void shutdown() 按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。如果已经关闭,则调用没有其他作用。 抛出: SecurityException - 如果安全管理器存在并且关闭此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有 RuntimePermission("modifyThread")),或者安全管理器的 checkAccess 方法拒绝访问。public List<Runnable> shutdownNow() 尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。在从此方法返回的任务队列中排空(移除)这些任务。 并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。 此实现通过 Thread.interrupt() 取消任务,所以无法响应中断的任何任务可能永远无法终止。 返回: 从未开始执行的任务的列表。 抛出: SecurityException - 如果安全管理器存在并且关闭此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有 RuntimePermission("modifyThread")), 或者安全管理器的 checkAccess 方法拒绝访问。public int prestartAllCoreThreads() 启动所有核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程策略。 返回: 已启动的线程数public boolean allowsCoreThreadTimeOut() 如果此池允许核心线程超时和终止,如果在 keepAlive 时间内没有任务到达,新任务到达时正在替换(如果需要),则返回 true。当返回 true 时,适用于非核心线程的相同的保持活动策略也同样适用于核心线程。当返回 false(默认值)时,由于没有传入任务,核心线程不会终止。 返回: 如果允许核心线程超时,则返回 true;否则返回 falsepublic void allowCoreThreadTimeOut(boolean value) 如果在保持活动时间内没有任务到达,新任务到达时正在替换(如果需要),则设置控制核心线程是超时还是终止的策略。当为 false(默认值)时,由于没有传入任务,核心线程将永远不会中止。当为 true 时,适用于非核心线程的相同的保持活动策略也同样适用于核心线程。为了避免连续线程替换,保持活动时间在设置为 true 时必须大于 0。通常应该在主动使用该池前调用此方法。 参数: value - 如果应该超时,则为 true;否则为 false 抛出: IllegalArgumentException - 如果 value 为 true 并且当前保持活动时间不大于 0。public boolean remove(Runnable task) 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则让其不再运行。 此方法可用作取消方案的一部分。它可能无法移除在放置到内部队列之前已经转换为其他形式的任务。 例如,使用 submit 输入的任务可能被转换为维护 Future 状态的形式。但是,在此情况下,purge() 方法可用于移除那些已被取消的 Future。 参数: task - 要移除的任务 返回: 如果已经移除任务,则返回 truepublic void purge() 尝试从工作队列移除所有已取消的 Future 任务。此方法可用作存储回收操作,它对功能没有任何影响。 取消的任务不会再次执行,但是它们可能在工作队列中累积,直到worker线程主动将其移除。 调用此方法将试图立即移除它们。但是,如果出现其他线程的干预,那么此方法移除任务将失败。 当然它还实现了的ExecutorService的submit系列接口 abstract <T> <T>
|