导读
- CAS 原理、适应场景、如何避免 ABA 问题
- 基于 CAS 操作的原子类
环境及版本
- 运行版本 JDK 8
- JDK 源码版本:jdk8-b13
CAS
- CAS 全称 Compare and Swap,是 Java 中提供的一个原子操作,是一种高效且线程安全的并发编程技术。
流程
- CAS 需要提供三个参数:原始值、当前值、期望值,执行流程如下:
CAS 优缺点
优点
非阻塞、高效
- CAS 是一种非互斥的同步方式,当访问互斥变量时,不进行加锁,而是直接进行修改,修改完成后判断互斥变量有没有被其它线程修改,如果未被修改,则更新成功;若被其它线程修改,则自旋重试,避免了阻塞线程带来上下文切换开销,是一种高效保证线程安全的实现方式,适用于同步代码块执行较快,线程等待时间较短的场景。
缺点
自旋开销
- 但当同步代码块执行时间比较长,线程会进行大量无用的自旋,占用CPU资源,因此CAS不适用同步代码块执行时间比较长,线程等待时间较长的场景。
ABA问题
- CAS 有一个典型的问题就是ABA问题,即原始值最初为3,但是中间被其它线程修改多次,最后又变为了3,当进行比较时,我们程序认为没有被其它线程修改。ABA 问题对中间一定不能被其它线程操作的场景有影响,可以通过版本号的方式解决,因此,使用时需要注意是否需要规避ABA问题。
CAS 原理
- Java Unsafe 类提供了很多偏底层的方法,其中就包括 CAS 方法,例如:
/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
*/
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
// 通过 Object o, long offset 获取内存中的当前值、int expected 原始值、int x 修改值
- jdk8-b13 源码分析 compareAndSwapInt 方法
// hotspot/src/share/vm/prims/unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
// 获取变量在内存中的地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
// hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp 以 x86 为例
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
// 使用 Lock 实现 缓存行锁
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
CAS 在 Java 中的应用
- 在 JUC 中原子类都是基于 CAS 实现。
AtomicLong(存在 ABA 问题)
- incrementAndGet() 自增
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) 1L;
}
/**
* Atomically adds the given value to the current value of a field
* or array element within the given object <code>o</code>
* at the given <code>offset</code>.
*
* @param o object/array to update the field/element in
* @param offset field/element offset
* @param delta the value to add
* @return the previous value
* @since 1.8
*/
public final long getAndAddLong(Object o, long offset, long delta) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(o, offset, v, v delta));
return v;
}
AtomicStampedReference(解决 ABA 问题)
代码语言:Java复制public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference;
// 增加 stamp 作为版本号
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
}
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
/**
* 1、比较值的引用 & stamp 值是否被其它线程改变
* 2、原始值等于最新值 直接返回(已经是最新值)
* 3、CAS 生成新的 pair 对象
**/
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
使用示例
代码语言:Java复制public class MainTest {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter(1000);
Thread thread = new Thread(() -> {
for (int i = 0; i < 10000; i ) {
counter.increment();
}
});
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 10000; i ) {
counter.increment();
}
});
thread.start();
thread1.start();
thread.join();
thread1.join();
System.out.println("final value: " counter.getValue());
System.out.println("final stamp: " counter.getStamp());
}
}
class Counter {
private final AtomicStampedReference<Integer> value;
public Counter(int initialValue) {
value = new AtomicStampedReference<>(initialValue, 0);
}
public int getValue() {
return value.getReference();
}
public int getStamp() {
return value.getStamp();
}
public int increment() {
int[] stampHolder = new int[1];
// 注意 compareAndSet 里比较的是 value 的引用 需要使用包装类接收 否则装箱拆箱会导致原始引用丢失
Integer current;
int next;
do {
current = value.get(stampHolder);
next = current 1;
} while (!value.compareAndSet(current, next, stampHolder[0], stampHolder[0] 1));
return next;
}
}
AtomicMarkableReference(不解决 ABA 问题)
- AtomicMarkableReference 只是使用 Boolean 标记数据是否被改变,适用于一些监测数据是否改变的场景,如下是一个简单场景。
class CounterAtomicMarkable {
private final AtomicMarkableReference<Integer> value;
public CounterAtomicMarkable(int initialValue) {
value = new AtomicMarkableReference<>(initialValue, false);
}
public int decrease() {
// 检测当前数据 是否被操作
// 比如 有一个账户 账户状态作为 boolean 值 当账户状态正常时可以入账出账 非正常时不可出账入账
// 一般方案: 判断和操作需要加锁进行同步
// 关停账户 value.attemptMark(value.getReference(), true);
if (value.isMarked()) {
return -1;
} else {
Integer current;
int next;
do {
if (value.isMarked()) {
return -1;
}
current = value.getReference();
next = current - 1;
} while (!value.compareAndSet(current, next, false, false));
return next;
}
}
}
个人简介