序
本文主要研究一下HystrixConcurrencyStrategy
HystrixConcurrencyStrategy
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java
代码语言:javascript复制/**
* Abstract class for defining different behavior or implementations for concurrency related aspects of the system with default implementations.
* <p>
* For example, every {@link Callable} executed by {@link HystrixCommand} will call {@link #wrapCallable(Callable)} to give a chance for custom implementations to decorate the {@link Callable} with
* additional behavior.
* <p>
* When you implement a concrete {@link HystrixConcurrencyStrategy}, you should make the strategy idempotent w.r.t ThreadLocals.
* Since the usage of threads by Hystrix is internal, Hystrix does not attempt to apply the strategy in an idempotent way.
* Instead, you should write your strategy to work idempotently. See https://github.com/Netflix/Hystrix/issues/351 for a more detailed discussion.
* <p>
* See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: <a
* href="https://github.com/Netflix/Hystrix/wiki/Plugins">https://github.com/Netflix/Hystrix/wiki/Plugins</a>.
*/
public abstract class HystrixConcurrencyStrategy {
private final static Logger logger = LoggerFactory.getLogger(HystrixConcurrencyStrategy.class);
/**
* Factory method to provide {@link ThreadPoolExecutor} instances as desired.
* <p>
* Note that the corePoolSize, maximumPoolSize and keepAliveTime values will be dynamically set during runtime if their values change using the {@link ThreadPoolExecutor#setCorePoolSize},
* {@link ThreadPoolExecutor#setMaximumPoolSize} and {@link ThreadPoolExecutor#setKeepAliveTime} methods.
* <p>
* <b>Default Implementation</b>
* <p>
* Implementation using standard java.util.concurrent.ThreadPoolExecutor
*
* @param threadPoolKey
* {@link HystrixThreadPoolKey} representing the {@link HystrixThreadPool} that this {@link ThreadPoolExecutor} will be used for.
* @param corePoolSize
* Core number of threads requested via properties (or system default if no properties set).
* @param maximumPoolSize
* Max number of threads requested via properties (or system default if no properties set).
* @param keepAliveTime
* Keep-alive time for threads requested via properties (or system default if no properties set).
* @param unit
* {@link TimeUnit} corresponding with keepAliveTime
* @param workQueue
* {@code BlockingQueue<Runnable>} as provided by {@link #getBlockingQueue(int)}
* @return instance of {@link ThreadPoolExecutor}
*/
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
//......
}
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
//......
}
/**
* Factory method to provide instance of {@code BlockingQueue<Runnable>} used for each {@link ThreadPoolExecutor} as constructed in {@link #getThreadPool}.
* <p>
* Note: The maxQueueSize value is provided so any type of queue can be used but typically an implementation such as {@link SynchronousQueue} without a queue (just a handoff) is preferred as
* queueing is an anti-pattern to be purposefully avoided for latency tolerance reasons.
* <p>
* <b>Default Implementation</b>
* <p>
* Implementation returns {@link SynchronousQueue} when maxQueueSize <= 0 or {@link LinkedBlockingQueue} when maxQueueSize > 0.
*
* @param maxQueueSize
* The max size of the queue requested via properties (or system default if no properties set).
* @return instance of {@code BlockingQueue<Runnable>}
*/
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
//......
}
/**
* Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
* <p>
* This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
* <p>
* <b>Default Implementation</b>
* <p>
* Pass-thru that does no wrapping.
*
* @param callable
* {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
* @return {@code Callable<T>} either as a pass-thru or wrapping the one given
*/
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return callable;
}
/**
* Factory method to return an implementation of {@link HystrixRequestVariable} that behaves like a {@link ThreadLocal} except that it
* is scoped to a request instead of a thread.
* <p>
* For example, if a request starts with an HTTP request and ends with the HTTP response, then {@link HystrixRequestVariable} should
* be initialized at the beginning, available on any and all threads spawned during the request and then cleaned up once the HTTP request is completed.
* <p>
* If this method is implemented it is generally necessary to also implemented {@link #wrapCallable(Callable)} in order to copy state
* from parent to child thread.
*
* @param rv
* {@link HystrixRequestVariableLifecycle} with lifecycle implementations from Hystrix
* @return {@code HystrixRequestVariable<T>}
*/
public <T> HystrixRequestVariable<T> getRequestVariable(final HystrixRequestVariableLifecycle<T> rv) {
return new HystrixLifecycleForwardingRequestVariable<T>(rv);
}
}
- 这个类主要提供了一些方法允许自定义线程隔离的一些配置
- getThreadPool()以及getBlockingQueue()方法,用于自定义线程池及其队列
- wrapCallable()允许你去修饰Callable,比如做些上下文数据传递
- getRequestVariable()返回HystrixRequestVariable,类似ThreadLocal
getThreadPool
代码语言:javascript复制 /**
* Factory method to provide {@link ThreadPoolExecutor} instances as desired.
* <p>
* Note that the corePoolSize, maximumPoolSize and keepAliveTime values will be dynamically set during runtime if their values change using the {@link ThreadPoolExecutor#setCorePoolSize},
* {@link ThreadPoolExecutor#setMaximumPoolSize} and {@link ThreadPoolExecutor#setKeepAliveTime} methods.
* <p>
* <b>Default Implementation</b>
* <p>
* Implementation using standard java.util.concurrent.ThreadPoolExecutor
*
* @param threadPoolKey
* {@link HystrixThreadPoolKey} representing the {@link HystrixThreadPool} that this {@link ThreadPoolExecutor} will be used for.
* @param corePoolSize
* Core number of threads requested via properties (or system default if no properties set).
* @param maximumPoolSize
* Max number of threads requested via properties (or system default if no properties set).
* @param keepAliveTime
* Keep-alive time for threads requested via properties (or system default if no properties set).
* @param unit
* {@link TimeUnit} corresponding with keepAliveTime
* @param workQueue
* {@code BlockingQueue<Runnable>} as provided by {@link #getBlockingQueue(int)}
* @return instance of {@link ThreadPoolExecutor}
*/
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
final int dynamicCoreSize = corePoolSize.get();
final int dynamicMaximumSize = maximumPoolSize.get();
if (dynamicCoreSize > dynamicMaximumSize) {
logger.error("Hystrix ThreadPool configuration at startup for : " threadPoolKey.name() " is trying to set coreSize = "
dynamicCoreSize " and maximumSize = " dynamicMaximumSize ". Maximum size will be set to "
dynamicCoreSize ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
}
}
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
final int dynamicCoreSize = threadPoolProperties.coreSize().get();
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
if (allowMaximumSizeToDivergeFromCoreSize) {
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
if (dynamicCoreSize > dynamicMaximumSize) {
logger.error("Hystrix ThreadPool configuration at startup for : " threadPoolKey.name() " is trying to set coreSize = "
dynamicCoreSize " and maximumSize = " dynamicMaximumSize ". Maximum size will be set to "
dynamicCoreSize ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
return new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "hystrix-" threadPoolKey.name() "-" threadNumber.incrementAndGet());
thread.setDaemon(true);
return thread;
}
};
} else {
return PlatformSpecific.getAppEngineThreadFactory();
}
}
- 根据HystrixThreadPoolKey以及HystrixThreadPoolProperties构建线程池
- HystrixThreadPoolKey主要用来获取ThreadFactory,自定义了线程池的名称
- HystrixThreadPoolProperties主要是dynamicCoreSize(
corePoolSize
)、dynamicMaximumSize(maximumPoolSize
)、keepAliveTime、maxQueueSize这几个参数
HystrixLifecycleForwardingRequestVariable
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/concurrency/HystrixLifecycleForwardingRequestVariable.java
代码语言:javascript复制/**
* Implementation of {@link HystrixRequestVariable} which forwards to the wrapped
* {@link HystrixRequestVariableLifecycle}.
* <p>
* This implementation also returns null when {@link #get()} is called while the {@link HystrixRequestContext} has not
* been initialized rather than throwing an exception, allowing for use in a {@link HystrixConcurrencyStrategy} which
* does not depend on an a HystrixRequestContext
*/
public class HystrixLifecycleForwardingRequestVariable<T> extends HystrixRequestVariableDefault<T> {
private final HystrixRequestVariableLifecycle<T> lifecycle;
/**
* Creates a HystrixRequestVariable which will return data as provided by the {@link HystrixRequestVariableLifecycle}
* @param lifecycle lifecycle used to provide values. Must have the same type parameter as the constructed instance.
*/
public HystrixLifecycleForwardingRequestVariable(HystrixRequestVariableLifecycle<T> lifecycle) {
this.lifecycle = lifecycle;
}
/**
* Delegates to the wrapped {@link HystrixRequestVariableLifecycle}
* @return T with initial value or null if none.
*/
@Override
public T initialValue() {
return lifecycle.initialValue();
}
/**
* Delegates to the wrapped {@link HystrixRequestVariableLifecycle}
* @param value
* of request variable to allow cleanup activity.
* <p>
* If nothing needs to be cleaned up then nothing needs to be done in this method.
*/
@Override
public void shutdown(T value) {
lifecycle.shutdown(value);
}
/**
* Return null if the {@link HystrixRequestContext} has not been initialized for the current thread.
* <p>
* If {@link HystrixRequestContext} has been initialized then call method in superclass:
* {@link HystrixRequestVariableDefault#get()}
*/
@Override
public T get() {
if (!HystrixRequestContext.isCurrentThreadInitialized()) {
return null;
}
return super.get();
}
}
- 继承了HystrixRequestVariableDefault类,然后调用HystrixRequestVariableLifecycle来进行初始化和销毁
- HystrixRequestVariableDefault主要是对HystrixRequestContext进行操作
HystrixConcurrencyStrategyDefault
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategyDefault.java
代码语言:javascript复制/**
* Default implementation of {@link HystrixConcurrencyStrategy} using standard java.util.concurrent.* implementations.
*
* @ExcludeFromJavadoc
*/
public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy {
private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault();
public static HystrixConcurrencyStrategy getInstance() {
return INSTANCE;
}
private HystrixConcurrencyStrategyDefault() {
}
}
默认实现没有重新任何方法,都是使用了父类的实现
小结
HystrixConcurrencyStrategy是提供给开发者去自定义hystrix内部线程池及其队列,还提供了包装callable的方法,以及传递上下文变量的方法。
doc
- Concurrency Strategy