本篇介绍
本篇看下Linux如何实现线程安全问题
原子操作
对于基础类型操作,使用原子变量就可以做到线程安全,那原子操作是如何保证线程安全的呢?linux中的原子变量如下:
代码语言:javascript复制typedef struct {
int counter;
} atomic_t;
#define ATOMIC_INIT(i) { (i) }
#ifdef CONFIG_64BIT
typedef struct {
s64 counter;
} atomic64_t;
#endif
那如何保证原子性的呢?看下实现
代码语言:javascript复制#define ATOMIC_OP(op, c_op)
static inline void generic_atomic_##op(int i, atomic_t *v)
{
int c, old;
c = v->counter;
while ((old = arch_cmpxchg(&v->counter, c, c c_op i)) != c)
c = old;
}
#def
核心是使用了比较并交换函数cmpxchg,这个在部分平台上会有对应的原子指令,如果没有的话,就会通过关中断形式来保证原子性。
代码语言:javascript复制#define generic_cmpxchg_local(ptr, o, n) ({
((__typeof__(*(ptr)))__generic_cmpxchg_local((ptr), (unsigned long)(o),
(unsigned long)(n), sizeof(*(ptr))));
})
如果ptr的值等于o,那么就会将n赋给ptr,并返回o,如果ptr的值不等于o,那么将ptr的值返回。
内存屏障
ARM架构终有3类内存屏障指令: 数据存储屏障(data memory barrier, DMB) 数据同步屏障(data synchronization barrier, DSB) 指令同步屏障(instruction synchronization barrier, ISB) linux 内核中的内存屏障函数如下:
代码语言:javascript复制barrier() 编译优化屏障,阻止编译器为了性能优化而进行指令重排
mb() 内存屏障,用于SMP和UP
rmb() 读内存屏障
wmb() 写内存屏障
smp_mb() 用于SMP场景的内存屏障
smp_rmb() 用于SMP场景的读内存屏障
smp_wmb() 用于SMP场景的写内存屏障
smp_read_barrier_depends() 读依赖屏障
自旋锁
如果操作稍复杂,这时候原子变量就搞不定了,比如要操作一个链表,这时候就需要用一个机制可以保证某些操作可以不受干扰完成,自旋锁就是其中一种。结构定义如下:
代码语言:javascript复制/* Non PREEMPT_RT kernels map spinlock to raw_spinlock */
typedef struct spinlock {
union {
struct raw_spinlock rlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
struct {
u8 __padding[LOCK_PADSIZE];
struct lockdep_map dep_map;
};
#endif
};
} spinlock_t;
raw_spinlock的结构是和平台相关的,arm上定义如下:
代码语言:javascript复制typedef struct {
union {
u32 slock;
struct __raw_tickets {
#ifdef __ARMEB__
u16 next;
u16 owner;
#else
u16 owner;
u16 next;
#endif
} tickets;
};
} arch_spinlock_t;
有没有发现自旋锁似乎并没有想象中简单? 自旋不就应该是用一个变量,然后while循环来反复读取,如果是0那就可以获取了,如果是1那就继续旋转。可是这样在多CPU上存在性能问题,比如多个CPU都在争这个锁,而有一个CPU获得该锁后,接下来大概率还是该CPU继续获得锁,因为锁变量在该CPU的L1 Cache中,访问速度明显高于其他CPU,这时候为了做到公平, 就引入了FIFO机制,slock记录锁标记,next记录等待的序号,owner负责当前锁的持有者,这样就可以将锁沿着next传递了。
那也有种情况,比如在某个CPU中持有自旋锁,然后突然来了一个中断,而中断处理程序也需要获取该锁,那就立马死锁了,这种情况怎么搞?linux中也有对应的方法, 就是持有自旋锁的时候关闭中断,等操作完后再开启,对应的api如下:
代码语言:javascript复制spin_lock_irqsave(spinlock_t *lock, unsigned long flags)
spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags)
信号量
对比自旋锁是忙等待,信号量就是睡眠等待,对应的结构如下:
代码语言:javascript复制struct semaphore {
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};
这儿的lock是用来保护其他2个成员,count表示可以同时进入临界区的路径数目,wait_list是在该信号量上睡眠的进程。对应的操作方法如下:
代码语言:javascript复制extern void down(struct semaphore *sem);
extern int __must_check down_interruptible(struct semaphore *sem);
extern int __must_check down_killable(struct semaphore *sem);
extern int __must_check down_trylock(struct semaphore *sem);
extern int __must_check down_timeout(struct semaphore *sem, long jiffies);
extern void up(struct semaphore *sem);
互斥锁
互斥锁的行为和信号量的count为1时候一样,那为什么还需要重新实现一波呢?其实还是性能,信号量考虑的竞争路径不止一个,因此就决定了考虑的因素也会复杂一些,而互斥锁就是达到互斥效果就可以了,这样就可以针对这块进行优化。 结构定义如下:
代码语言:javascript复制struct mutex {
atomic_long_t owner;
raw_spinlock_t wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
struct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
这里的字段基本都可以自解释,有一点是当竞争不激烈的时候,比如只有一个进程在等锁,那么就不会休眠而是切换成自旋等待,也就是忙等待,这样可以避免进入休眠。 很多年前记得遇到过一个cpu 100%问题,那时候没想明白为什么一个等锁操作会占满cpu,这儿就是答案了。 对应的api就简单了:
代码语言:javascript复制extern void mutex_lock(struct mutex *lock);
extern int __must_check mutex_lock_interruptible(struct mutex *lock);
extern int __must_check mutex_lock_killable(struct mutex *lock);
extern int mutex_trylock(struct mutex *lock);
extern void mutex_unlock(struct mutex *lock);
读写锁
针对读写进一步优化就是读写锁,也有两种读写锁,一种是自旋的读写锁,一种是信号量类型的读写锁,先看下自旋的读写锁:
代码语言:javascript复制typedef struct {
arch_rwlock_t raw_lock;
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu;
void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
} rwlock_t;
typedef struct {
u32 lock;
} arch_rwlock_t;
对应的api 如下:
代码语言:javascript复制#define read_trylock(lock) __cond_lock(lock, _raw_read_trylock(lock))
#define write_trylock(lock) __cond_lock(lock, _raw_write_trylock(lock))
#define write_lock(lock) _raw_write_lock(lock)
#define read_lock(lock) _raw_read_lock(lock)
也有关中断版本,如下:
代码语言:javascript复制#define read_lock_irq(lock) _raw_read_lock_irq(lock)
#define read_lock_bh(lock) _raw_read_lock_bh(lock)
#define write_lock_irq(lock) _raw_write_lock_irq(lock)
#define write_lock_bh(lock) _raw_write_lock_bh(lock)
#define read_unlock(lock) _raw_read_unlock(lock)
#define write_unlock(lock) _raw_write_unlock(lock)
#define read_unlock_irq(lock) _raw_read_unlock_irq(lock)
#define write_unlock_irq(lock) _raw_write_unlock_irq(lock)
信号量版本如下:
代码语言:javascript复制struct rw_semaphore {
atomic_long_t count;
/*
* Write owner or one of the read owners as well flags regarding
* the current state of the rwsem. Can be used as a speculative
* check to see if the write owner is running on the cpu.
*/
atomic_long_t owner;
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* spinner MCS lock */
#endif
raw_spinlock_t wait_lock;
struct list_head wait_list;
#ifdef CONFIG_DEBUG_RWSEMS
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
这儿的count值含义比较复杂,需要表示读写状态,也需要包含等待进程的状态。对应的api如下:
代码语言:javascript复制extern void down_read(struct rw_semaphore *sem);
extern int __must_check down_read_interruptible(struct rw_semaphore *sem);
extern int __must_check down_read_killable(struct rw_semaphore *sem);
extern int down_read_trylock(struct rw_semaphore *sem);
extern void down_write(struct rw_semaphore *sem);
extern int __must_check down_write_killable(struct rw_semaphore *sem);
extern int down_write_trylock(struct rw_semaphore *sem);
extern void up_read(struct rw_semaphore *sem);
extern void up_write(struct rw_semaphore *sem);
extern void downgrade_write(struct rw_semaphore *sem);
RCU
在读写锁场景下,多个读路径可以共存,只有写出现的时候才会互斥,这样的性能还不够高,最理想的状态是读写可以共存,这就是RCU机制(read copy update)。 RCU本质上就是发布订阅模式,读的时候就是订阅变量的变化,修改变量就是发布变量的变化,在发布变量变化时读路径有可能看到新变化也可能看不到新变化,但是可以保证是看到的会是一个有效的变量,只有当所有读操作结束后,旧变量才会被释放。有这种操作后,RCU比起读写锁的性能优势就出来了What is RCU? Part 2: Usage
image.png
而将读写锁换成RCU也比较简单,如下所示:
代码语言:javascript复制1 struct el { 1 struct el {
2 struct list_head list; 2 struct list_head list;
3 long key; 3 long key;
4 spinlock_t mutex; 4 spinlock_t mutex;
5 int data; 5 int data;
6 /* Other data fields */ 6 /* Other data fields */
7 }; 7 };
8 rwlock_t listmutex; 8 spinlock_t listmutex;
9 struct el head; 9 struct el head;
1 int search(long key, int *result) 1 int search(long key, int *result)
2 { 2 {
3 struct list_head *lp; 3 struct list_head *lp;
4 struct el *p; 4 struct el *p;
5 5
6 read_lock(&listmutex); 6 rcu_read_lock();
7 list_for_each_entry(p, head, lp) { 7 list_for_each_entry_rcu(p, head, lp) {
8 if (p->key == key) { 8 if (p->key == key) {
9 *result = p->data; 9 *result = p->data;
10 read_unlock(&listmutex); 10 rcu_read_unlock();
11 return 1; 11 return 1;
12 } 12 }
13 } 13 }
14 read_unlock(&listmutex); 14 rcu_read_unlock();
15 return 0; 15 return 0;
16 } 16 }
1 int delete(long key) 1 int delete(long key)
2 { 2 {
3 struct el *p; 3 struct el *p;
4 4
5 write_lock(&listmutex); 5 spin_lock(&listmutex);
6 list_for_each_entry(p, head, lp) { 6 list_for_each_entry(p, head, lp) {
7 if (p->key == key) { 7 if (p->key == key) {
8 list_del(&p->list); 8 list_del_rcu(&p->list);
9 write_unlock(&listmutex); 9 spin_unlock(&listmutex);
10 synchronize_rcu();
10 kfree(p); 11 kfree(p);
11 return 1; 12 return 1;
12 } 13 }
13 } 14 }
14 write_unlock(&listmutex); 15 spin_unlock(&listmutex);
15 return 0; 16 return 0;
16 } 17 }
看下关键API:
image.png