/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

Provides default implementations of ExecutorService execution methods. This class implements the submit, invokeAny and invokeAll methods using a RunnableFuture returned by newTaskFor, which defaults to the FutureTask class provided in this package. For example, the implementation of submit(Runnable) creates an associated RunnableFuture that is executed and returned. Subclasses may override the newTaskFor methods to return RunnableFuture implementations other than FutureTask.

Extension example. Here is a sketch of a class that customizes ThreadPoolExecutor to use a CustomTask class instead of the default FutureTask:

 
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
  static class CustomTask<V> implements RunnableFuture<V> {...}
  protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
      return new CustomTask<V>(c);
  }
  protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
      return new CustomTask<V>(r, v);
  }
  // ... add constructors, etc.
 }
Author:Doug Lea
Since:1.5
/** * Provides default implementations of {@link ExecutorService} * execution methods. This class implements the {@code submit}, * {@code invokeAny} and {@code invokeAll} methods using a * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults * to the {@link FutureTask} class provided in this package. For example, * the implementation of {@code submit(Runnable)} creates an * associated {@code RunnableFuture} that is executed and * returned. Subclasses may override the {@code newTaskFor} methods * to return {@code RunnableFuture} implementations other than * {@code FutureTask}. * * <p><b>Extension example</b>. Here is a sketch of a class * that customizes {@link ThreadPoolExecutor} to use * a {@code CustomTask} class instead of the default {@code FutureTask}: * <pre> {@code * public class CustomThreadPoolExecutor extends ThreadPoolExecutor { * * static class CustomTask<V> implements RunnableFuture<V> {...} * * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { * return new CustomTask<V>(c); * } * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) { * return new CustomTask<V>(r, v); * } * // ... add constructors, etc. * }}</pre> * * @since 1.5 * @author Doug Lea */
public abstract class AbstractExecutorService implements ExecutorService {
Returns a RunnableFuture for the given runnable and default value.
Params:
  • runnable – the runnable task being wrapped
  • value – the default value for the returned future
Type parameters:
  • <T> – the type of the given value
Returns:a RunnableFuture which, when run, will run the underlying runnable and which, as a Future, will yield the given value as its result and provide for cancellation of the underlying task
Since:1.6
/** * Returns a {@code RunnableFuture} for the given runnable and default * value. * * @param runnable the runnable task being wrapped * @param value the default value for the returned future * @param <T> the type of the given value * @return a {@code RunnableFuture} which, when run, will run the * underlying runnable and which, as a {@code Future}, will yield * the given value as its result and provide for cancellation of * the underlying task * @since 1.6 */
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
Returns a RunnableFuture for the given callable task.
Params:
  • callable – the callable task being wrapped
Type parameters:
  • <T> – the type of the callable's result
Returns:a RunnableFuture which, when run, will call the underlying callable and which, as a Future, will yield the callable's result as its result and provide for cancellation of the underlying task
Since:1.6
/** * Returns a {@code RunnableFuture} for the given callable task. * * @param callable the callable task being wrapped * @param <T> the type of the callable's result * @return a {@code RunnableFuture} which, when run, will call the * underlying callable and which, as a {@code Future}, will yield * the callable's result as its result and provide for * cancellation of the underlying task * @since 1.6 */
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
Throws:
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
Throws:
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
Throws:
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
the main mechanics of invokeAny.
/** * the main mechanics of invokeAny. */
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); ArrayList<Future<T>> futures = new ArrayList<>(ntasks); ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); // For efficiency, especially in executors with limited // parallelism, check to see if previously submitted tasks are // done before submitting more of them. This interleaving // plus the exception mechanics account for messiness of main // loop. try { // Record exceptions so that if we fail to obtain any // result, we can throw the last exception we got. ExecutionException ee = null; final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator<? extends Callable<T>> it = tasks.iterator(); // Start one task for sure; the rest incrementally futures.add(ecs.submit(it.next())); --ntasks; int active = 1; for (;;) { Future<T> f = ecs.poll(); if (f == null) { if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } else if (active == 0) break; else if (timed) { f = ecs.poll(nanos, NANOSECONDS); if (f == null) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } else f = ecs.take(); } if (f != null) { --active; try { return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } if (ee == null) ee = new ExecutionException(); throw ee; } finally { cancelAll(futures); } } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { assert false; return null; } } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true, unit.toNanos(timeout)); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); try { for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { try { f.get(); } catch (CancellationException | ExecutionException ignore) {} } } return futures; } catch (Throwable t) { cancelAll(futures); throw t; } } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null) throw new NullPointerException(); final long nanos = unit.toNanos(timeout); final long deadline = System.nanoTime() + nanos; ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); int j = 0; timedOut: try { for (Callable<T> t : tasks) futures.add(newTaskFor(t)); final int size = futures.size(); // Interleave time checks and calls to execute in case // executor doesn't have any/much parallelism. for (int i = 0; i < size; i++) { if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L) break timedOut; execute((Runnable)futures.get(i)); } for (; j < size; j++) { Future<T> f = futures.get(j); if (!f.isDone()) { try { f.get(deadline - System.nanoTime(), NANOSECONDS); } catch (CancellationException | ExecutionException ignore) {} catch (TimeoutException timedOut) { break timedOut; } } } return futures; } catch (Throwable t) { cancelAll(futures); throw t; } // Timed out before all the tasks could be completed; cancel remaining cancelAll(futures, j); return futures; } private static <T> void cancelAll(ArrayList<Future<T>> futures) { cancelAll(futures, 0); }
Cancels all futures with index at least j.
/** Cancels all futures with index at least j. */
private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) { for (int size = futures.size(); j < size; j++) futures.get(j).cancel(true); } }