并发基础之原子操作与原子变量

2019-07-30 16:04:30 浏览数 (1)

题外话:最近忙于产品,公众号好久没有更新了,等忙过了这段时间再继续分析go的runtime代码及其它一些优秀的源代码,所以在此先把几年前发于知乎专栏的一篇文章(有部分修改)搬到公众号,这篇文章虽然是以java/c为例对原子操作及原子变量进行的说明,但万变不离其宗, 编程领域中的很多知识都是相通的,所以这里介绍的概念以及技术细节很容易迁移到其它语言之中。


在说明原子操作之前,我们先看下面这段Java代码:

代码语言:javascript复制
public class AtomicTest {
    private static final int COUNT_TIMES = 10000 * 10000;
    private static volatile int counter = 0;

    public static void main(String[] args) {
        Runnable r = () -> {
            for (int i = 0; i < COUNT_TIMES; i  ) {
                counter  ;
            }
        };

        Thread t1 = new Thread(r);
        Thread t2 = new Thread(r);

        t1.start();
        t2.start();

        try {
            t1.join();
            t2.join();

            System.out.printf("counter = %dn", counter);

         } catch (InterruptedException e) {
            e.printStackTrace();
         }
     }
}

不懂java也没关系,上面这段代码做的事情很简单,开了2个线程对同一个共享整型变量分别执行一亿次加1操作,我们期望最后打印出来counter的值为200000000(2亿),但事与愿违,运行上面的代码,counter的值是极有可能不等于2亿的,而且每次运行结果都可能不一样,但总是小于2亿。为什么会出现这个情况呢?从Java内存模型的角度来看,简单的counter 的执行过程其实分为如下三步:

  1. 从主内存中加载counter的值到线程工作内存
  2. 执行加1运算
  3. 把第二步的执行结果从工作内存写入到主内存

那么现在假设主内存中counter的值是100,两个线程现在都同时执行counter ,则可能出现如下情况:

代码语言:javascript复制
线程 1 从主内存中加载counter的值100到线程 1 到工作内存
线程 2 从主内存中加载counter的值100到线程 2 到工作内存
线程 1 执行加1运算得到结果101
线程 2 执行加1运算得到结果101
线程 1 把101写入主内存中的counter变量
线程 2 把101写入主内存中的counter变量

线程1和2都执行了 1运算,本来我们期望得到102,但却错误的得到了101这个值。

从上面这个引起错误的流程可以看出,之所以结果错误,其本质是两个线程同时操作了同一块内存,线程1执行 运算的过程中插入了线程2的 操作,也就是说从另外一个线程的角度看 操作并不是一个原子操作。

现在我们已经知道多线程并发执行counter 其结果不正确的原因了,但怎么解决这个问题呢?

既然错误是因为 不是一个原子操作而导致的,那么我们想办法使其成为原子操作就可以了,因此我们可以:

  1. 加锁;
  2. 使用原子变量。

来解决上述问题。

首先来看加锁,伪码如下:

代码语言:javascript复制
for (int i = 0; i < COUNT_TIMES; i  ) {
        lock();
        counter  ;
        unlock();
}

这里我们在执行counter 这行代码前后进行了加锁/解锁操作,用来防止其它线程同时执行它,这样就可以保证结果的正确性。虽然这个方法可以解决问题,但大家可以自己试一下,你会发现加锁之后性能急剧下降,主要原因是锁冲突会导致线程上下文切换,这种切换开销很大。

下面我们来试试使用原子变量。

C语言中可以使用gcc提供的原子操作函数,Java中可以使用Atomic相关类,如下面的Java代码:

代码语言:javascript复制
public class AtomicTest {
private static final int COUNT_TIMES = 10000 * 10000;
    private static volatile int counter = 0;
    private static volatile AtomicInteger atomCounter = new AtomicInteger(0); //Java提供的int型原子变量

    public static void main(String[] args) {
        Runnable r = () -> {
           for (int i = 0; i < COUNT_TIMES; i  ) {
                atomCounter.addAndGet(1); //原子变量的加法
             }
         };

        Thread t1 = new Thread(r);
        Thread t2 = new Thread(r);

         t1.start();;
         t2.start();

         try {
             t1.join();
             t2.join();

             System.out.printf("atomCounter = %dn", atomCounter.get());

         } catch (InterruptedException e) {
             e.printStackTrace();
          }
     }
}

代码中使用了AtomicInteger类的addAndGet()方法,这个方法执行加法操作时是原子的,所以不需要我们在代码中加锁。如果我们运行这段代码,会发现它比前面提到的加锁方法效率高很多,加锁方法执行1亿次加法所用时间是使用原子变量的好几倍。为什么使用原子变量效率会高出这么多呢?要想找到答案,就得分析原子变量提供的原子操作是怎么实现的。

下面我们首先来看Java中的实现,然后分析gcc的实现。

Java中的原子类实现在java.util.concurrent.atomic包中,找到AtomicInteger类,为了减小篇幅,这里只保留类的很小一部分来说明问题

代码语言:javascript复制
public class AtomicInteger extends Number implements java.io.Serializable {

    private volatile int value;

    /**
     * Creates a new AtomicInteger with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicInteger(int initialValue) {
        value = initialValue;
     }

    /**
     * Gets the current value.
     *
     * @return the current value
     */
    public final int get() {
        return value;
     }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta)   delta;
     }
}

可以看到addAndGet方法使用了unsafe包的getAndAddInt方法:

代码语言:javascript复制
/**
     * Atomically adds the given value to the current value of a field
     * or array element within the given object <code>o</code>
     * at the given <code>offset</code>.
     *
     * @param o object/array to update the field/element in
     * @param offset field/element offset
     * @param delta the value to add
     * @return the previous value
     * @since 1.8
     */
    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset);
         } while (!compareAndSwapInt(o, offset, v, v   delta));
         return v;
    }

/**
     * Atomically update Java variable to <tt>x</tt> if it is currently
     * holding <tt>expected</tt>.
     * @return <tt>true</tt> if successful
     */
    public final native boolean compareAndSwapInt(Object o, long offset,
                                                  int expected,
                                                  int x);

可以看到getAndAddInt函数在一个循环中反复调用了 compareAndSwapInt 方法,该方法是一个本地方法,我们需要到虚拟机的目录中去找这个方法的C 实现:

代码语言:javascript复制
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}

该函数的核心是lock cmpxchg汇编指令,lock前缀用于锁总线,它可以保证其后的指令执行时的原子性以及实现内存屏障功能,而cmpxchg指令实现的是CAS(compare and swap/set)操作。下面我们用伪码来把前面讨论的东西串在一起说明一下如何保证i 这种操作的原子性:

代码语言:javascript复制
int i = 0;
for (;;) {
    v = i;
    //lock cmpxchg指令的的逻辑  ---START---
    lock总线
    if (i == v) {
        i = i   1;
        break;
    }
    unlock总线
    //lock cmpxchg指令的的逻辑  ---END---
}

可以清楚的看到原子变量的原子操作其实也使用了锁,只不过这个是硬件指令级别的锁,比我们软件实现的锁高效得多,更重要的是从上面的伪码可以看出,如果出现了冲突,只是不停的循环重试,而不会切换线程。

我们再来看一下gcc是怎么实现的原子操作。

高版本的gcc提供了一系列原子操作函数,比如__sync_fetch_and_add函数实现了原子的从内存中读取一个值,然后执行加法操作,最后把结果写入内存。看个例子:

代码语言:javascript复制
#include <stdio.h>

int main(int argc, char *argv[])
{
   int a = 100;
    int b = __sync_fetch_and_add(&a, 2);
    printf("a = %d, b = %dn", a, b);
  
    return 0;
}

这段代码最后会打印出 a = 102, b = 100. 来看一下gcc怎么实现的这个函数,用gdb直接反汇编:

< 33>行的lock xadd �x, 0x4(%rsp) 指令直接把ecx寄存器中的值(常量2)加到了一个内存中的值(变量a)之上,看起来好像只有1次内存访问,但事实上这条指令需要进行了2次内存访问:首先从内存中读取a的值,然后求和并把求和结果存入变量a之中,即:

  1. 从内存读取变量a的值到寄存器
  2. 与2相加
  3. 把相加后的结果存入变量a对应的内存

这明明是三步操作为什么能够保证原子操作呢,答案就在于xadd这个指令,cpu执行这个指令之前首先会把这条指令之前的读写内存操作完成,然后锁住内存总线直到执行完上面的三步操作之后才释放总线,在这段时间之内,其它cpu是无法访问内存的,这就保证了加法操作的原子性(另外xadd指令还有内存屏障功能),但这种保证需要建立在多个线程之间的相互协作基础之上,也就是说只要多个线程都按照规定使用xadd并发的操作同一变量,就可以保证多个线程安全的并发执行这种加法操作,但如果只在部分线程中使用__sync_fetch_and_add执行对共享变量的加法操作,而在另外的线程中却直接使用 a 这样的操作的话还是会有问题的。

最后简单的总结一下Java以及gcc对原子变量的实现:Java中用的是循环使用CAS操作实现的原子变量的原子操作,而gcc使用的是xadd指令,可以看出gcc的实现方式更加简洁,应该也更高效,另外,go语言中同样也是使用的xadd指令实现的对整型变量的原子操作,有兴趣的读者可以去看一下相关代码。

0 人点赞