How to propagate java.lang.ThreadLocal values across threads without losing them

2023-06-19 15:18:51




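An earlier post on this public account showed that a java.lang.ThreadLocal value is not carried over when work moves to another thread: the receiving thread cannot see what the submitting thread stored.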
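To make the problem concrete, here is a minimal sketch of the loss. The class name `ThreadLocalLossDemo` and the variable `CURRENT_USER` are made up for illustration and do not come from Hystrix.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalLossDemo {
    // Hypothetical per-thread value, e.g. the user of the current request.
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    /** Sets the value on the calling thread, then reads it from a pool thread. */
    static String readFromWorker() {
        CURRENT_USER.set("alice");
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // This task runs on the pool thread, which has its own, empty slot.
            Callable<String> task = () -> CURRENT_USER.get();
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CURRENT_USER.remove();
        }
    }

    public static void main(String[] args) {
        // The worker does not see "alice".
        System.out.println("worker sees: " + readFromWorker()); // prints "worker sees: null"
    }
}
```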

So how do we keep ThreadLocal values from being lost when a task is handed to another thread?

How to avoid losing ThreadLocal values


Implement your own Runnable and Callable wrappers that carry the values across.

The Hystrix source code serves as the worked example.

1. Define a container that carries the ThreadLocal state

public class HystrixRequestContext implements Closeable {

    /*
     * ThreadLocal on each thread will hold the HystrixRequestVariableState.
     * 
     * Shutdown will clear the state inside HystrixRequestContext but not nullify the ThreadLocal on all
     * child threads as these threads will not be known by the parent when cleanupAfterRequest() is called.
     * 
     * However, the only thing held by those child threads until they are re-used and re-initialized is an empty
     * HystrixRequestContext object with the ConcurrentHashMap within it nulled out since once it is nullified
     * from the parent thread it is shared across all child threads.
     */
    private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();

    public static boolean isCurrentThreadInitialized() {
        HystrixRequestContext context = requestVariables.get();
        return context != null && context.state != null;
    }

    public static HystrixRequestContext getContextForCurrentThread() {
        HystrixRequestContext context = requestVariables.get();
        if (context != null && context.state != null) {
            // context.state can be null when context is not null
            // if a thread is being re-used and held a context previously, the context was shut down
            // but the thread was not cleared
            return context;
        } else {
            return null;
        }
    }

    public static void setContextOnCurrentThread(HystrixRequestContext state) {
        requestVariables.set(state);
    }

    /**
     * Call this at the beginning of each request (from parent thread)
     * to initialize the underlying context so that {@link HystrixRequestVariableDefault} can be used on any children threads and be accessible from
     * the parent thread.
     * <p>
     * <b>NOTE: If this method is called then <code>shutdown()</code> must also be called or a memory leak will occur.</b>
     * <p>
     * See class header JavaDoc for example Servlet Filter implementation that initializes and shuts down the context.
     */
    public static HystrixRequestContext initializeContext() {
        HystrixRequestContext state = new HystrixRequestContext();
        requestVariables.set(state);
        return state;
    }

    /*
     * This ConcurrentHashMap should not be made publicly accessible. It is the state of RequestVariables for a given RequestContext.
     * 
     * Only HystrixRequestVariable has a reason to be accessing this field.
     */
    /* package */ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>>();

    // instantiation should occur via static factory methods.
    private HystrixRequestContext() {

    }

    /**
     * Shutdown {@link HystrixRequestVariableDefault} objects in this context.
     * <p>
     * <b>NOTE: This must be called if <code>initializeContext()</code> was called or a memory leak will occur.</b>
     */
    public void shutdown() {
        if (state != null) {
            for (HystrixRequestVariableDefault<?> v : state.keySet()) {
                // for each RequestVariable we call 'remove' which performs the shutdown logic
                try {
                    HystrixRequestVariableDefault.remove(this, v);
                } catch (Throwable t) {
                    HystrixRequestVariableDefault.logger.error("Error in shutdown, will continue with shutdown of other variables", t);
                }
            }
            // null out so it can be garbage collected even if the containing object is still
            // being held in ThreadLocals on threads that weren't cleaned up
            state = null;
        }
    }

    /**
     * Shutdown {@link HystrixRequestVariableDefault} objects in this context.
     * <p>
     * <b>NOTE: This must be called if <code>initializeContext()</code> was called or a memory leak will occur.</b>
     *
     * This method invokes <code>shutdown()</code>
     */
    public void close() {
      shutdown();
    }

}

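HystrixRequestContext has two parts: a static ThreadLocal<HystrixRequestContext> named requestVariables, which binds a context to the current thread, and an instance field

ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state,

which stores the request-scoped variables. The context object is therefore the carrier for everything bound to the thread. In a multithreaded setting we have to pass the HystrixRequestContext itself on to the worker thread; otherwise the worker's requestVariables slot is empty and the state is lost.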
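The lifecycle that the Javadoc above asks for (initialize at the start of a request, shut down at the end) can be sketched as follows. `SimpleRequestContext` is a hypothetical, heavily simplified stand-in written for this example, not the Hystrix class; it keeps only the two parts discussed here, the static ThreadLocal and the state map.

```java
import java.util.concurrent.ConcurrentHashMap;

class RequestLifecycleDemo {

    /** Hypothetical, simplified stand-in for HystrixRequestContext. */
    static final class SimpleRequestContext implements AutoCloseable {
        private static final ThreadLocal<SimpleRequestContext> CURRENT = new ThreadLocal<>();

        // Request-scoped values; nulled on close() so they can be collected
        // even if some thread still references this context object.
        private volatile ConcurrentHashMap<String, Object> state = new ConcurrentHashMap<>();

        private SimpleRequestContext() { }

        static SimpleRequestContext initializeContext() {
            SimpleRequestContext ctx = new SimpleRequestContext();
            CURRENT.set(ctx);
            return ctx;
        }

        /** Returns this thread's live context, or null if none exists or it was shut down. */
        static SimpleRequestContext getContextForCurrentThread() {
            SimpleRequestContext ctx = CURRENT.get();
            return (ctx != null && ctx.state != null) ? ctx : null;
        }

        void put(String key, Object value) { state.put(key, value); }

        Object get(String key) { return state.get(key); }

        @Override
        public void close() { state = null; }
    }

    /** Simulates one request: initialize, use, shut down. */
    static String handleRequest() {
        String seen;
        try (SimpleRequestContext ctx = SimpleRequestContext.initializeContext()) {
            ctx.put("traceId", "t-1");
            seen = (String) SimpleRequestContext.getContextForCurrentThread().get("traceId");
        }
        // After close() the ThreadLocal still holds the object, but its state is gone,
        // so the thread no longer counts as initialized.
        boolean stillLive = SimpleRequestContext.getContextForCurrentThread() != null;
        return seen + "/" + stillLive;
    }

    public static void main(String[] args) {
        System.out.println(handleRequest()); // prints "t-1/false"
    }
}
```

Using try-with-resources is one way to guarantee the shutdown call; a servlet filter with try/finally, as the Hystrix Javadoc suggests, achieves the same thing.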

2. Implement your own Runnable to manage the HystrixRequestContext lifecycle

/**
 * Wrapper around {@link Runnable} that manages the {@link HystrixRequestContext} initialization and cleanup for the execution of the {@link Runnable}
 * 
 * @ExcludeFromJavadoc
 */
public class HystrixContextRunnable implements Runnable {

    private final Callable<Void> actual;
    private final HystrixRequestContext parentThreadState;

    public HystrixContextRunnable(Runnable actual) {
        this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
    }
    
    public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
        this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
    }

    public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, final HystrixRequestContext hystrixRequestContext, final Runnable actual) {
        this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                actual.run();
                return null;
            }

        });
        this.parentThreadState = hystrixRequestContext;
    }

    @Override
    public void run() {
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            // set the state of this thread to that of its parent
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            // execute actual Callable with the state of the parent
            try {
                actual.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            // restore this thread back to its original state
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }

}

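The idea: build our own Runnable whose constructor receives the thread-local state to carry across. It is called the parent thread's state (parentThreadState) because the constructor runs on the submitting thread, before the child thread has started executing the task.

When the child thread runs the task, it has to save its own state, install the parent's state, and restore its own state afterwards: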

 public void run() {
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            // set the state of this thread to that of its parent
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            // execute actual Callable with the state of the parent
            try {
                actual.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            // restore this thread back to its original state
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }

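On line 5 of this method, setContextOnCurrentThread(parentThreadState) installs the parent thread's context on the current child thread. On line 14, in the finally block, the thread's previous context is restored once the task has finished, whether it returned normally or threw.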
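The same capture, install, and restore pattern works for any ThreadLocal, not only for Hystrix. Below is a minimal sketch under that assumption; `CONTEXT` and `ContextRunnable` are hypothetical names, and a plain String stands in for the request context.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class ContextRunnableDemo {
    // Hypothetical thread-bound value standing in for HystrixRequestContext.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Captures the submitting thread's value at construction and installs it while running. */
    static final class ContextRunnable implements Runnable {
        private final Runnable actual;
        private final String parentState;

        ContextRunnable(Runnable actual) {
            this.actual = actual;
            this.parentState = CONTEXT.get(); // read on the parent (submitting) thread
        }

        @Override
        public void run() {
            String existing = CONTEXT.get(); // save what the worker thread had
            try {
                CONTEXT.set(parentState);    // install the parent's value
                actual.run();
            } finally {
                CONTEXT.set(existing);       // restore the worker's own value
            }
        }
    }

    /** Returns what the wrapped task observed on the worker thread. */
    static String runWrapped() {
        CONTEXT.set("request-42");
        AtomicReference<String> seen = new AtomicReference<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(new ContextRunnable(() -> seen.set(CONTEXT.get()))).get();
            return seen.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWrapped()); // prints "request-42"
    }
}
```

Note that the value is captured in the constructor, so the wrapper has to be created on the thread whose state should be carried across.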

3. Implement your own Callable to manage the HystrixRequestContext lifecycle

The approach is the same as for the Runnable wrapper.

/**
 * Wrapper around {@link Callable} that manages the {@link HystrixRequestContext} initialization and cleanup for the execution of the {@link Callable}
 * 
 * @param <K>
 *            Return type of {@link Callable}
 * 
 * @ExcludeFromJavadoc
 */
public class HystrixContextCallable<K> implements Callable<K> {

    private final Callable<K> actual;
    private final HystrixRequestContext parentThreadState;

    public HystrixContextCallable(Callable<K> actual) {
        this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
    }

    public HystrixContextCallable(HystrixConcurrencyStrategy concurrencyStrategy, Callable<K> actual) {
        this.actual = concurrencyStrategy.wrapCallable(actual);
        this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
    }

    @Override
    public K call() throws Exception {
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            // set the state of this thread to that of its parent
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            // execute actual Callable with the state of the parent
            return actual.call();
        } finally {
            // restore this thread back to its original state
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }

}
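Why restore in the finally block? Pool threads are reused, so whatever a task leaves behind is seen by the next task on that thread. The following sketch, with the hypothetical names `CONTEXT` and `ContextCallable` and a String in place of the request context, checks that the worker gets its own value back after a wrapped call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextCallableDemo {
    // Hypothetical thread-bound value standing in for HystrixRequestContext.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Callable wrapper: installs the parent's value for the call, then restores. */
    static final class ContextCallable<V> implements Callable<V> {
        private final Callable<V> actual;
        private final String parentState;

        ContextCallable(Callable<V> actual) {
            this.actual = actual;
            this.parentState = CONTEXT.get(); // captured on the submitting thread
        }

        @Override
        public V call() throws Exception {
            String existing = CONTEXT.get();
            try {
                CONTEXT.set(parentState);
                return actual.call();
            } finally {
                CONTEXT.set(existing); // the reused pool thread gets its old value back
            }
        }
    }

    /** Returns "<value seen inside the wrapped call>|<value seen by the next plain task>". */
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Give the single worker thread a value of its own.
            Runnable seedWorker = () -> CONTEXT.set("worker-own");
            pool.submit(seedWorker).get();

            CONTEXT.set("parent");
            Callable<String> read = () -> CONTEXT.get();
            String during = pool.submit(new ContextCallable<String>(read)).get();
            String after = pool.submit(read).get(); // same worker thread, unwrapped task
            return during + "|" + after;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "parent|worker-own"
    }
}
```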

4. Leave an extension point for propagating other thread-local variables

 public HystrixContextCallable(HystrixConcurrencyStrategy concurrencyStrategy, Callable<K> actual) {
        this.actual = concurrencyStrategy.wrapCallable(actual);
        this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
    }

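Both the Runnable and the Callable wrapper take a HystrixConcurrencyStrategy in their constructors and pass the task through its wrapCallable method, whose default implementation is: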

public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return callable;
    }

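The default returns the callable unchanged, but a subclass can override it to capture and install additional thread-local values, which opens the door to propagating other thread-local variables. Call-chain (tracing) information, for example, can be propagated through a custom HystrixConcurrencyStrategy.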
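A sketch of such an override is shown here. To stay runnable without the Hystrix dependency, it uses a hypothetical `ConcurrencyStrategy` class whose `wrapCallable` has the same shape as the method quoted above; `TRACE_ID` and `TracePropagatingStrategy` are likewise assumptions made for illustration. In a real application the subclass would extend HystrixConcurrencyStrategy and be registered with HystrixPlugins.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class WrapCallableDemo {
    // Hypothetical extra thread-local to propagate, e.g. a tracing id.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Hypothetical stand-in for HystrixConcurrencyStrategy: the default is a no-op. */
    static class ConcurrencyStrategy {
        <T> Callable<T> wrapCallable(Callable<T> callable) {
            return callable;
        }
    }

    /** Override that carries TRACE_ID from the wrapping thread to the executing thread. */
    static final class TracePropagatingStrategy extends ConcurrencyStrategy {
        @Override
        <T> Callable<T> wrapCallable(Callable<T> callable) {
            final String parentTraceId = TRACE_ID.get(); // captured on the calling thread
            return () -> {
                String existing = TRACE_ID.get();
                try {
                    TRACE_ID.set(parentTraceId);
                    return callable.call();
                } finally {
                    TRACE_ID.set(existing);
                }
            };
        }
    }

    /** Returns the trace id observed on the worker thread under the given strategy. */
    static String demo(ConcurrencyStrategy strategy) {
        TRACE_ID.set("trace-7");
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = () -> TRACE_ID.get();
            return pool.submit(strategy.wrapCallable(read)).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            TRACE_ID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(new ConcurrencyStrategy()));      // prints "null"
        System.out.println(demo(new TracePropagatingStrategy())); // prints "trace-7"
    }
}
```

Keeping the hook in a separate strategy object means the context wrappers themselves never need to change when a new thread-local has to be carried along.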

Summary


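To keep thread-local values from being lost across threads, implement your own Runnable and Callable wrappers that install the parent thread's thread-local state before the task runs and restore the worker thread's previous state afterwards.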

