/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/*
 * An unbounded blocking queue of Delayed elements, in which an element can
 * only be taken when its delay has expired. The head of the queue is that
 * Delayed element whose delay expired furthest in the past. If no delay has
 * expired there is no head and poll will return null. Expiration occurs when
 * an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less
 * than or equal to zero. Even though unexpired elements cannot be removed
 * using take or poll, they are otherwise treated as normal elements. For
 * example, the size method returns the count of both expired and unexpired
 * elements. This queue does not permit null elements.
 *
 * This class and its iterator implement all of the optional methods of the
 * Collection and Iterator interfaces. The Iterator provided in method
 * iterator() is not guaranteed to traverse the elements of the DelayQueue in
 * any particular order.
 *
 * This class is a member of the Java Collections Framework.
 *
 * Author: Doug Lea
 * Type parameters: <E> - the type of elements held in this queue
 * Since: 1.5
 */
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and {@code poll}
 * will return {@code null}. Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements. For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the <em>optional</em>
 * methods of the {@link Collection} and {@link Iterator} interfaces.
 * The Iterator provided in method {@link #iterator()} is <em>not</em>
 * guaranteed to traverse the elements of the DelayQueue in any
 * particular order.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock held around every access to {@code q} and {@code leader}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /** Backing heap; its head is the element that sorts first. */
    private final PriorityQueue<E> q = new PriorityQueue<>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // The new element became the head: whatever delay the current
            // leader is timing is stale, so drop leadership and wake one
            // waiter to re-evaluate the head.
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            return (first == null || first.getDelay(NANOSECONDS) > 0)
                ? null
                : q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    // Empty queue: wait until offer() signals a new head.
                    available.await();
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L) {
                        return q.poll();
                    }
                    first = null; // don't retain ref while waiting
                    if (leader != null) {
                        // Another thread is already timing the head.
                        available.await();
                    } else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // Only relinquish leadership if it was not
                            // already reset or reassigned while waiting.
                            if (leader == thisThread) {
                                leader = null;
                            }
                        }
                    }
                }
            }
        } finally {
            // Hand off: if nobody leads and elements remain, wake a waiter.
            if (leader == null && q.peek() != null) {
                available.signal();
            }
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0L) {
                        return null;
                    } else {
                        nanos = available.awaitNanos(nanos);
                    }
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L) {
                        return q.poll();
                    }
                    if (nanos <= 0L) {
                        return null;
                    }
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null) {
                        // Either the caller's budget runs out before the
                        // head expires, or someone else is timing the head.
                        nanos = available.awaitNanos(nanos);
                    } else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // Charge only the time actually spent waiting.
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread) {
                                leader = null;
                            }
                        }
                    }
                }
            }
        } finally {
            // Hand off: if nobody leads and elements remain, wake a waiter.
            if (leader == null && q.peek() != null) {
                available.signal();
            }
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.  Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    @Override
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        Objects.requireNonNull(c);
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            // Transfer only expired heads; stop at the first unexpired one.
            for (E first;
                 n < maxElements
                     && (first = q.peek()) != null
                     && first.getDelay(NANOSECONDS) <= 0;) {
                c.add(first);   // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    @Override
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of {@code Delayed}:
     *
     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    @Override
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based version for use in Itr.remove.
     *
     * @param o the exact instance to remove; compared with {@code ==}
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        @Override
        public boolean hasNext() {
            return cursor < array.length;
        }

        @Override
        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length) {
                throw new NoSuchElementException();
            }
            return (E)array[lastRet = cursor++];
        }

        @Override
        public void remove() {
            if (lastRet < 0) {
                throw new IllegalStateException();
            }
            // Remove by identity so an equal-but-distinct element survives.
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }
}