/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

A ThreadPoolExecutor that can additionally schedule commands to run after a given delay, or to execute periodically. This class is preferable to Timer when multiple worker threads are needed, or when the additional flexibility or capabilities of ThreadPoolExecutor (which this class extends) are required.

Delayed tasks execute no sooner than they are enabled, but without any real-time guarantees about when, after they are enabled, they will commence. Tasks scheduled for exactly the same execution time are enabled in first-in-first-out (FIFO) order of submission.

When a submitted task is cancelled before it is run, execution is suppressed. By default, such a cancelled task is not automatically removed from the work queue until its delay elapses. While this enables further inspection and monitoring, it may also cause unbounded retention of cancelled tasks. To avoid this, use setRemoveOnCancelPolicy to cause tasks to be immediately removed from the work queue at time of cancellation.

Successive executions of a periodic task scheduled via scheduleAtFixedRate or scheduleWithFixedDelay do not overlap. While different executions may be performed by different threads, the effects of prior executions happen-before those of subsequent ones.

While this class inherits from ThreadPoolExecutor, a few of the inherited tuning methods are not useful for it. In particular, because it acts as a fixed-sized pool using corePoolSize threads and an unbounded queue, adjustments to maximumPoolSize have no useful effect. Additionally, it is almost never a good idea to set corePoolSize to zero or use allowCoreThreadTimeOut because this may leave the pool without threads to handle tasks once they become eligible to run.

As with ThreadPoolExecutor, if not otherwise specified, this class uses Executors.defaultThreadFactory as the default thread factory, and AbortPolicy as the default rejected execution handler.

Extension notes: This class overrides the execute and submit methods to generate internal ScheduledFuture objects to control per-task delays and scheduling. To preserve functionality, any further overrides of these methods in subclasses must invoke superclass versions, which effectively disables additional task customization. However, this class provides alternative protected extension method decorateTask (one version each for Runnable and Callable) that can be used to customize the concrete task types used to execute commands entered via execute, submit, schedule, scheduleAtFixedRate, and scheduleWithFixedDelay. By default, a ScheduledThreadPoolExecutor uses a task type extending FutureTask. However, this may be modified or replaced using subclasses of the form:

 
public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
  static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
  protected <V> RunnableScheduledFuture<V> decorateTask(
               Runnable r, RunnableScheduledFuture<V> task) {
      return new CustomTask<V>(r, task);
  }
  protected <V> RunnableScheduledFuture<V> decorateTask(
               Callable<V> c, RunnableScheduledFuture<V> task) {
      return new CustomTask<V>(c, task);
  }
  // ... add constructors, etc.
 }
Author:Doug Lea
Since:1.5
/** * A {@link ThreadPoolExecutor} that can additionally schedule * commands to run after a given delay, or to execute periodically. * This class is preferable to {@link java.util.Timer} when multiple * worker threads are needed, or when the additional flexibility or * capabilities of {@link ThreadPoolExecutor} (which this class * extends) are required. * * <p>Delayed tasks execute no sooner than they are enabled, but * without any real-time guarantees about when, after they are * enabled, they will commence. Tasks scheduled for exactly the same * execution time are enabled in first-in-first-out (FIFO) order of * submission. * * <p>When a submitted task is cancelled before it is run, execution * is suppressed. By default, such a cancelled task is not * automatically removed from the work queue until its delay elapses. * While this enables further inspection and monitoring, it may also * cause unbounded retention of cancelled tasks. To avoid this, use * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately * removed from the work queue at time of cancellation. * * <p>Successive executions of a periodic task scheduled via * {@link #scheduleAtFixedRate scheduleAtFixedRate} or * {@link #scheduleWithFixedDelay scheduleWithFixedDelay} * do not overlap. While different executions may be performed by * different threads, the effects of prior executions * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> * those of subsequent ones. * * <p>While this class inherits from {@link ThreadPoolExecutor}, a few * of the inherited tuning methods are not useful for it. In * particular, because it acts as a fixed-sized pool using * {@code corePoolSize} threads and an unbounded queue, adjustments * to {@code maximumPoolSize} have no useful effect. 
Additionally, it * is almost never a good idea to set {@code corePoolSize} to zero or * use {@code allowCoreThreadTimeOut} because this may leave the pool * without threads to handle tasks once they become eligible to run. * * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified, * this class uses {@link Executors#defaultThreadFactory} as the * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy} * as the default rejected execution handler. * * <p><b>Extension notes:</b> This class overrides the * {@link ThreadPoolExecutor#execute(Runnable) execute} and * {@link AbstractExecutorService#submit(Runnable) submit} * methods to generate internal {@link ScheduledFuture} objects to * control per-task delays and scheduling. To preserve * functionality, any further overrides of these methods in * subclasses must invoke superclass versions, which effectively * disables additional task customization. However, this class * provides alternative protected extension method * {@code decorateTask} (one version each for {@code Runnable} and * {@code Callable}) that can be used to customize the concrete task * types used to execute commands entered via {@code execute}, * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, * and {@code scheduleWithFixedDelay}. By default, a * {@code ScheduledThreadPoolExecutor} uses a task type extending * {@link FutureTask}. However, this may be modified or replaced using * subclasses of the form: * * <pre> {@code * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { * * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } * * protected <V> RunnableScheduledFuture<V> decorateTask( * Runnable r, RunnableScheduledFuture<V> task) { * return new CustomTask<V>(r, task); * } * * protected <V> RunnableScheduledFuture<V> decorateTask( * Callable<V> c, RunnableScheduledFuture<V> task) { * return new CustomTask<V>(c, task); * } * // ... add constructors, etc. 
* }}</pre> * * @since 1.5 * @author Doug Lea */
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { /* * This class specializes ThreadPoolExecutor implementation by * * 1. Using a custom task type ScheduledFutureTask, even for tasks * that don't require scheduling because they are submitted * using ExecutorService rather than ScheduledExecutorService * methods, which are treated as tasks with a delay of zero. * * 2. Using a custom queue (DelayedWorkQueue), a variant of * unbounded DelayQueue. The lack of capacity constraint and * the fact that corePoolSize and maximumPoolSize are * effectively identical simplifies some execution mechanics * (see delayedExecute) compared to ThreadPoolExecutor. * * 3. Supporting optional run-after-shutdown parameters, which * leads to overrides of shutdown methods to remove and cancel * tasks that should NOT be run after shutdown, as well as * different recheck logic when task (re)submission overlaps * with a shutdown. * * 4. Task decoration methods to allow interception and * instrumentation, which are needed because subclasses cannot * otherwise override submit methods to get this effect. These * don't have any impact on pool control logic though. */
False if should cancel/suppress periodic tasks on shutdown.
/**
 * False if should cancel/suppress periodic tasks on shutdown.
 *
 * <p>Declared without an initializer, so it starts out {@code false}.
 * Consulted by {@code canRunInCurrentRunState} for periodic tasks once
 * the pool is shut down but not stopped.
 */
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
False if should cancel non-periodic not-yet-expired tasks on shutdown.
/**
 * False if should cancel non-periodic not-yet-expired tasks on shutdown.
 *
 * <p>Initialized to {@code true}. Consulted by
 * {@code canRunInCurrentRunState} for non-periodic tasks once the pool
 * is shut down but not stopped; when false, only tasks whose remaining
 * delay is zero or negative are still allowed to run.
 */
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
True if ScheduledFutureTask.cancel should remove from queue.
/**
 * True if ScheduledFutureTask.cancel should remove from queue.
 *
 * <p>Declared without an initializer, so it starts out {@code false}.
 * Not private: it is read directly by the inner task class's
 * {@code cancel} method.
 */
volatile boolean removeOnCancel;
Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied entries.
/** * Sequence number to break scheduling ties, and in turn to * guarantee FIFO order among tied entries. */
private static final AtomicLong sequencer = new AtomicLong(); private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
Sequence number to break ties FIFO
/** Sequence number to break ties FIFO */
private final long sequenceNumber;
The nanoTime-based time when the task is enabled to execute.
/** The nanoTime-based time when the task is enabled to execute. */
private volatile long time;
Period for repeating tasks, in nanoseconds. A positive value indicates fixed-rate execution. A negative value indicates fixed-delay execution. A value of 0 indicates a non-repeating (one-shot) task.
/** * Period for repeating tasks, in nanoseconds. * A positive value indicates fixed-rate execution. * A negative value indicates fixed-delay execution. * A value of 0 indicates a non-repeating (one-shot) task. */
private final long period;
The actual task to be re-enqueued by reExecutePeriodic
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
Index into delay queue, to support faster cancellation.
/** * Index into delay queue, to support faster cancellation. */
int heapIndex;
Creates a one-shot action with given nanoTime-based trigger time.
/** * Creates a one-shot action with given nanoTime-based trigger time. */
ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber) { super(r, result); this.time = triggerTime; this.period = 0; this.sequenceNumber = sequenceNumber; }
Creates a periodic action with given nanoTime-based initial trigger time and period.
/** * Creates a periodic action with given nanoTime-based initial * trigger time and period. */
ScheduledFutureTask(Runnable r, V result, long triggerTime, long period, long sequenceNumber) { super(r, result); this.time = triggerTime; this.period = period; this.sequenceNumber = sequenceNumber; }
Creates a one-shot action with given nanoTime-based trigger time.
/** * Creates a one-shot action with given nanoTime-based trigger time. */
ScheduledFutureTask(Callable<V> callable, long triggerTime, long sequenceNumber) { super(callable); this.time = triggerTime; this.period = 0; this.sequenceNumber = sequenceNumber; } public long getDelay(TimeUnit unit) { return unit.convert(time - System.nanoTime(), NANOSECONDS); } public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
Returns true if this is a periodic (not a one-shot) action.
Returns:true if periodic
/** * Returns {@code true} if this is a periodic (not a one-shot) action. * * @return {@code true} if periodic */
public boolean isPeriodic() { return period != 0; }
Sets the next time to run for a periodic task.
/** * Sets the next time to run for a periodic task. */
private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); } public boolean cancel(boolean mayInterruptIfRunning) { // The racy read of heapIndex below is benign: // if heapIndex < 0, then OOTA guarantees that we have surely // been removed; else we recheck under lock in remove() boolean cancelled = super.cancel(mayInterruptIfRunning); if (cancelled && removeOnCancel && heapIndex >= 0) remove(this); return cancelled; }
Overrides FutureTask version so as to reset/requeue if periodic.
/** * Overrides FutureTask version so as to reset/requeue if periodic. */
public void run() { if (!canRunInCurrentRunState(this)) cancel(false); else if (!isPeriodic()) super.run(); else if (super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } } }
Returns true if can run a task given current run state and run-after-shutdown parameters.
/**
 * Decides whether {@code task} may run now, given the pool's run state
 * and the two run-after-shutdown settings.
 *
 * <p>A pool that is not shut down always allows the task; a stopped pool
 * never does. In between, periodic tasks follow
 * {@code continueExistingPeriodicTasksAfterShutdown}, and one-shot tasks
 * run if {@code executeExistingDelayedTasksAfterShutdown} is set or
 * their delay has already elapsed.
 */
boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
    if (isShutdown()) {
        if (isStopped()) {
            return false;
        }
        if (task.isPeriodic()) {
            return continueExistingPeriodicTasksAfterShutdown;
        }
        return executeExistingDelayedTasksAfterShutdown
            || task.getDelay(NANOSECONDS) <= 0;
    }
    return true;
}
Main execution method for delayed or periodic tasks. If pool is shut down, rejects the task. Otherwise adds task to queue and starts a thread, if necessary, to run it. (We cannot prestart the thread to run the task because the task (probably) shouldn't be run yet.) If the pool is shut down while the task is being added, cancel and remove it if required by state and run-after-shutdown parameters.
Params:
  • task – the task
/**
 * Common entry point for queuing a delayed or periodic task.
 *
 * <p>A pool that is already shut down rejects the task. Otherwise the
 * task is added to the queue and a worker is started if needed; the
 * thread cannot be prestarted with the task itself because the task
 * (probably) is not due yet. If a shutdown slips in while the task is
 * being added, the task is removed and cancelled when state and
 * run-after-shutdown parameters require it.
 *
 * @param task the task
 */
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown()) {
        reject(task);
        return;
    }
    super.getQueue().add(task);
    // Recheck after enqueueing: remove() is attempted only when the
    // task is no longer allowed to run.
    boolean withdrawn = !canRunInCurrentRunState(task) && remove(task);
    if (withdrawn) {
        task.cancel(false);
    } else {
        ensurePrestart();
    }
}
Requeues a periodic task unless current run state precludes it. Same idea as delayedExecute except drops task rather than rejecting.
Params:
  • task – the task
/**
 * Puts a periodic task back on the queue for its next run, unless the
 * current run state forbids it. Mirrors delayedExecute, but a task that
 * may no longer run is cancelled (dropped) instead of being rejected.
 *
 * @param task the task
 */
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (!canRunInCurrentRunState(task)) {
        task.cancel(false);
        return;
    }
    super.getQueue().add(task);
    // Recheck after enqueueing; remove() is attempted only when the
    // task is no longer allowed to run.
    boolean stillAllowed = canRunInCurrentRunState(task);
    if (!stillAllowed && remove(task)) {
        task.cancel(false);
    } else {
        ensurePrestart();
    }
}
Cancels and clears the queue of all tasks that should not be run due to shutdown policy. Invoked within super.shutdown.
/** * Cancels and clears the queue of all tasks that should not be run * due to shutdown policy. Invoked within super.shutdown. */
@Override void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // Traverse snapshot to avoid iterator exceptions // TODO: implement and use efficient removeIf // super.getQueue().removeIf(...); for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if ((t.isPeriodic() ? !keepPeriodic : (!keepDelayed && t.getDelay(NANOSECONDS) > 0)) || t.isCancelled()) { // also remove if already cancelled if (q.remove(t)) t.cancel(false); } } } tryTerminate(); }
Modifies or replaces the task used to execute a runnable. This method can be used to override the concrete class used for managing internal tasks. The default implementation simply returns the given task.
Params:
  • runnable – the submitted Runnable
  • task – the task created to execute the runnable
Type parameters:
  • <V> – the type of the task's result
Returns:a task that can execute the runnable
Since:1.6
/**
 * Extension hook: lets a subclass modify or replace the task that will
 * execute a submitted runnable, and thereby choose the concrete class
 * used for managing internal tasks. This default implementation hands
 * back {@code task} unchanged.
 *
 * @param runnable the submitted Runnable
 * @param task the task created to execute the runnable
 * @param <V> the type of the task's result
 * @return a task that can execute the runnable
 * @since 1.6
 */
protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable,
        RunnableScheduledFuture<V> task) {
    return task;
}
Modifies or replaces the task used to execute a callable. This method can be used to override the concrete class used for managing internal tasks. The default implementation simply returns the given task.
Params:
  • callable – the submitted Callable
  • task – the task created to execute the callable
Type parameters:
  • <V> – the type of the task's result
Returns:a task that can execute the callable
Since:1.6
/**
 * Extension hook: lets a subclass modify or replace the task that will
 * execute a submitted callable, and thereby choose the concrete class
 * used for managing internal tasks. This default implementation hands
 * back {@code task} unchanged.
 *
 * @param callable the submitted Callable
 * @param task the task created to execute the callable
 * @param <V> the type of the task's result
 * @return a task that can execute the callable
 * @since 1.6
 */
protected <V> RunnableScheduledFuture<V> decorateTask(
        Callable<V> callable,
        RunnableScheduledFuture<V> task) {
    return task;
}
The default keep-alive time for pool threads. Normally, this value is unused because all pool threads will be core threads, but if a user creates a pool with a corePoolSize of zero (against our advice), we keep a thread alive as long as there are queued tasks. If the keep alive time is zero (the historic value), we end up hot-spinning in getTask, wasting a CPU. But on the other hand, if we set the value too high, and users create a one-shot pool which they don't cleanly shutdown, the pool's non-daemon threads will prevent JVM termination. A small but non-zero value (relative to a JVM's lifetime) seems best.
/**
 * The default keep-alive time for pool threads, in milliseconds.
 * The constructors of this class pass it to the superclass
 * constructor together with the unit {@code MILLISECONDS}.
 *
 * Normally, this value is unused because all pool threads will be
 * core threads, but if a user creates a pool with a corePoolSize
 * of zero (against our advice), we keep a thread alive as long as
 * there are queued tasks.  If the keep alive time is zero (the
 * historic value), we end up hot-spinning in getTask, wasting a
 * CPU.  But on the other hand, if we set the value too high, and
 * users create a one-shot pool which they don't cleanly shutdown,
 * the pool's non-daemon threads will prevent JVM termination.  A
 * small but non-zero value (relative to a JVM's lifetime) seems
 * best.
 */
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
Creates a new ScheduledThreadPoolExecutor with the given core pool size.
Params:
  • corePoolSize – the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
Throws:
  • IllegalArgumentException – if corePoolSize < 0
/**
 * Builds a {@code ScheduledThreadPoolExecutor} with the requested core
 * pool size. The superclass is configured with a maximum pool size of
 * {@code Integer.MAX_VALUE}, the default keep-alive time and a new
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue());
}
Creates a new ScheduledThreadPoolExecutor with the given initial parameters.
Params:
  • corePoolSize – the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
  • threadFactory – the factory to use when the executor creates a new thread
Throws:
  • IllegalArgumentException – if corePoolSize < 0
  • NullPointerException – if threadFactory is null
/**
 * Builds a {@code ScheduledThreadPoolExecutor} with the requested core
 * pool size and thread factory. The superclass is configured with a
 * maximum pool size of {@code Integer.MAX_VALUE}, the default
 * keep-alive time and a new {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
Creates a new ScheduledThreadPoolExecutor with the given initial parameters.
Params:
  • corePoolSize – the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
  • handler – the handler to use when execution is blocked because the thread bounds and queue capacities are reached
Throws:
  • IllegalArgumentException – if corePoolSize < 0
  • NullPointerException – if handler is null
/**
 * Builds a {@code ScheduledThreadPoolExecutor} with the requested core
 * pool size and rejected-execution handler. The superclass is
 * configured with a maximum pool size of {@code Integer.MAX_VALUE}, the
 * default keep-alive time and a new {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          handler);
}
Creates a new ScheduledThreadPoolExecutor with the given initial parameters.
Params:
  • corePoolSize – the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
  • threadFactory – the factory to use when the executor creates a new thread
  • handler – the handler to use when execution is blocked because the thread bounds and queue capacities are reached
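Throws:
  • IllegalArgumentException – if corePoolSize < 0
  • NullPointerException – if threadFactory or handler is null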
/**
 * Builds a {@code ScheduledThreadPoolExecutor} with the requested core
 * pool size, thread factory and rejected-execution handler. The
 * superclass is configured with a maximum pool size of
 * {@code Integer.MAX_VALUE}, the default keep-alive time and a new
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
Returns the nanoTime-based trigger time of a delayed action.
/**
 * Computes the nanoTime-based trigger time for an action delayed by
 * {@code delay} in {@code unit}. A negative delay is treated as zero
 * before conversion to nanoseconds.
 */
private long triggerTime(long delay, TimeUnit unit) {
    long nonNegativeDelay = Math.max(delay, 0L);
    return triggerTime(unit.toNanos(nonNegativeDelay));
}
Returns the nanoTime-based trigger time of a delayed action.
/**
 * Computes the nanoTime-based trigger time for an action delayed by
 * {@code delay} nanoseconds. Delays of at least half of
 * {@code Long.MAX_VALUE} are first passed through {@code overflowFree}.
 */
long triggerTime(long delay) {
    // Read the clock before any overflowFree call, as the original
    // expression does.
    long now = System.nanoTime();
    long halfRange = Long.MAX_VALUE >> 1;
    long offset = (delay < halfRange) ? delay : overflowFree(delay);
    return now + offset;
}
Constrains the values of all delays in the queue to be within Long.MAX_VALUE of each other, to avoid overflow in compareTo. This may occur if a task is eligible to be dequeued, but has not yet been, while some other task is added with a delay of Long.MAX_VALUE.
/**
 * Keeps all delays in the queue within Long.MAX_VALUE of each other so
 * that compareTo cannot overflow. The problem case is a head task that
 * is already eligible to be dequeued (negative delay) but still queued
 * while another task arrives with a delay of Long.MAX_VALUE.
 *
 * @return {@code delay}, or a reduced value when the head of the queue
 *         is overdue and {@code delay - headDelay} would overflow
 */
private long overflowFree(long delay) {
    Delayed first = (Delayed) super.getQueue().peek();
    if (first == null) {
        return delay;
    }
    long firstDelay = first.getDelay(NANOSECONDS);
    boolean overflows = firstDelay < 0 && (delay - firstDelay < 0);
    return overflows ? Long.MAX_VALUE + firstDelay : delay;
}
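Throws:
  • RejectedExecutionException – if the task is rejected because the executor has been shut down
  • NullPointerException – if command or unit is null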
/**
 * Schedules a one-shot {@code command} to become enabled after the
 * given delay. The command is wrapped in a {@code ScheduledFutureTask},
 * passed through {@code decorateTask}, and handed to
 * {@code delayedExecute}.
 *
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null) {
        throw new NullPointerException();
    }
    ScheduledFutureTask<Void> inner =
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit),
                                      sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> decorated = decorateTask(command, inner);
    delayedExecute(decorated);
    return decorated;
}
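Throws:
  • RejectedExecutionException – if the task is rejected because the executor has been shut down
  • NullPointerException – if callable or unit is null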
/**
 * Schedules a one-shot {@code callable} to become enabled after the
 * given delay. The callable is wrapped in a
 * {@code ScheduledFutureTask}, passed through {@code decorateTask}, and
 * handed to {@code delayedExecute}.
 *
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null) {
        throw new NullPointerException();
    }
    ScheduledFutureTask<V> inner =
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit),
                                   sequencer.getAndIncrement());
    RunnableScheduledFuture<V> decorated = decorateTask(callable, inner);
    delayedExecute(decorated);
    return decorated;
}
Submits a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is, executions will commence after initialDelay, then initialDelay + period, then initialDelay + 2 * period, and so on.

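The sequence of task executions continues indefinitely until one of the following exceptional completions occurs:

  • The task is explicitly cancelled via the returned future.
  • Method shutdown is called and the policy on whether to continue after shutdown is not set true, or method shutdownNow is called; also resulting in task cancellation.
  • An execution of the task throws an exception. In this case calling get on the returned future will throw ExecutionException, holding the exception as its cause.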

Subsequent executions are suppressed. Subsequent calls to isDone() on the returned future will return true.

If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.

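Throws:
  • RejectedExecutionException – if the task is rejected because the executor has been shut down
  • NullPointerException – if command or unit is null
  • IllegalArgumentException – if period is less than or equal to zero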
/**
 * Schedules {@code command} for fixed-rate repetition: it first becomes
 * enabled after {@code initialDelay}, then at
 * {@code initialDelay + period}, {@code initialDelay + 2 * period},
 * and so on.
 *
 * <p>Executions continue indefinitely until one of the following
 * exceptional completions occurs:
 * <ul>
 * <li>The task is {@linkplain Future#cancel explicitly cancelled}
 * via the returned future.
 * <li>Method {@link #shutdown} is called and the {@linkplain
 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
 * whether to continue after shutdown} is not set true, or method
 * {@link #shutdownNow} is called; also resulting in task
 * cancellation.
 * <li>An execution of the task throws an exception.  In this case
 * calling {@link Future#get() get} on the returned future will throw
 * {@link ExecutionException}, holding the exception as its cause.
 * </ul>
 * After that, no further executions take place, and
 * {@link Future#isDone isDone()} on the returned future returns
 * {@code true}.
 *
 * <p>An execution that outlasts its period may make later executions
 * start late, but executions never run concurrently.
 *
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 * @throws IllegalArgumentException {@inheritDoc}
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null) {
        throw new NullPointerException();
    }
    if (period <= 0L) {
        throw new IllegalArgumentException();
    }
    // A positive period in nanoseconds marks the task as fixed-rate.
    ScheduledFutureTask<Void> periodicTask =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period),
                                      sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> decorated =
        decorateTask(command, periodicTask);
    // Requeue the decorated task, not the raw one, on later runs.
    periodicTask.outerTask = decorated;
    delayedExecute(decorated);
    return decorated;
}
Submits a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next.

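The sequence of task executions continues indefinitely until one of the following exceptional completions occurs:

  • The task is explicitly cancelled via the returned future.
  • Method shutdown is called and the policy on whether to continue after shutdown is not set true, or method shutdownNow is called; also resulting in task cancellation.
  • An execution of the task throws an exception. In this case calling get on the returned future will throw ExecutionException, holding the exception as its cause.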

Subsequent executions are suppressed. Subsequent calls to isDone() on the returned future will return true.
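Throws:
  • RejectedExecutionException – if the task is rejected because the executor has been shut down
  • NullPointerException – if command or unit is null
  • IllegalArgumentException – if delay is less than or equal to zero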
/**
 * Schedules {@code command} for fixed-delay repetition: it first
 * becomes enabled after {@code initialDelay}, and afterwards
 * {@code delay} elapses between the termination of one execution and
 * the commencement of the next.
 *
 * <p>Executions continue indefinitely until one of the following
 * exceptional completions occurs:
 * <ul>
 * <li>The task is {@linkplain Future#cancel explicitly cancelled}
 * via the returned future.
 * <li>Method {@link #shutdown} is called and the {@linkplain
 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
 * whether to continue after shutdown} is not set true, or method
 * {@link #shutdownNow} is called; also resulting in task
 * cancellation.
 * <li>An execution of the task throws an exception.  In this case
 * calling {@link Future#get() get} on the returned future will throw
 * {@link ExecutionException}, holding the exception as its cause.
 * </ul>
 * After that, no further executions take place, and
 * {@link Future#isDone isDone()} on the returned future returns
 * {@code true}.
 *
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 * @throws IllegalArgumentException {@inheritDoc}
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null) {
        throw new NullPointerException();
    }
    if (delay <= 0L) {
        throw new IllegalArgumentException();
    }
    // The period is stored negated: a negative value marks the task
    // as fixed-delay.
    ScheduledFutureTask<Void> periodicTask =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      -unit.toNanos(delay),
                                      sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> decorated =
        decorateTask(command, periodicTask);
    // Requeue the decorated task, not the raw one, on later runs.
    periodicTask.outerTask = decorated;
    delayedExecute(decorated);
    return decorated;
}
Executes command with zero required delay. This has effect equivalent to schedule(command, 0, anyUnit). Note that inspections of the queue and of the list returned by shutdownNow will access the zero-delayed ScheduledFuture, not the command itself.

A consequence of the use of ScheduledFuture objects is that afterExecute is always called with a null second Throwable argument, even if the command terminated abruptly. Instead, the Throwable thrown by such a task can be obtained via Future.get.

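Throws:
  • RejectedExecutionException – at discretion of RejectedExecutionHandler, if the task cannot be accepted for execution because the executor has been shut down
  • NullPointerException – if command is null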
/**
 * Runs {@code command} with no required delay; the effect is the
 * same as
 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
 * Because of that, anything inspecting the queue, or the list
 * returned by {@code shutdownNow}, sees the zero-delayed
 * {@link ScheduledFuture} rather than the {@code command} itself.
 *
 * <p>Since tasks are represented by {@code ScheduledFuture} objects,
 * {@link ThreadPoolExecutor#afterExecute afterExecute} is always
 * invoked with a null second {@code Throwable} argument, even when
 * the {@code command} terminated abruptly. The {@code Throwable}
 * thrown by such a task is available through {@link Future#get}
 * instead.
 *
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution because the
 *         executor has been shut down
 * @throws NullPointerException {@inheritDoc}
 */
public void execute(Runnable command) {
    final long noDelay = 0L;
    schedule(command, noDelay, NANOSECONDS);
}

// Override AbstractExecutorService methods
Throws:
/**
 * Submits {@code task} by scheduling it with a zero delay.
 *
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
    final ScheduledFuture<?> scheduled = schedule(task, 0L, NANOSECONDS);
    return scheduled;
}
Throws:
/**
 * Submits {@code task} by adapting it, together with {@code result},
 * to a {@code Callable} and scheduling that with a zero delay.
 *
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Runnable task, T result) {
    final Callable<T> adapted = Executors.callable(task, result);
    return schedule(adapted, 0L, NANOSECONDS);
}
Throws:
/**
 * Submits {@code task} by scheduling it with a zero delay.
 *
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Callable<T> task) {
    final ScheduledFuture<T> scheduled = schedule(task, 0L, NANOSECONDS);
    return scheduled;
}
Sets the policy on whether to continue executing existing periodic tasks even when this executor has been shutdown. In this case, executions will continue until shutdownNow or the policy is set to false when already shutdown. This value is by default false.
Params:
  • value – if true, continue after shutdown, else don't
See Also:
/**
 * Chooses whether existing periodic tasks keep executing after this
 * executor has been {@code shutdown}. When enabled, executions go on
 * until {@code shutdownNow}, or until the policy is switched to
 * {@code false} while already shut down.
 * This value is by default {@code false}.
 *
 * @param value if {@code true}, continue after shutdown, else don't
 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
 */
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
    continueExistingPeriodicTasksAfterShutdown = value;
    if (value) {
        return;
    }
    // Policy was just turned off: if the executor is already shut
    // down, run the shutdown hook again.
    if (isShutdown()) {
        onShutdown();
    }
}
Gets the policy on whether to continue executing existing periodic tasks even when this executor has been shutdown. In this case, executions will continue until shutdownNow or the policy is set to false when already shutdown. This value is by default false.
See Also:
Returns:true if will continue after shutdown
/**
 * Reports whether existing periodic tasks keep executing after this
 * executor has been {@code shutdown}. When enabled, executions go on
 * until {@code shutdownNow}, or until the policy is switched to
 * {@code false} while already shut down.
 * This value is by default {@code false}.
 *
 * @return {@code true} if will continue after shutdown
 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
 */
public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
    final boolean keepPeriodic = continueExistingPeriodicTasksAfterShutdown;
    return keepPeriodic;
}
Sets the policy on whether to execute existing delayed tasks even when this executor has been shutdown. In this case, these tasks will only terminate upon shutdownNow, or after setting the policy to false when already shutdown. This value is by default true.
Params:
  • value – if true, execute after shutdown, else don't
See Also:
/**
 * Chooses whether existing delayed tasks are still executed after
 * this executor has been {@code shutdown}. When enabled, these tasks
 * only terminate upon {@code shutdownNow}, or after the policy is
 * switched to {@code false} while already shut down.
 * This value is by default {@code true}.
 *
 * @param value if {@code true}, execute after shutdown, else don't
 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
 */
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
    executeExistingDelayedTasksAfterShutdown = value;
    if (value) {
        return;
    }
    // Policy was just turned off: if the executor is already shut
    // down, run the shutdown hook again.
    if (isShutdown()) {
        onShutdown();
    }
}
Gets the policy on whether to execute existing delayed tasks even when this executor has been shutdown. In this case, these tasks will only terminate upon shutdownNow, or after setting the policy to false when already shutdown. This value is by default true.
See Also:
Returns:true if will execute after shutdown
/**
 * Reports whether existing delayed tasks are still executed after
 * this executor has been {@code shutdown}. When enabled, these tasks
 * only terminate upon {@code shutdownNow}, or after the policy is
 * switched to {@code false} while already shut down.
 * This value is by default {@code true}.
 *
 * @return {@code true} if will execute after shutdown
 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
 */
public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
    final boolean runDelayed = executeExistingDelayedTasksAfterShutdown;
    return runDelayed;
}
Sets the policy on whether cancelled tasks should be immediately removed from the work queue at time of cancellation. This value is by default false.
Params:
  • value – if true, remove on cancellation, else don't
See Also:
Since:1.7
/**
 * Chooses whether a cancelled task is removed from the work queue
 * right away, at the time of cancellation. This value is by default
 * {@code false}.
 *
 * @param value if {@code true}, remove on cancellation, else don't
 * @see #getRemoveOnCancelPolicy
 * @since 1.7
 */
public void setRemoveOnCancelPolicy(boolean value) {
    this.removeOnCancel = value;
}
Gets the policy on whether cancelled tasks should be immediately removed from the work queue at time of cancellation. This value is by default false.
See Also:
Returns:true if cancelled tasks are immediately removed from the queue
Since:1.7
/**
 * Reports whether a cancelled task is removed from the work queue
 * right away, at the time of cancellation. This value is by default
 * {@code false}.
 *
 * @return {@code true} if cancelled tasks are immediately removed
 *         from the queue
 * @see #setRemoveOnCancelPolicy
 * @since 1.7
 */
public boolean getRemoveOnCancelPolicy() {
    final boolean eagerRemoval = removeOnCancel;
    return eagerRemoval;
}
Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.

This method does not wait for previously submitted tasks to complete execution. Use awaitTermination to do that.

If the ExecuteExistingDelayedTasksAfterShutdownPolicy has been set false, existing delayed tasks whose delays have not yet elapsed are cancelled. And unless the ContinueExistingPeriodicTasksAfterShutdownPolicy has been set true, future executions of existing periodic tasks will be cancelled.

Throws:
/**
 * Starts an orderly shutdown: tasks submitted earlier are still
 * executed, but new tasks are no longer accepted. Calling this on an
 * executor that is already shut down has no additional effect.
 *
 * <p>The call returns without waiting for previously submitted tasks
 * to finish; use {@link #awaitTermination awaitTermination} for that.
 *
 * <p>When the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
 * has been set {@code false}, existing delayed tasks whose delays
 * have not yet elapsed are cancelled. Likewise, unless the
 * {@code ContinueExistingPeriodicTasksAfterShutdownPolicy} has been
 * set {@code true}, future executions of existing periodic tasks
 * are cancelled.
 *
 * @throws SecurityException {@inheritDoc}
 */
public void shutdown() {
    // Pure delegation; overridden to attach the documentation above.
    super.shutdown();
}
Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution. These tasks are drained (removed) from the task queue upon return from this method.

This method does not wait for actively executing tasks to terminate. Use awaitTermination to do that.

There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. This implementation interrupts tasks via Thread.interrupt; any task that fails to respond to interrupts may never terminate.

Throws:
Returns:list of tasks that never commenced execution. Each element of this list is a ScheduledFuture. For tasks submitted via one of the schedule methods, the element will be identical to the returned ScheduledFuture. For tasks submitted using execute, the element will be a zero-delay ScheduledFuture.
/**
 * Tries to stop every actively executing task, stops processing of
 * waiting tasks, and hands back the tasks that were still awaiting
 * execution. Those tasks have been drained (removed) from the task
 * queue by the time this method returns.
 *
 * <p>The call returns without waiting for actively executing tasks
 * to terminate; use {@link #awaitTermination awaitTermination} for
 * that.
 *
 * <p>Stopping running tasks is best-effort only. This implementation
 * interrupts tasks via {@link Thread#interrupt}; a task that does
 * not respond to interrupts may never terminate.
 *
 * @return list of tasks that never commenced execution.
 *         Each element of this list is a {@link ScheduledFuture}.
 *         For tasks submitted via one of the {@code schedule}
 *         methods, the element will be identical to the returned
 *         {@code ScheduledFuture}.  For tasks submitted using
 *         {@link #execute execute}, the element will be a
 *         zero-delay {@code ScheduledFuture}.
 * @throws SecurityException {@inheritDoc}
 */
public List<Runnable> shutdownNow() {
    final List<Runnable> neverStarted = super.shutdownNow();
    return neverStarted;
}
Returns the task queue used by this executor. Access to the task queue is intended primarily for debugging and monitoring. This queue may be in active use. Retrieving the task queue does not prevent queued tasks from executing.

Each element of this queue is a ScheduledFuture. For tasks submitted via one of the schedule methods, the element will be identical to the returned ScheduledFuture. For tasks submitted using execute, the element will be a zero-delay ScheduledFuture.

Iteration over this queue is not guaranteed to traverse tasks in the order in which they will execute.

Returns:the task queue
/**
 * Gives access to the task queue this executor uses, mainly for
 * debugging and monitoring. The queue may be in active use, and
 * retrieving it does not stop queued tasks from executing.
 *
 * <p>Every element of the queue is a {@link ScheduledFuture}.
 * For tasks submitted via one of the {@code schedule} methods, the
 * element will be identical to the returned {@code ScheduledFuture}.
 * For tasks submitted using {@link #execute execute}, the element
 * will be a zero-delay {@code ScheduledFuture}.
 *
 * <p>Iterating over this queue is <em>not</em> guaranteed to visit
 * tasks in the order in which they will execute.
 *
 * @return the task queue
 */
public BlockingQueue<Runnable> getQueue() {
    final BlockingQueue<Runnable> workQueue = super.getQueue();
    return workQueue;
}
Specialized delay queue. To mesh with TPE declarations, this class must be declared as a BlockingQueue<Runnable> even though it can only hold RunnableScheduledFutures.
/**
 * Specialized delay queue. To mesh with TPE declarations, this
 * class must be declared as a BlockingQueue<Runnable> even though
 * it can only hold RunnableScheduledFutures.
 */
static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {

    /*
     * A DelayedWorkQueue is based on a heap-based data structure
     * like those in DelayQueue and PriorityQueue, except that
     * every ScheduledFutureTask also records its index into the
     * heap array. This eliminates the need to find a task upon
     * cancellation, greatly speeding up removal (down from O(n)
     * to O(log n)), and reducing garbage retention that would
     * otherwise occur by waiting for the element to rise to top
     * before clearing. But because the queue may also hold
     * RunnableScheduledFutures that are not ScheduledFutureTasks,
     * we are not guaranteed to have such indices available, in
     * which case we fall back to linear search. (We expect that
     * most tasks will not be decorated, and that the faster cases
     * will be much more common.)
     *
     * All heap operations must record index changes -- mainly
     * within siftUp and siftDown. Upon removal, a task's
     * heapIndex is set to -1. Note that ScheduledFutureTasks can
     * appear at most once in the queue (this need not be true for
     * other kinds of tasks or work queues), so are uniquely
     * identified by heapIndex.
     */

    // Starting length of the heap array; grow() enlarges it on demand.
    private static final int INITIAL_CAPACITY = 16;

    // Heap array. Slots [0, size) hold tasks; slot 0 is the head
    // returned by peek/poll/take.
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

    // Lock taken by the public queue operations; the private heap
    // helpers are documented as "call only when holding lock".
    private final ReentrantLock lock = new ReentrantLock();

    // Number of tasks currently stored in the heap array.
    private int size;
Thread designated to wait for the task at the head of the queue. This variant of the Leader-Follower pattern (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to minimize unnecessary timed waiting. When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely. The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim. Whenever the head of the queue is replaced with a task with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled. So waiting threads must be prepared to acquire and lose leadership while waiting.
/**
 * Thread designated to wait for the task at the head of the
 * queue.  This variant of the Leader-Follower pattern
 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 * minimize unnecessary timed waiting.  When a thread becomes
 * the leader, it waits only for the next delay to elapse, but
 * other threads await indefinitely.  The leader thread must
 * signal some other thread before returning from take() or
 * poll(...), unless some other thread becomes leader in the
 * interim.  Whenever the head of the queue is replaced with a
 * task with an earlier expiration time, the leader field is
 * invalidated by being reset to null, and some waiting
 * thread, but not necessarily the current leader, is
 * signalled.  So waiting threads must be prepared to acquire
 * and lose leadership while waiting.
 */
private Thread leader;
Condition signalled when a newer task becomes available at the head of the queue or a new thread may need to become leader.
/**
 * Condition signalled when a newer task becomes available at the
 * head of the queue or a new thread may need to become leader.
 * It is created from this queue's lock.
 */
private final Condition available = lock.newCondition();
Sets f's heapIndex if it is a ScheduledFutureTask.
/**
 * Records {@code idx} as the heap position of {@code f}, provided
 * {@code f} is a ScheduledFutureTask; any other kind of task is
 * left untouched.
 */
private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
    if (!(f instanceof ScheduledFutureTask)) {
        return; // no heapIndex to maintain on this kind of task
    }
    ((ScheduledFutureTask<?>) f).heapIndex = idx;
}
Sifts element added at bottom up to its heap-ordered spot. Call only when holding lock.
/**
 * Moves {@code key}, starting at slot {@code k}, towards the root
 * until its parent no longer compares greater than it, then stores
 * it there. Every task that changes slot has its index updated.
 * Call only when holding lock.
 */
private void siftUp(int k, RunnableScheduledFuture<?> key) {
    int slot = k;
    while (slot > 0) {
        final int parentSlot = (slot - 1) >>> 1;
        final RunnableScheduledFuture<?> parentTask = queue[parentSlot];
        if (key.compareTo(parentTask) >= 0) {
            break; // heap order already holds at this slot
        }
        // Pull the parent down one level and continue from its slot.
        queue[slot] = parentTask;
        setIndex(parentTask, slot);
        slot = parentSlot;
    }
    queue[slot] = key;
    setIndex(key, slot);
}
Sifts element added at top down to its heap-ordered spot. Call only when holding lock.
/**
 * Moves {@code key}, starting at slot {@code k}, towards the leaves
 * until neither child compares less than it, then stores it there.
 * Every task that changes slot has its index updated.
 * Call only when holding lock.
 */
private void siftDown(int k, RunnableScheduledFuture<?> key) {
    // Slots from size/2 onwards have no children.
    final int firstLeaf = size >>> 1;
    int slot = k;
    while (slot < firstLeaf) {
        int smaller = (slot << 1) + 1;              // left child
        RunnableScheduledFuture<?> candidate = queue[smaller];
        final int right = smaller + 1;
        if (right < size && candidate.compareTo(queue[right]) > 0) {
            // The right child exists and orders first.
            smaller = right;
            candidate = queue[right];
        }
        if (key.compareTo(candidate) <= 0) {
            break; // heap order already holds at this slot
        }
        // Pull the chosen child up one level and continue from its slot.
        queue[slot] = candidate;
        setIndex(candidate, slot);
        slot = smaller;
    }
    queue[slot] = key;
    setIndex(key, slot);
}
Resizes the heap array. Call only when holding lock.
/** * Resizes the heap array. Call only when holding lock. */
private void grow() { int oldCapacity = queue.length; int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% if (newCapacity < 0) // overflow newCapacity = Integer.MAX_VALUE; queue = Arrays.copyOf(queue, newCapacity); }
Finds index of given object, or -1 if absent.
/**
 * Finds index of given object, or -1 if absent.
 * A ScheduledFutureTask is located through its recorded heapIndex;
 * any other object is located by a linear scan using equals.
 */
private int indexOf(Object x) {
    if (x != null) {
        if (x instanceof ScheduledFutureTask) {
            int i = ((ScheduledFutureTask) x).heapIndex;
            // Sanity check; x could conceivably be a
            // ScheduledFutureTask from some other pool.
            if (i >= 0 && i < size && queue[i] == x)
                return i;
        } else {
            for (int i = 0; i < size; i++)
                if (x.equals(queue[i]))
                    return i;
        }
    }
    return -1;
}

/**
 * Returns true if indexOf locates {@code x}; runs under the lock.
 */
public boolean contains(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return indexOf(x) != -1;
    } finally {
        lock.unlock();
    }
}

/**
 * Removes {@code x} from the heap if indexOf locates it.
 *
 * @return true if an element was removed, false if x was not found
 */
public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = indexOf(x);
        if (i < 0)
            return false;

        // Mark the removed task as no longer in the heap.
        setIndex(queue[i], -1);
        // Take the last element out of the array; it fills the hole.
        int s = --size;
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        if (s != i) {
            // Try moving the replacement down first; if it stayed at
            // slot i, it may instead need to move up.
            siftDown(i, replacement);
            if (queue[i] == replacement)
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

/**
 * Returns the current element count, read under the lock.
 */
public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

/**
 * Returns true if size() is zero.
 */
public boolean isEmpty() {
    return size() == 0;
}

/**
 * Always returns Integer.MAX_VALUE.
 */
public int remainingCapacity() {
    return Integer.MAX_VALUE;
}

/**
 * Returns whatever is in slot 0 of the heap array, without
 * removing it and without consulting its delay; runs under the lock.
 */
public RunnableScheduledFuture<?> peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return queue[0];
    } finally {
        lock.unlock();
    }
}

/**
 * Inserts {@code x} into the heap, growing the array when full.
 *
 * @return always true
 * @throws NullPointerException if x is null
 */
public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    // Only RunnableScheduledFutures can be stored; the cast enforces that.
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            // Empty heap: the new task goes straight into slot 0.
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        // The new task became the head: invalidate the current
        // leader and wake one waiting thread.
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

/**
 * Delegates to offer(Runnable).
 */
public void put(Runnable e) {
    offer(e);
}

/**
 * Delegates to offer(Runnable).
 */
public boolean add(Runnable e) {
    return offer(e);
}

/**
 * Delegates to offer(Runnable); timeout and unit are not used.
 */
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
    return offer(e);
}
Performs common bookkeeping for poll and take: Replaces first element with last and sifts it down. Call only when holding lock.
Params:
  • f – the task to remove and return
/**
 * Performs common bookkeeping for poll and take: Replaces
 * first element with last and sifts it down.  Call only when
 * holding lock.
 * @param f the task to remove and return
 * @return f, with its heap index reset to -1
 */
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    // s == 0 means f was the only element; nothing left to re-heapify.
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}

/**
 * Removes and returns the head if its delay is zero or negative;
 * returns null if the queue is empty or the head's delay is still
 * positive. Does not wait.
 */
public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        RunnableScheduledFuture<?> first = queue[0];
        return (first == null || first.getDelay(NANOSECONDS) > 0)
            ? null
            : finishPoll(first);
    } finally {
        lock.unlock();
    }
}

/**
 * Removes and returns the head once its delay is zero or negative,
 * waiting on the condition as long as necessary.
 *
 * @throws InterruptedException if interrupted while acquiring the
 *         lock or while waiting
 */
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();          // empty: wait for an offer
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();      // another thread is leader
                else {
                    // Become leader and wait only for the head's delay.
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        // Give up leadership unless it was already
                        // taken over or reset in the meantime.
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // On the way out, wake another waiter if nobody is leading
        // and there is still a task at the head.
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

/**
 * Timed variant of take: removes and returns the head once its
 * delay is zero or negative, or returns null when the timeout runs
 * out first.
 *
 * @throws InterruptedException if interrupted while acquiring the
 *         lock or while waiting
 */
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
                if (nanos <= 0L)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return finishPoll(first);
                if (nanos <= 0L)
                    return null;
                first = null; // don't retain ref while waiting
                // Wait out the caller's timeout when it is shorter
                // than the head's delay or a leader already exists.
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    // Become leader and wait only for the head's delay.
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        // Charge the time actually spent waiting
                        // against the caller's timeout.
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // On the way out, wake another waiter if nobody is leading
        // and there is still a task at the head.
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

/**
 * Empties the heap: nulls every occupied slot, resets each removed
 * task's heap index to -1, and sets size to zero.
 */
public void clear() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        for (int i = 0; i < size; i++) {
            RunnableScheduledFuture<?> t = queue[i];
            if (t != null) {
                queue[i] = null;
                setIndex(t, -1);
            }
        }
        size = 0;
    } finally {
        lock.unlock();
    }
}

/**
 * Delegates to drainTo(c, Integer.MAX_VALUE).
 */
public int drainTo(Collection<? super Runnable> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

/**
 * Moves up to {@code maxElements} head tasks whose delay is zero or
 * negative into {@code c}, stopping at the first head that is still
 * delayed.
 *
 * @return the number of tasks transferred
 * @throws NullPointerException if c is null
 * @throws IllegalArgumentException if c is this queue
 */
public int drainTo(Collection<? super Runnable> c, int maxElements) {
    Objects.requireNonNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = 0;
        for (RunnableScheduledFuture<?> first;
             n < maxElements
                 && (first = queue[0]) != null
                 && first.getDelay(NANOSECONDS) <= 0;) {
            c.add(first);   // In this order, in case add() throws.
            finishPoll(first);
            ++n;
        }
        return n;
    } finally {
        lock.unlock();
    }
}

/**
 * Returns a new Object[] holding the first {@code size} slots of
 * the heap array, copied under the lock.
 */
public Object[] toArray() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return Arrays.copyOf(queue, size, Object[].class);
    } finally {
        lock.unlock();
    }
}

/**
 * Copies the first {@code size} slots of the heap array into
 * {@code a} when it is long enough, otherwise into a newly
 * allocated array of {@code a}'s runtime type. When {@code a} is
 * longer than needed, the slot just past the last element is set
 * to null.
 */
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (a.length < size)
            return (T[]) Arrays.copyOf(queue, size, a.getClass());
        System.arraycopy(queue, 0, a, 0, size);
        if (a.length > size)
            a[size] = null;
        return a;
    } finally {
        lock.unlock();
    }
}

/**
 * Returns an iterator over a copy of the heap array taken under
 * the lock at the time of the call.
 */
public Iterator<Runnable> iterator() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return new Itr(Arrays.copyOf(queue, size));
    } finally {
        lock.unlock();
    }
}
Snapshot iterator that works off copy of underlying q array.
/** * Snapshot iterator that works off copy of underlying q array. */
private class Itr implements Iterator<Runnable> { final RunnableScheduledFuture<?>[] array; int cursor; // index of next element to return; initially 0 int lastRet = -1; // index of last element returned; -1 if no such Itr(RunnableScheduledFuture<?>[] array) { this.array = array; } public boolean hasNext() { return cursor < array.length; } public Runnable next() { if (cursor >= array.length) throw new NoSuchElementException(); return array[lastRet = cursor++]; } public void remove() { if (lastRet < 0) throw new IllegalStateException(); DelayedWorkQueue.this.remove(array[lastRet]); lastRet = -1; } } } }