/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tomcat.util.threads;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.tomcat.util.res.StringManager;

Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient getSubmittedCount() method, to be used to properly handle the work queue. If a RejectedExecutionHandler is not specified a default one will be configured and that one will always throw a RejectedExecutionException
/** * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient * {@link #getSubmittedCount()} method, to be used to properly handle the work queue. * If a RejectedExecutionHandler is not specified a default one will be configured * and that one will always throw a RejectedExecutionException * */
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
The string manager for this package.
/** * The string manager for this package. */
protected static final StringManager sm = StringManager .getManager("org.apache.tomcat.util.threads.res");
The number of tasks submitted but not yet finished. This includes tasks in the queue and tasks that have been handed to a worker thread but the latter did not start executing the task yet. This number is always greater or equal to ThreadPoolExecutor.getActiveCount().
/** * The number of tasks submitted but not yet finished. This includes tasks * in the queue and tasks that have been handed to a worker thread but the * latter did not start executing the task yet. * This number is always greater or equal to {@link #getActiveCount()}. */
private final AtomicInteger submittedCount = new AtomicInteger(0); private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
Most recent time in ms when a thread decided to kill itself to avoid potential memory leaks. Useful to throttle the rate of renewals of threads.
/** * Most recent time in ms when a thread decided to kill itself to avoid * potential memory leaks. Useful to throttle the rate of renewals of * threads. */
private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
Delay in ms between 2 threads being renewed. If negative, do not renew threads.
/** * Delay in ms between 2 threads being renewed. If negative, do not renew threads. */
private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY; public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); prestartAllCoreThreads(); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); prestartAllCoreThreads(); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler()); prestartAllCoreThreads(); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler()); prestartAllCoreThreads(); } public long getThreadRenewalDelay() { return threadRenewalDelay; } public void setThreadRenewalDelay(long threadRenewalDelay) { this.threadRenewalDelay = threadRenewalDelay; } @Override protected void afterExecute(Runnable r, Throwable t) { submittedCount.decrementAndGet(); if (t == null) { stopCurrentThreadIfNeeded(); } }
If the current thread was started before the last time when a context was stopped, an exception is thrown so that the current thread is stopped.
/** * If the current thread was started before the last time when a context was * stopped, an exception is thrown so that the current thread is stopped. */
protected void stopCurrentThreadIfNeeded() { if (currentThreadShouldBeStopped()) { long lastTime = lastTimeThreadKilledItself.longValue(); if (lastTime + threadRenewalDelay < System.currentTimeMillis()) { if (lastTimeThreadKilledItself.compareAndSet(lastTime, System.currentTimeMillis() + 1)) { // OK, it's really time to dispose of this thread final String msg = sm.getString( "threadPoolExecutor.threadStoppedToAvoidPotentialLeak", Thread.currentThread().getName()); throw new StopPooledThreadException(msg); } } } } protected boolean currentThreadShouldBeStopped() { if (threadRenewalDelay >= 0 && Thread.currentThread() instanceof TaskThread) { TaskThread currentTaskThread = (TaskThread) Thread.currentThread(); if (currentTaskThread.getCreationTime() < this.lastContextStoppedTime.longValue()) { return true; } } return false; } public int getSubmittedCount() { return submittedCount.get(); }
{@inheritDoc}
/** * {@inheritDoc} */
@Override public void execute(Runnable command) { execute(command,0,TimeUnit.MILLISECONDS); }
Executes the given command at some time in the future. The command may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation. If no threads are available, it will be added to the work queue. If the work queue is full, the system will wait for the specified time and it throw a RejectedExecutionException if the queue is still full after that.
Params:
  • command – the runnable task
  • timeout – A timeout for the completion of the task
  • unit – The timeout time unit
Throws:
/** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the <code>Executor</code> implementation. * If no threads are available, it will be added to the work queue. * If the work queue is full, the system will wait for the specified * time and it throw a RejectedExecutionException if the queue is still * full after that. * * @param command the runnable task * @param timeout A timeout for the completion of the task * @param unit The timeout time unit * @throws RejectedExecutionException if this task cannot be * accepted for execution - the queue is full * @throws NullPointerException if command or unit is null */
public void execute(Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { if (super.getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super.getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; } } } public void contextStopping() { this.lastContextStoppedTime.set(System.currentTimeMillis()); // save the current pool parameters to restore them later int savedCorePoolSize = this.getCorePoolSize(); TaskQueue taskQueue = getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null; if (taskQueue != null) { // note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize // checks that queue.remainingCapacity()==0. I did not understand // why, but to get the intended effect of waking up idle threads, I // temporarily fake this condition. taskQueue.setForcedRemainingCapacity(Integer.valueOf(0)); } // setCorePoolSize(0) wakes idle threads this.setCorePoolSize(0); // TaskQueue.take() takes care of timing out, so that we are sure that // all threads of the pool are renewed in a limited time, something like // (threadKeepAlive + longest request time) if (taskQueue != null) { // ok, restore the state of the queue and pool taskQueue.setForcedRemainingCapacity(null); } this.setCorePoolSize(savedCorePoolSize); } private static class RejectHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor executor) { throw new RejectedExecutionException(); } } }