在Linux系统上,多个进程可以同时运行,以及各种中断发生的中断也在同时得到处理,这种多个上下文宏观上同时运行的情况称为并发。并发具体包括如下几种可能:
1) UP平台上,一个进程正在执行时被另一个进程抢占;
2) UP平台上,一个进程正在执行时发生了中断,内核转而执行中断处理程序;
3) SMP平台上,每个处理器都会发生UP平台上的情况;
4) SMP平台上,多个进程或中断同时在多个CPU上执行;
多个并发的上下文同时使用同一个资源的情况称为竞态,而可能发生竞态的这一段代码称为临界区。内核编程时的临界区,比较多的情况是:
1) 代码访问了全局变量,并且这段代码可被多个进程执行;
2) 代码访问了全局变量,并且这段代码可被进程执行,也可被中断处理程序执行;
针对上述情况,内核提供了如下手段来解决竟态问题:
1)锁机制:
2)院子操作:
下面会先介绍锁机制。Linux内核提供了多种锁机制,这些锁机制的区别在于,当获取不到锁时,执行程序是否发生睡眠并进行系统调度。具体包括自旋锁、互斥体、信号量。
一、自旋锁:spinlock_t
自旋锁有两个基本操作:获取与释放。获取自旋锁时,当判断锁的状态为未锁,则会马上加锁,如果已经是锁定的状态,当期执行流程则会执行“忙等待”,中间没有任何的调度操作。也就说执行流程从判断锁的状态到完成加锁,是一个原子操作,在执行上是不可分割的。
自旋锁的实现是平台相关的,在使用的时候,只需统一包含如下头文件:
#include
自旋锁的变量类型是spinlock_t,定义如下:
typedef struct { raw_spinlock_t raw_lock; #if defined(CONFIG_PREEMPT) && defined(CONFIG_SMP) unsigned int break_lock; #endif} spinlock_t; typedef struct { volatile unsigned int lock;} raw_spinlock_t;
1)自旋锁需要初始化才能使用,自旋锁的初始化接口如下:
# define spin_lock_init(lock) \do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0)
2)获取自旋锁的接口如下:
void spin_lock(spinlock_t *lock);
3)释放自旋锁的接口如下:
void spin_unlock(spinlock_t *lock);
获取自旋锁的时候,内部会首先禁止抢占,然后来时循环判断锁的状态。在UP版本中,唯一的操作就是禁止抢占,如果是UP版本且非抢占式内核,则进一步退化为无操作。可以粗略的看一下内核源码的实现,因为自旋锁的实现是与平台相关的,这里以arm平台为例:
void __lockfunc _spin_lock(spinlock_t *lock){ preempt_disable(); //关闭抢占 spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); //这个如果没有打开调试自旋锁的宏的话,相当于无操作。 _raw_spin_lock(lock); //这里调用平台相关的代码来轮询锁的状态}
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
unsigned long tmp;
__asm__ __volatile__(
"1: ldrex %0, [%1]\n" //取lock->lock放在 tmp里,并且设置&lock->lock这个内存地址为独占访问
" teq %0, #0\n" // 测试lock->lock是否为0,影响标志位z
#ifdef CONFIG_CPU_32v6K
" wfene\n"
#endif
" strexeq %0, %2, [%1]\n" //如果lock->lock是0,并且是独占访问这个内存,就向lock->lock里 写入1,并向tmp返回0,同时清除独占标记
" teqeq %0, #0\n" //如 果lock->lock是0,并且strexeq返回了0,表示加锁成功,返回
" bne 1b" //如果加锁失败,则会向后跳转到上面的标号1处,再次重新执行
: "=&r" (tmp)
: "r" (&lock->lock), "r" (1)
: "cc");
smp_mb();
}
上面那段汇编代码就是循环判断lock的值,最后的那个bne 1b,就可以明白为什么自旋锁在获取不到锁的情况下,会进行所谓的“自旋”了!
4) 此外,内核还提供了一个用于尝试获取自旋锁的接口:
Int spin_trylock(spinlock_t *lock);
到这里为止,上面介绍的接口都没有考虑到在获取锁以后又发生中断的问题,如果要解决与中断的互斥问题,则需要使用以下接口:
Void spin_lock_ireq(spinlock_t *lock); // 禁止中断并获取自旋锁Void spin_unlock_irq(spinlock_t *lock); // 释放自旋锁并使能中断Void spin_lock_irq_save(spin_lock_t *lock, unsigned long flags); // 禁止中断并获取保存中断状态,然后获取自旋锁Void spin_unlok_irq_store(spinlock_t *lock, unsigned long flags); // 释放自旋锁,并将中断状态恢复为已保存的状态值。Int spin_trylock_irq(spinlock_t *lock); // 禁止中断并尝试获取自旋锁,成功返回非0值,返回值为0表示获取失败,则中断状态恢复为使能状态。Int spin_trylock_irqsave(spinlock_t *lock, unsigned long flags); // 禁止中断并保存中断状态,然后尝试获取自旋锁,返回值为非0表示获取成功,0表示获取失败,则会恢复中断状态。
这里简单的看一下内核是如何实现上述接口的:在kernel/spinlock.c文件中
void __lockfunc _spin_lock_irq(spinlock_t *lock){ local_irq_disable(); // 关闭当前CPU上的中断 // 下面的执行流程和spin_lock接口一样的 preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); _raw_spin_lock(lock);}EXPORT_SYMBOL(_spin_lock_irq);# define spin_unlock_irq(lock) _spin_unlock_irq(lock)void __lockfunc _spin_unlock_irq(spinlock_t *lock){ spin_release(&lock->dep_map, 1, _RET_IP_); _raw_spin_unlock(lock); local_irq_enable(); preempt_enable();}EXPORT_SYMBOL(_spin_unlock_irq);
自旋锁的使用原则:
1) 能不用尽量不用,并且持有锁的时间应尽可能的短。因为持有锁以后,其他CPU要获取同一把锁的执行流程会陷入空循环,消耗CPU资源;
2) 持有锁以后尽量不要再去获取另一把锁,如果需要则代码各处获取锁的顺序要一致,否则容易引起死锁
3) 从性能的角度考虑,如果不需要与中断互斥,则不要使用禁止中断的接口;
4) 在获取自旋锁以后,到释放自旋锁之前,不允许调用或者是间接调用引起系统调度的操作。因为一旦获取锁以后,就进入一种特殊状态--原子上下文,即在此状态下不允许被打断,而系统调度会打断当前的执行流程。
二、互斥体:
互斥体与信号量都是属于非原子操作的同步手段,共同的特点是:当获取失败需要将当前进程挂起,进入等待状态,进程也将进入睡眠状态。
Linux内核互斥体的定义和声明是在linux/mutex.h头文件中,主要包括如下接口:
// 初始化:void mutex_init(struct mutex *lock);// 定义并初始化互斥体变量lockDEFINE_MUTEX(lock); // 获取mutexvoid mutex_lock(struct mutex *lock);int mutex_lock_interruptible(struct mutex *lock);int mutex_lock_killable(strut mutex *lock);int mutex_trylock(struct mutex *lock); // 释放mutexvoid mutex_unlock(struct mutex *lock); // 销毁mutexvoid mutex_destroy(struct mutex *lock)
看到这些接口,会发现与我们编写应用程序使用的mutex接口很类似,用法也很类似。
mutex_lock()如果不能够获取mutex,则当前进程会进入睡眠,直至有其他的进程释放这个mutex,才会被唤醒。函数返回时说明互斥体已经获取成功。如果希望进程在等待互斥体时任然可以响应信号,则应使用mutex_lock_interruptible()。
在使用的过程中要注意如下几点:
1) mutex的获取有可能导致进程睡眠,所以不能够用于中断上下文中,只可以用在进程上下文中;
2) mutex的获取与释放必须是同一个进程,不能够在A进程获取mutex,然后在B进程释放mutex;
下面简单的对其实现源码逻辑大致的走一下:
首先看看mutex的定义:
struct mutex { /* 1: unlocked, 0: locked, negative: locked, possible waiters */ atomic_t count; spinlock_t wait_lock; struct list_head wait_list;#ifdef CONFIG_DEBUG_MUTEXES struct thread_info *owner; const char *name; void *magic;#endif#ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map;#endif};
如果我们不进行调试,那么就只有count、wait_lock和wait_list成员需要关注的。
可以发现一个mutex就是一个原子计数器count(保存互斥体的状态),以及一个用于存放获取Mutex失败的进程链表wait_list,wali_lock是为了保证原子操作wait_list。所以说互斥体是在自旋锁的基础上实现的!
然后看看mutex的初始化,主要是初始化mutex对象的成员,将原子计数器初始化为1,表示处于unlocked状态。
# define mutex_init(mutex) \do { \static struct lock_class_key __key; \\__mutex_init((mutex), #mutex, &__key); \} while (0) void __mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key){atomic_set(&lock->count, 1); // 原子计数器初始化为1,表示处于unlocked状态spin_lock_init(&lock->wait_lock);INIT_LIST_HEAD(&lock->wait_list); debug_mutex_init(lock, name, key);}
接着再看看互斥体的获取mutex_lock()的实现,是如何让当前进程在获取mutex失败的情况下进入睡眠等待的:
void inline fastcall __sched mutex_lock(struct mutex *lock){ might_sleep();/* * The locking fastpath is the 1->0 transition from * 'unlocked' into 'locked' state. */ __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);}// __mutex_fastpath_lock()的作用是对原子计数器进行减1,并判断原子计数器是否为0,如果为0则直接返回,表示获取锁成功,否则表示获取失败,会调用__mutex_lock_slowpath()进行后续的阻塞睡眠处理。static void fastcall noinline __sched__mutex_lock_slowpath(atomic_t *lock_count){ struct mutex *lock = container_of(lock_count, struct mutex, count); __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0);}// 最后还是通过__mutex_lock_common()函数来完成阻塞睡眠处理:static inline int __sched__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass){ struct task_struct *task = current; struct mutex_waiter waiter; unsigned int old_val; unsigned long flags; spin_lock_mutex(&lock->wait_lock, flags); debug_mutex_lock_common(lock, &waiter); mutex_acquire(&lock->dep_map, subclass, 0, _RET_IP_); debug_mutex_add_waiter(lock, &waiter, task->thread_info); /* add waiting tasks to the end of the waitqueue (FIFO): */ list_add_tail(&waiter.list, &lock->wait_list); waiter.task = task; for (;;) { /* * Lets try to take the lock again - this is needed even if * we get here for the first time (shortly after failing to * acquire the lock), to make sure that we get a wakeup once * it's unlocked. Later on, if we sleep, this is the * operation that gives us the lock. We xchg it to -1, so * that when we release the lock, we properly wake up the * other waiters: */ old_val = atomic_xchg(&lock->count, -1); if (old_val == 1) break; /* * got a signal? (This code gets eliminated in the * TASK_UNINTERRUPTIBLE case.) */ if (unlikely(state == TASK_INTERRUPTIBLE && signal_pending(task))) { mutex_remove_waiter(lock, &waiter, task->thread_info); mutex_release(&lock->dep_map, 1, _RET_IP_); spin_unlock_mutex(&lock->wait_lock, flags); debug_mutex_free_waiter(&waiter); return -EINTR; } __set_task_state(task, state); /* didnt get the lock, go to sleep: */ spin_unlock_mutex(&lock->wait_lock, flags); schedule(); spin_lock_mutex(&lock->wait_lock, flags); } /* got the lock - rejoice! */ mutex_remove_waiter(lock, &waiter, task->thread_info); debug_mutex_set_owner(lock, task->thread_info); /* set it to 0 if there are no waiters left: */ if (likely(list_empty(&lock->wait_list))) atomic_set(&lock->count, 0); spin_unlock_mutex(&lock->wait_lock, flags); debug_mutex_free_waiter(&waiter); return 0;}
__mutex_lock_common()函数代码比较长,大致的代码逻辑是:
1)先将当前进程current放入到mutex的wait_list链表中;
2)然后是执行schedule(),执行进程切换调度,CPU就会从run queue中选取一个优先级最高的任务来运行;
这个时候,获取mutex的进程就已经挂起了suspend,需要有其他的进程调用mutex_unlock(),才能将此进程唤醒,并重新加入到run queue中继续运行。
上面的代码会把schedule()放在一个for循环内部,这是因为进程被唤醒后,需要先检查条件是否满足,如果不满足,则会再次挂起。
下面就分析下时如何唤醒等待在此mutex上的进程的: mutex_unlock()
void fastcall __sched mutex_unlock(struct mutex *lock){/* * The unlocking fastpath is the 0->1 transition from 'locked' * into 'unlocked' state: */ __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);}// __mutex_fastpath_unlock会对原子计数器count进行加1,然后调用__mutex_unlock_slowpath()来唤醒等待此mutex的进程。static fastcall inline void__mutex_unlock_common_slowpath(atomic_t *lock_count, int nested){ struct mutex *lock = container_of(lock_count, struct mutex, count); unsigned long flags; spin_lock_mutex(&lock->wait_lock, flags); mutex_release(&lock->dep_map, nested, _RET_IP_); debug_mutex_unlock(lock); /* * some architectures leave the lock unlocked in the fastpath failure * case, others need to leave it locked. In the later case we have to * unlock it here */ if (__mutex_slowpath_needs_to_unlock()) atomic_set(&lock->count, 1); if (!list_empty(&lock->wait_list)) { /* get the first entry from the wait-list: */ struct mutex_waiter *waiter = list_entry(lock->wait_list.next, struct mutex_waiter, list); debug_mutex_wake_waiter(lock, waiter); wake_up_process(waiter->task); } debug_mutex_clear_owner(lock); spin_unlock_mutex(&lock->wait_lock, flags);}
从上面的代码可以看出,释放mutex的过程就是把等待链表的第一个进程取出来,然后将其放入run-queue运行队列中,这样就能够被调度器调度,得到运行。其中wake_up_process()函数的作用就是把一个指定的进程放入到run queue中,并将进程的状态修改为TASK_RUNNING.
代码分析到这里,应该基本上就清楚了mutex的实现逻辑了。使用mutex就是有一点需要特别注意的,必须同一个进程对其lock和unlock操作,仔细分析下代码,就可以发现了,这里就不再说明了。
三、信号量:
信号量与互斥体都是作为同步的手段,共同点是当获取失败时会导致进程睡眠,所以都是不可以用在中断上下文中,只可以用在进程上下文中。
不同点是:
1) mutex的获取与释放必须是由同一个进程执行,而semaphore则可以跨进程使用,即可以在A进程获取信号量,在B进程释放信号量。
2) mutex只有locked和unloked两种状态值,semaphore的计数器可以大于1.
简单的看一下信号量的使用接口:
#include// 初始化信号量对象struct semaphore sem;void sema_init(struct semapthore *sem, int val); // 获取信号量void down(struct semaphore *sem); //如果获取失败,进程将进入不可被信号打断的睡眠状态int down_interruptible(struct semaphore *sem); //如果获取失败,进程将进入可被信号打断的睡眠状态int down_killable(strut semaphore *sem); // 如果获取失败,进程在睡眠等待的过程中,只响应致命信号int down_timeout(struct semaphore *sem, long jiffies); // 进程对信号量的等待操作有时间限制 // 释放信号量void up(struct semaphpre *sem);
down_interruptible()和down_killable()的返回值说明,0表示获取成功,-EINTR表示在等待信号量的过程中被信号打断,信号量获取失败。
down_timeout()的返回值说明,0表示成功,-ETIME表示在等待信号量的过程中超时,获取信号量失败。