Copyright 2012 Netflix, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
/** * Copyright 2012 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
package com.netflix.hystrix; import com.netflix.hystrix.strategy.HystrixPlugins; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler; import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory; import com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Scheduler; import rx.functions.Func0; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;
ThreadPool used to executed HystrixCommand.run() on separate threads when configured to do so with HystrixCommandProperties.executionIsolationStrategy().

Typically each HystrixCommandGroupKey has its own thread-pool so that any one group of commands can not starve others from being able to run.

A HystrixCommand can be configured with a thread-pool explicitly by injecting a HystrixThreadPoolKey or via the HystrixCommandProperties.executionIsolationThreadPoolKeyOverride() otherwise it will derive a HystrixThreadPoolKey from the injected HystrixCommandGroupKey.

The pool should be sized large enough to handle normal healthy traffic but small enough that it will constrain concurrent execution if backend calls become latent.

For more information see the Github Wiki: https://github.com/Netflix/Hystrix/wiki/Configuration#wiki-ThreadPool and https://github.com/Netflix/Hystrix/wiki/How-it-Works#wiki-Isolation

/** * ThreadPool used to executed {@link HystrixCommand#run()} on separate threads when configured to do so with {@link HystrixCommandProperties#executionIsolationStrategy()}. * <p> * Typically each {@link HystrixCommandGroupKey} has its own thread-pool so that any one group of commands can not starve others from being able to run. * <p> * A {@link HystrixCommand} can be configured with a thread-pool explicitly by injecting a {@link HystrixThreadPoolKey} or via the * {@link HystrixCommandProperties#executionIsolationThreadPoolKeyOverride()} otherwise it * will derive a {@link HystrixThreadPoolKey} from the injected {@link HystrixCommandGroupKey}. * <p> * The pool should be sized large enough to handle normal healthy traffic but small enough that it will constrain concurrent execution if backend calls become latent. * <p> * For more information see the Github Wiki: https://github.com/Netflix/Hystrix/wiki/Configuration#wiki-ThreadPool and https://github.com/Netflix/Hystrix/wiki/How-it-Works#wiki-Isolation */
public interface HystrixThreadPool {
Implementation of ThreadPoolExecutor.
Returns:ThreadPoolExecutor
/** * Implementation of {@link ThreadPoolExecutor}. * * @return ThreadPoolExecutor */
public ExecutorService getExecutor(); public Scheduler getScheduler(); public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread);
Mark when a thread begins executing a command.
/** * Mark when a thread begins executing a command. */
public void markThreadExecution();
Mark when a thread completes executing a command.
/** * Mark when a thread completes executing a command. */
public void markThreadCompletion();
Mark when a command gets rejected from the threadpool
/** * Mark when a command gets rejected from the threadpool */
public void markThreadRejection();
Whether the queue will allow adding an item to it.

This allows dynamic control of the max queueSize versus whatever the actual max queueSize is so that dynamic changes can be done via property changes rather than needing an app restart to adjust when commands should be rejected from queuing up.

Returns:boolean whether there is space on the queue
/** * Whether the queue will allow adding an item to it. * <p> * This allows dynamic control of the max queueSize versus whatever the actual max queueSize is so that dynamic changes can be done via property changes rather than needing an app * restart to adjust when commands should be rejected from queuing up. * * @return boolean whether there is space on the queue */
public boolean isQueueSpaceAvailable();
@ExcludeFromJavadoc
/** * @ExcludeFromJavadoc */
/* package */static class Factory { /* * Use the String from HystrixThreadPoolKey.name() instead of the HystrixThreadPoolKey instance as it's just an interface and we can't ensure the object * we receive implements hashcode/equals correctly and do not want the default hashcode/equals which would create a new threadpool for every object we get even if the name is the same */ /* package */final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
Get the HystrixThreadPool instance for a given HystrixThreadPoolKey.

This is thread-safe and ensures only 1 HystrixThreadPool per HystrixThreadPoolKey.

Returns:HystrixThreadPool instance
/** * Get the {@link HystrixThreadPool} instance for a given {@link HystrixThreadPoolKey}. * <p> * This is thread-safe and ensures only 1 {@link HystrixThreadPool} per {@link HystrixThreadPoolKey}. * * @return {@link HystrixThreadPool} instance */
/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) { // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work String key = threadPoolKey.name(); // this should find it for all but the first time HystrixThreadPool previouslyCached = threadPools.get(key); if (previouslyCached != null) { return previouslyCached; } // if we get here this is the first time so we need to initialize synchronized (HystrixThreadPool.class) { if (!threadPools.containsKey(key)) { threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); } } return threadPools.get(key); }
Initiate the shutdown of all HystrixThreadPool instances.

NOTE: This is NOT thread-safe if HystrixCommands are concurrently being executed and causing thread-pools to initialize while also trying to shutdown.

/** * Initiate the shutdown of all {@link HystrixThreadPool} instances. * <p> * NOTE: This is NOT thread-safe if HystrixCommands are concurrently being executed * and causing thread-pools to initialize while also trying to shutdown. * </p> */
/* package */static synchronized void shutdown() { for (HystrixThreadPool pool : threadPools.values()) { pool.getExecutor().shutdown(); } threadPools.clear(); }
Initiate the shutdown of all HystrixThreadPool instances and wait up to the given time on each pool to complete.

NOTE: This is NOT thread-safe if HystrixCommands are concurrently being executed and causing thread-pools to initialize while also trying to shutdown.

/** * Initiate the shutdown of all {@link HystrixThreadPool} instances and wait up to the given time on each pool to complete. * <p> * NOTE: This is NOT thread-safe if HystrixCommands are concurrently being executed * and causing thread-pools to initialize while also trying to shutdown. * </p> */
/* package */static synchronized void shutdown(long timeout, TimeUnit unit) { for (HystrixThreadPool pool : threadPools.values()) { pool.getExecutor().shutdown(); } for (HystrixThreadPool pool : threadPools.values()) { try { while (! pool.getExecutor().awaitTermination(timeout, unit)) { } } catch (InterruptedException e) { throw new RuntimeException("Interrupted while waiting for thread-pools to terminate. Pools may not be correctly shutdown or cleared.", e); } } threadPools.clear(); } }
@ExcludeFromJavadoc
@ThreadSafe
/** * @ExcludeFromJavadoc * @ThreadSafe */
/* package */static class HystrixThreadPoolDefault implements HystrixThreadPool { private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class); private final HystrixThreadPoolProperties properties; private final BlockingQueue<Runnable> queue; private final ThreadPoolExecutor threadPool; private final HystrixThreadPoolMetrics metrics; private final int queueSize; public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); this.queueSize = properties.maxQueueSize().get(); this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, concurrencyStrategy.getThreadPool(threadPoolKey, properties), properties); this.threadPool = this.metrics.getThreadPool(); this.queue = this.threadPool.getQueue(); /* strategy: HystrixMetricsPublisherThreadPool */ HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties); } @Override public ThreadPoolExecutor getExecutor() { touchConfig(); return threadPool; } @Override public Scheduler getScheduler() { //by default, interrupt underlying threads on timeout return getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return true; } }); } @Override public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) { touchConfig(); return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread); } // allow us to change things via fast-properties by setting it each time private void touchConfig() { final int dynamicCoreSize = properties.coreSize().get(); final int configuredMaximumSize = properties.maximumSize().get(); int dynamicMaximumSize = properties.actualMaximumSize(); final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get(); boolean maxTooLow = false; if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) { //if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum dynamicMaximumSize = dynamicCoreSize; maxTooLow = true; } // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed. if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) { if (maxTooLow) { logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + configuredMaximumSize + ". Maximum size will be set to " + dynamicMaximumSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); } threadPool.setCorePoolSize(dynamicCoreSize); threadPool.setMaximumPoolSize(dynamicMaximumSize); } threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES); } @Override public void markThreadExecution() { metrics.markThreadExecution(); } @Override public void markThreadCompletion() { metrics.markThreadCompletion(); } @Override public void markThreadRejection() { metrics.markThreadRejection(); }
Whether the threadpool queue has space available according to the queueSizeRejectionThreshold settings. Note that the queueSize is an final instance variable on HystrixThreadPoolDefault, and not looked up dynamically. The data structure is static, so this does not make sense as a dynamic lookup. The queueSizeRejectionThreshold can be dynamic (up to queueSize), so that should still get checked on each invocation.

If a SynchronousQueue implementation is used (maxQueueSize <= 0), it always returns 0 as the size so this would always return true.

/** * Whether the threadpool queue has space available according to the <code>queueSizeRejectionThreshold</code> settings. * * Note that the <code>queueSize</code> is an final instance variable on HystrixThreadPoolDefault, and not looked up dynamically. * The data structure is static, so this does not make sense as a dynamic lookup. * The <code>queueSizeRejectionThreshold</code> can be dynamic (up to <code>queueSize</code>), so that should * still get checked on each invocation. * <p> * If a SynchronousQueue implementation is used (<code>maxQueueSize</code> <= 0), it always returns 0 as the size so this would always return true. */
@Override public boolean isQueueSpaceAvailable() { if (queueSize <= 0) { // we don't have a queue so we won't look for space but instead // let the thread-pool reject or not return true; } else { return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get(); } } } }