java.lang.ThreadLocal变量在多线程环境下,如何避免信息丢失传递
之前公众号博文也介绍了java.lang.ThreadLocal变量在多线程环境下会丢失信息传递。
java.lang.ThreadLocal变量在多线程环境下会丢失信息传递。
避免ThreadLocal变量的传递丢失,我们如何做?
如何避免ThreadLocal变量传递丢失
实现自己的Runnable、Callable,完成信息传递。
以hystrix源码为例讲解一下。
1、定义ThreadLocal变量承载容器
代码语言:javascript复制public class HystrixRequestContext implements Closeable {
/*
* ThreadLocal on each thread will hold the HystrixRequestVariableState.
*
* Shutdown will clear the state inside HystrixRequestContext but not nullify the ThreadLocal on all
* child threads as these threads will not be known by the parent when cleanupAfterRequest() is called.
*
* However, the only thing held by those child threads until they are re-used and re-initialized is an empty
* HystrixRequestContext object with the ConcurrentHashMap within it nulled out since once it is nullified
* from the parent thread it is shared across all child threads.
*/
private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();
public static boolean isCurrentThreadInitialized() {
HystrixRequestContext context = requestVariables.get();
return context != null && context.state != null;
}
public static HystrixRequestContext getContextForCurrentThread() {
HystrixRequestContext context = requestVariables.get();
if (context != null && context.state != null) {
// context.state can be null when context is not null
// if a thread is being re-used and held a context previously, the context was shut down
// but the thread was not cleared
return context;
} else {
return null;
}
}
public static void setContextOnCurrentThread(HystrixRequestContext state) {
requestVariables.set(state);
}
/**
* Call this at the beginning of each request (from parent thread)
* to initialize the underlying context so that {@link HystrixRequestVariableDefault} can be used on any children threads and be accessible from
* the parent thread.
* <p>
* <b>NOTE: If this method is called then <code>shutdown()</code> must also be called or a memory leak will occur.</b>
* <p>
* See class header JavaDoc for example Servlet Filter implementation that initializes and shuts down the context.
*/
public static HystrixRequestContext initializeContext() {
HystrixRequestContext state = new HystrixRequestContext();
requestVariables.set(state);
return state;
}
/*
* This ConcurrentHashMap should not be made publicly accessible. It is the state of RequestVariables for a given RequestContext.
*
* Only HystrixRequestVariable has a reason to be accessing this field.
*/
/* package */ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>>();
// instantiation should occur via static factory methods.
private HystrixRequestContext() {
}
/**
* Shutdown {@link HystrixRequestVariableDefault} objects in this context.
* <p>
* <b>NOTE: This must be called if <code>initializeContext()</code> was called or a memory leak will occur.</b>
*/
public void shutdown() {
if (state != null) {
for (HystrixRequestVariableDefault<?> v : state.keySet()) {
// for each RequestVariable we call 'remove' which performs the shutdown logic
try {
HystrixRequestVariableDefault.remove(this, v);
} catch (Throwable t) {
HystrixRequestVariableDefault.logger.error("Error in shutdown, will continue with shutdown of other variables", t);
}
}
// null out so it can be garbage collected even if the containing object is still
// being held in ThreadLocals on threads that weren't cleaned up
state = null;
}
}
/**
* Shutdown {@link HystrixRequestVariableDefault} objects in this context.
* <p>
* <b>NOTE: This must be called if <code>initializeContext()</code> was called or a memory leak will occur.</b>
*
* This method invokes <code>shutdown()</code>
*/
public void close() {
shutdown();
}
}
HystrixRequestContext包含了一个ThreadLocal变量requestVariables,一个变量
ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state。
HystrixRequestContext承载了线程绑定的变量。在多线程环境下我们必须透传HystrixRequestContext,才能保证其中的
ThreadLocal变量requestVariables变量不会传递丢失。
2、实现自己的Runnable,管理HystrixRequestContext的生命周期
代码语言:javascript复制/**
* Wrapper around {@link Runnable} that manages the {@link HystrixRequestContext} initialization and cleanup for the execution of the {@link Runnable}
*
* @ExcludeFromJavadoc
*/
public class HystrixContextRunnable implements Runnable {
private final Callable<Void> actual;
private final HystrixRequestContext parentThreadState;
public HystrixContextRunnable(Runnable actual) {
this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
}
public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
}
public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, final HystrixRequestContext hystrixRequestContext, final Runnable actual) {
this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
actual.run();
return null;
}
});
this.parentThreadState = hystrixRequestContext;
}
@Override
public void run() {
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// set the state of this thread to that of its parent
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// execute actual Callable with the state of the parent
try {
actual.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
} finally {
// restore this thread back to its original state
HystrixRequestContext.setContextOnCurrentThread(existingState);
}
}
}
其思想是:构造我们自己的Runnable实例,构造函数传递要透传的threadlocal变量,称之为父线程的变量parentThreadState,因为此时子线程还没运行。
当子线程执行时,我们要保存设置及恢复父线程变量。
代码语言:javascript复制 public void run() {
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// set the state of this thread to that of its parent
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// execute actual Callable with the state of the parent
try {
actual.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
} finally {
// restore this thread back to its original state
HystrixRequestContext.setContextOnCurrentThread(existingState);
}
}
代码第5行,把父线程变量传递到当前子线程变量中,代码14行,子线程执行完毕,我们要恢复上下文环境。
3、实现自己的Callable,管理HystrixRequestContext的生命周期
其代码思路与实现自己的Runnable,思路相同。
代码语言:javascript复制/**
* Wrapper around {@link Callable} that manages the {@link HystrixRequestContext} initialization and cleanup for the execution of the {@link Callable}
*
* @param <K>
* Return type of {@link Callable}
*
* @ExcludeFromJavadoc
*/
public class HystrixContextCallable<K> implements Callable<K> {
private final Callable<K> actual;
private final HystrixRequestContext parentThreadState;
public HystrixContextCallable(Callable<K> actual) {
this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
}
public HystrixContextCallable(HystrixConcurrencyStrategy concurrencyStrategy, Callable<K> actual) {
this.actual = concurrencyStrategy.wrapCallable(actual);
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
}
@Override
public K call() throws Exception {
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// set the state of this thread to that of its parent
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// execute actual Callable with the state of the parent
return actual.call();
} finally {
// restore this thread back to its original state
HystrixRequestContext.setContextOnCurrentThread(existingState);
}
}
}
4、留出其它线程变量传递扩展
代码语言:javascript复制 public HystrixContextCallable(HystrixConcurrencyStrategy concurrencyStrategy, Callable<K> actual) {
this.actual = concurrencyStrategy.wrapCallable(actual);
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
}
实现自己的Runnable、Callable,完成信息传递时,我们发现构造函数都传递了一个HystrixConcurrencyStrategy,其中的方法:
代码语言:javascript复制public <T> Callable<T> wrapCallable(Callable<T> callable) {
return callable;
}
为我们传递其它thread local变量打开了方便之门。其链路信息的传递是由HystrixConcurrencyStrategy来实现的。
小结
避免线程本地变量传递丢失,我们需要实现自己的Runnable、Callable,来完成父线程本地变量的设置与恢复。