ThreadLocal案例分析

2022-05-12 14:21:47 浏览数 (1)

目录

代码语言:javascript复制
1. ThreadLocal简介
    1.1 ThreadLocal基础
        1.1.1 ThreadLocal和Thread的关系
        1.1.2 变量的生命周期
    1.2 可继承的ThreadLocal
2. ThreadLocal的应用案例
    2.1 解决并发问题
        2.1.1 java.lang.ThreadLocalRandom
        2.1.2 HDFS中的Statistics:实现高并发下的统计功能
    2.2 解决数据存储问题
        2.2.1 Struts2的ActionContext设计原理
        2.2.2 Spring中thread scope Bean
3. 总结

1. ThreadLocal简介

这篇博客主要对ThreadLocal类的基础知识和实践应用进行分析。文章的重点在于应用案例的探究,同时也会对理论基础作简单的介绍。

1.1 ThreadLocal基础

为什么需要ThreadLocal

要理解为什么需要ThreadLocal就不得不从线程安全问题说起。高并发是很多领域都会遇到的非常棘手的问题,其最核心的问题在于如何平衡高性能和数据一致性。当我们说某个类是线程安全的时候,也就意味着该类在多线程环境下的状态保持一致性。

所谓的一致性,就是关联数据之间的逻辑关系是否正确和完整。

通过下面示例对数据一致性问题进行说明:

代码语言:javascript复制
public class ThreadLocalDemo {

    public static void main(String[] args) throws InterruptedException {
        int nThreads = 10;
        final Counter counter = new Counter();
        
        ExecutorService exec = Executors.newFixedThreadPool(nThreads);
        final CountDownLatch latch = new CountDownLatch(nThreads);
        
        for(int i = 0; i < nThreads; i  ){
            exec.submit(new Runnable(){
                public void run(){
                    for(int i = 0; i < 10000; i  ){
                        counter.increase();
                    }
                    latch.countDown();
                }
            });
        }
        latch.await();
        System.out.println("Expected:"   nThreads * 10000   ",Actual:"   counter.count);
    }
    static class Counter{
        int count = 0;
        
        public void  increase(){
            this.count  ;
        }
    }

输出:

Expected:100000,Actual:71851

可见最终变量count的状态并不符合预期的逻辑。对于并发问题来说,最简单的解决办法就是加锁,本质是并发访问串行访问的改变。如下:

代码语言:javascript复制
    static class Counter{
        int count = 0;
        
        public synchronized void  increase(){
            this.count  ;
        }       
    }

输出:

Expected:100000,Actual:100000

第一次实验中,count变量的值之所以出现不正确的情况,是因为其被多个线程同时访问,而且对某个线程来说,其它线程对变量count的操作结果,该线程是不一定可见的,这是造成count变量最终数据不一致的原因。而用synchronized修饰过后,串行访问时就不存在不可见的情况。从而保证了count变量的正确性。那么是否可以换个思路:让变量只能被一个线程访问,这不就不存在之前谈到的线程安全问题了吗?

让每个线程都保存一份变量的副本,该副本只会被隶属的线程操作,这也就不存在线程安全问题了。这就是ThreadLocal的由来。

1.1.1 ThreadLocal和Thread的联系

在上面提到了数据副本,那么线程如何保存该副本的呢?其实,Thread类中有一个ThreadLocalMap类型的变量threadLocals,定义如下:

代码语言:javascript复制
public class Thread implements Runnable {
    
    //。。。
    
    ThreadLocal.ThreadLocalMap threadLocals = null;
    
    //。。。    
}

ThreadLocalMap是ThreadLocal的一个内部类,其作用相当于一个HashMap,用于保存隶属于该线程的变量副本。下面需要考虑一个问题:ThreadLocalMap的key和value该如何设计呢?

从API角度来说,ThreadLocal的作用是提供给client访问Thread中threadLocals变量的访问接口,每个ThreadLocal都对应着一个Thread内部的变量副本。所以ThreadLocalMap中的key就是ThreadLocal对象(也就是该对象的hashCode),value也就是变量副本。一个对象默认的hashcode也就是该对象的引用值,这可以保证不同对象的hashcode不同。不过ThreadLocal并没有使用这一默认值,而是内部声明了一个threadLocalHashCode整型变量用以存储该对象的hashcode值:

代码语言:javascript复制
public class ThreadLocal<T> {

    private final int threadLocalHashCode = nextHashCode();

    private static AtomicInteger nextHashCode =
        new AtomicInteger();

    private static final int HASH_INCREMENT = 0x61c88647;

    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }
    //。。。
}

变量副本的存储问题已经解决,那么怎么对Thread内部的threadLocals变量进行访问呢?这就要通过ThreadLocal了。下面对ThreadLocal的方法简单介绍下:

  1. get()操作
代码语言:javascript复制
    public T get() {
        Thread t = Thread.currentThread();//获取当前Thread对象引用
        ThreadLocalMap map = getMap(t);//从Thread对象中获取ThreadLocalMap变量
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null)
                return (T)e.value;
        }
        return setInitialValue();//如果是第一次访问,就setInitialValue进行初始化
    }
    
    private T setInitialValue() {
        //initialValue方法是protected修饰的,默认返回null,所以需要在ThreadLocal子类中进行覆盖。
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
    }    
  1. set操作
代码语言:javascript复制
    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }

和setInitialValue几乎一致,不同的是:set操作会传入需要设置的value。而setInitialValue需要通过initialValue()获取初始值。

  1. remove操作
代码语言:javascript复制
     public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
     }
1.1.2 变量的生命周期

这里所的变量指的是存储在Thread对象中的变量副本。下面从init-service-destroy三个阶段分析下其生命周期:

  1. Init 第一次调用get方法的时候完成了初始化过程。这也就是为什么需要覆盖ThreadLocal的initialValue方法。在setInitialValue方法中的createMap方法如下:
代码语言:javascript复制
void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}
  1. Service 只要线程活着且ThreadLocal可访问即处于Service阶段。
  2. Destroy 由于threadLocals变量是Thread的成员,那么当Thread对象挂了后,那么其内部的所有成员也都被gc了。此外,通过ThreadLocal提供remove方法也可以将threadLocals里的特定副本变量移除。

ThreadLocal变量的生命周期呢?由于ThreadLocal变量通常用private static修饰,也就是属于类成员 变量。所以其生命周期当然也就和该类一致。

1.2 可继承的ThreadLocal

首先看个实例:

代码语言:javascript复制
    static class Context {

        private static final ThreadLocal<HashMap<String,String>> CONTEXT1 = new ThreadLocal<HashMap<String,String>>(){
            protected HashMap<String,String> initialValue(){
                return new HashMap<String,String>();
            }
        };
        private static final InheritableThreadLocal<HashMap<String,String>> CONTEXT2 = new InheritableThreadLocal<HashMap<String,String>>(){
            protected HashMap<String,String> initialValue(){
                return new HashMap<String,String>();
            }
        };      
        public static HashMap<String,String> getContext1() {
            return CONTEXT1.get();
        }
        public static HashMap<String,String> getContext2() {
            return CONTEXT2.get();
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        Context.getContext1().put("name", "wqx");
        Context.getContext2().put("name", "wqx");
        Thread thread = new Thread(new Runnable(){
            @Override
            public void run() {
                System.out.println("name:"   Context.getContext1().get("name"));
                System.out.println("name:"   Context.getContext2().get("name"));
            }
        });
        thread.start();
    }

输出:

name:null

name:wqx

字面意思上理解InheritableThreadLocal即为可继承的ThreadLocal,这里的可继承的含义指的是子线程在实例化过程中,会查看当前执行线程(可以理解为父线程)的inheritableThreadLocals是否为null,如果不为null,则将该变量赋值给子线程的inheritableThreadLocals。下面是Thread类构造函数中的相关片段:

代码语言:javascript复制
    Thread parent = currentThread();//当前线程,也就是执行new Thread()的线程
    if (parent.inheritableThreadLocals != null)
        this.inheritableThreadLocals =
            ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

2. ThreadLocal的应用案例

2.1 解决并发问题

2.1.1 java.lang.ThreadLocalRandom

在Java中随机数可以用Random类,下面是java.util.Random的生成随机数的方法:

代码语言:javascript复制
    protected int next(int bits) {
        long oldseed, nextseed;
        AtomicLong seed = this.seed;
        do {
            oldseed = seed.get();
            nextseed = (oldseed * multiplier   addend) & mask;
        } while (!seed.compareAndSet(oldseed, nextseed));
        return (int)(nextseed >>> (48 - bits));
    }

可见,其中通过CAS方式保证其线程安全性。这在高并发的环境中由于线程间的竞争必然带来一定的性能损耗。ThreadLocal此时就派上用场了,ThreadLocalRandom是通过ThreadLocal改进的用于随机数生成的工具类,每个线程单独持有一个ThreadLocalRandom对象引用,这就完全杜绝了线程间的竞争问题。

代码语言:javascript复制
public class ThreadLocalRandom extends Random {
    
    //。。。
    
    private static final ThreadLocal<ThreadLocalRandom> localRandom =
        new ThreadLocal<ThreadLocalRandom>() {
            protected ThreadLocalRandom initialValue() {
                return new ThreadLocalRandom();
            }
    };
    //。。。
}

ThreadLocalRandom能用于全局范围的随机数生成吗?每个线程都持有一个ThreadLocalRandom对象,生成的随机数不会重复吗?考虑到ThreadLocal的特点,理论上也就不应该将其用于全局范围,其更适合于线程独享变量的存储。But!凡事都有例外,下面看个例外的用法。

2.1.2 HDFS中的Statistics:实现高并发下的统计功能

Hadoop的分布式文件系统(HDFS)是其生态的基石,MR任务中涉及到的数据输入输出都与其密切相关。对于FileSystem来说,对大量的读写操作进行统计是非常必要的。这该如何实现呢?

方案一:通过加锁的方式。考虑到Hadoop处理的数据体量及对数据操作的频率,加锁带来的性能损耗不可忽视,So。。。PASS!

方案二:ThreadLocal可以吗?对当前FileSystem进行操作的线程很多,如果只使用ThreadLocal方案的话,只能统计一个线程的操作次数,那么在汇总操作的时候必然要进行同步synchronized处理。这可行吗?判断一个方案可不可行,必须要具体业务逻辑具体分析,在本例中,statistics是用于存储统计数据的对象,那么对FileSystem进行操作(比如:create、mkdir、list、delete等)的同时都会记录在statistics对象中,也就是对statistics对象进行写操作,而对于统计数据的读操作比较少。所以Hadoop考虑到写多读少的事实,ThreadLocal方案是可以接受的。

下面是Statistics对象的部分实现:

代码语言:javascript复制
public static final class Statistics {

    /**
     * Statistics data.
     * /
    public static class StatisticsData {
      volatile long bytesRead;
      volatile long bytesWritten;
      volatile int readOps;
      volatile int largeReadOps;
      volatile int writeOps;
      //。。。
    }

    //allData保存的是所有线程中StatisticsData对象的引用
    private final Set<StatisticsDataReference> allData;
    
    //ThreadLocal变量
    private final ThreadLocal<StatisticsData> threadData;
    
    public void incrementBytesWritten(long newBytes) {
      getThreadStatistics().bytesWritten  = newBytes;
    }
    
    public StatisticsData getThreadStatistics() {
      StatisticsData data = threadData.get();
      if (data == null) {   //第一次统计操作时需要进行初始化,并与allData进行关联
        data = new StatisticsData();
        threadData.set(data);
        StatisticsDataReference ref =
            new StatisticsDataReference(data, Thread.currentThread());
        synchronized(this) {
          allData.add(ref);
        }
      }
      return data;
    }    
    //。。。
}

下面是DistributedFileSystem中删除操作的实现,可见在每次执行删除操作的时候,都会通过statistics进行记录。

代码语言:javascript复制
public class DistributedFileSystem extends FileSystem {

  @Override
  public boolean delete(Path f, final boolean recursive) throws IOException {
    statistics.incrementWriteOps(1);
    // 。。。
  }
}

如果需要获取统计数据时,就要将所有线程内部的统计数据进行累加,这肯定需要进行同步处理的。如下所示的是获取统计数据中所有写操作的次数:

代码语言:javascript复制
    public long getBytesWritten() {
      return visitAll(new StatisticsAggregator<Long>() {
        private long bytesWritten = 0;

        @Override
        public void accept(StatisticsData data) {
          bytesWritten  = data.bytesWritten;
        }

        public Long aggregate() {
          return bytesWritten;
        }
      });
    }
    //加锁处理,保证统计数据的正确性
    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
      visitor.accept(rootData);
      for (StatisticsDataReference ref: allData) {
        StatisticsData data = ref.getData();
        visitor.accept(data);
      }
      return visitor.aggregate();
    }

在写多读少的环境下,这种方案可以有效的解决传统“加锁”方案带来的多线程间的竞争。Brilliant idea!

2.2 解决数据存储问题

2.2.1 Struts2的ActionContext设计原理

Struts2是使用较为广泛的MVC框架,其关于请求响应流程的设计思路也是很新颖的。当第一次接触Struts2的时候,曾一直困惑于一个问题:Action中的每个方法的请求参数怎么获得的?处理结果又是如何返回的?在传统的Servlet中,我们可以通过函数入参HttpServletRequest对象获取请求参数,可以通过入参HttpServletResponse对象向输出流写入响应数据。而Struts2中自定义的Action的每个方法都没有入参,且处理后的响应数据也不是当作返回值返回的。

Struts2的最大亮点也许就是对数据流和控制流的解耦。数据不再需要作为方法参数传入或作为返回值返回。Struts2的返回值仅仅作为控制流的标识(比如:选择哪个视图)。Struts2中数据载体就是ActionContext。不管是请求参数亦或是处理后的响应数据都被封装在ActionContext内部。开发者一般常接触的是ActionContext的子类ServletActionContext。

首先看下Struts2中几个主要组件的示意图:

Struts2组件示意图

ActionContext作为数据载体,与每个组件都会有数据交互,如:ActionInvocation、Interceptor、Action、Result等。这几乎涵盖了一个请求的整个生命周期。这里说的请求的生命周期可以泛指处理请求的线程的生命周期。ThreadLocal不正适合这种情况吗?下面看下com.opensymphony.xwork2.ActionContext类的部分结构:

代码语言:javascript复制
public class ActionContext implements Serializable {

    //。。。
    
    static ThreadLocal<ActionContext> actionContext = new ThreadLocal<ActionContext>();
    
    private Map<String, Object> context;

    public ActionContext(Map<String, Object> context) {
        this.context = context;
    }
    
    public static ActionContext getContext() {
        return actionContext.get();
    }
    public Map<String, Object> getContextMap() {
        return context;
    }
    //。。。

ActionContext是典型的ThreadLocal使用案例,通过将请求处理过程中涉及到的所有参数封装进ActionContext中,从而实现了数据流和控制流的分离,这一解耦思路值得好好学习。Another brilliant idea!

2.2.1 Spring中thread scope Bean

在Spring中,如果按照Bean的生命周期对其进行划分,那么大致可以分为这么几类:Singleton、Prototype、Request、Session、Thread Scope等。这一节主要介绍ThreadScope的Bean如何实现。经过上面的各种案例分析,这个问题就灰常容 易解决了,只需要将Bean的生命周期与Thread同步就行。ThreadLocal正合适。下面是Spring内部已经实现的方案SimpleThreadScope:

代码语言:javascript复制
public class SimpleThreadScope implements Scope {

    private final ThreadLocal<Map<String, Object>> threadScope =
            new NamedThreadLocal<Map<String, Object>>("SimpleThreadScope") {
                @Override
                protected Map<String, Object> initialValue() {
                    return new HashMap<String, Object>();
                }
            };

    @Override
    public Object get(String name, ObjectFactory<?> objectFactory) {
        Map<String, Object> scope = this.threadScope.get();
        Object object = scope.get(name);
        if (object == null) {
            object = objectFactory.getObject();
            scope.put(name, object);
        }
        return object;
    }

    //。。。
}

3. 总结

上面小节中分别分析了ThreadLocal的两个主要的应用领域:

1.解决并发问题。

2.解决数据存储问题。

其中解决并发问题的本质是一种以空间换时间的思路,时间效率提升了,但是也存在着内存使用时的潜在溢出风险。数据存储问题主要指的是:系统中多个组件如何实现数据的交互和共享,而作为执行者的线程作为数据载体再适合不过了。虽然各种组件可以实现数据共享,但是数据在线程间是隔离的。

0 人点赞