博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
内核同步-锁机制
阅读量:6716 次
发布时间:2019-06-25

本文共 10475 字,大约阅读时间需要 34 分钟。

hot3.png

    Linux系统上,多个进程可以同时运行,以及各种中断发生的中断也在同时得到处理,这种多个上下文宏观上同时运行的情况称为并发。并发具体包括如下几种可能:

    1) UP平台上,一个进程正在执行时被另一个进程抢占;

    2) UP平台上,一个进程正在执行时发生了中断,内核转而执行中断处理程序;

    3) SMP平台上,每个处理器都会发生UP平台上的情况;

    4) SMP平台上,多个进程或中断同时在多个CPU上执行;

多个并发的上下文同时使用同一个资源的情况称为竞态,而可能发生竞态的这一段代码称为临界区。内核编程时的临界区,比较多的情况是:

1) 代码访问了全局变量,并且这段代码可被多个进程执行;

2) 代码访问了全局变量,并且这段代码可被进程执行,也可被中断处理程序执行;

针对上述情况,内核提供了如下手段来解决竟态问题:

1)锁机制:

2)院子操作:

下面会先介绍锁机制。Linux内核提供了多种锁机制,这些锁机制的区别在于,当获取不到锁时,执行程序是否发生睡眠并进行系统调度。具体包括自旋锁、互斥体、信号量。

一、自旋锁:spinlock_t

自旋锁有两个基本操作:获取与释放。获取自旋锁时,当判断锁的状态为未锁,则会马上加锁,如果已经是锁定的状态,当期执行流程则会执行“忙等待”,中间没有任何的调度操作。也就说执行流程从判断锁的状态到完成加锁,是一个原子操作,在执行上是不可分割的。

自旋锁的实现是平台相关的,在使用的时候,只需统一包含如下头文件:

#include 

自旋锁的变量类型是spinlock_t,定义如下:

typedef struct {    raw_spinlock_t raw_lock;    #if defined(CONFIG_PREEMPT) && defined(CONFIG_SMP)    unsigned int break_lock;    #endif} spinlock_t; typedef struct {    volatile unsigned int lock;} raw_spinlock_t;

1)自旋锁需要初始化才能使用,自旋锁的初始化接口如下:

# define spin_lock_init(lock)     \do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0)

2)获取自旋锁的接口如下:

void spin_lock(spinlock_t *lock);

3)释放自旋锁的接口如下:

void spin_unlock(spinlock_t *lock);

    获取自旋锁的时候,内部会首先禁止抢占,然后来时循环判断锁的状态。在UP版本中,唯一的操作就是禁止抢占,如果是UP版本且非抢占式内核,则进一步退化为无操作。可以粗略的看一下内核源码的实现,因为自旋锁的实现是与平台相关的,这里以arm平台为例:

void __lockfunc _spin_lock(spinlock_t *lock){    preempt_disable(); //关闭抢占    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); //这个如果没有打开调试自旋锁的宏的话,相当于无操作。    _raw_spin_lock(lock); //这里调用平台相关的代码来轮询锁的状态}

static inline void __raw_spin_lock(raw_spinlock_t *lock)

    {

        unsigned long tmp;

        __asm__ __volatile__(

        "1: ldrex   %0, [%1]\n"  //取lock->lock放在 tmp里,并且设置&lock->lock这个内存地址为独占访问

        "   teq %0, #0\n"   // 测试lock->lock是否为0,影响标志位z

        #ifdef CONFIG_CPU_32v6K

            "   wfene\n"

        #endif

        "   strexeq %0, %2, [%1]\n"  //如果lock->lock是0,并且是独占访问这个内存,就向lock->lock里 写入1,并向tmp返回0,同时清除独占标记

        "   teqeq   %0, #0\n" //如 果lock->lock是0,并且strexeq返回了0,表示加锁成功,返回

        "   bne 1b"   //如果加锁失败,则会向后跳转到上面的标号1处,再次重新执行

         : "=&r" (tmp)

        : "r" (&lock->lock), "r" (1) 

        : "cc");

        smp_mb();

    }

上面那段汇编代码就是循环判断lock的值,最后的那个bne 1b,就可以明白为什么自旋锁在获取不到锁的情况下,会进行所谓的“自旋”了!

4) 此外,内核还提供了一个用于尝试获取自旋锁的接口:

Int spin_trylock(spinlock_t *lock);

到这里为止,上面介绍的接口都没有考虑到在获取锁以后又发生中断的问题,如果要解决与中断的互斥问题,则需要使用以下接口:

Void spin_lock_ireq(spinlock_t *lock); // 禁止中断并获取自旋锁Void spin_unlock_irq(spinlock_t *lock); // 释放自旋锁并使能中断Void spin_lock_irq_save(spin_lock_t *lock, unsigned long flags); // 禁止中断并获取保存中断状态,然后获取自旋锁Void spin_unlok_irq_store(spinlock_t *lock, unsigned long flags); // 释放自旋锁,并将中断状态恢复为已保存的状态值。Int spin_trylock_irq(spinlock_t *lock); // 禁止中断并尝试获取自旋锁,成功返回非0值,返回值为0表示获取失败,则中断状态恢复为使能状态。Int spin_trylock_irqsave(spinlock_t *lock, unsigned long flags); // 禁止中断并保存中断状态,然后尝试获取自旋锁,返回值为非0表示获取成功,0表示获取失败,则会恢复中断状态。

    这里简单的看一下内核是如何实现上述接口的:在kernel/spinlock.c文件中

void __lockfunc _spin_lock_irq(spinlock_t *lock){    local_irq_disable(); // 关闭当前CPU上的中断    // 下面的执行流程和spin_lock接口一样的    preempt_disable();     spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);    _raw_spin_lock(lock);}EXPORT_SYMBOL(_spin_lock_irq);# define spin_unlock_irq(lock)  _spin_unlock_irq(lock)void __lockfunc _spin_unlock_irq(spinlock_t *lock){    spin_release(&lock->dep_map, 1, _RET_IP_);    _raw_spin_unlock(lock);    local_irq_enable();    preempt_enable();}EXPORT_SYMBOL(_spin_unlock_irq);

    自旋锁的使用原则:

    1) 能不用尽量不用,并且持有锁的时间应尽可能的短。因为持有锁以后,其他CPU要获取同一把锁的执行流程会陷入空循环,消耗CPU资源;

    2) 持有锁以后尽量不要再去获取另一把锁,如果需要则代码各处获取锁的顺序要一致,否则容易引起死锁        

    3) 从性能的角度考虑,如果不需要与中断互斥,则不要使用禁止中断的接口;

    4) 在获取自旋锁以后,到释放自旋锁之前,不允许调用或者是间接调用引起系统调度的操作。因为一旦获取锁以后,就进入一种特殊状态--原子上下文,即在此状态下不允许被打断,而系统调度会打断当前的执行流程。

    二、互斥体:

互斥体与信号量都是属于非原子操作的同步手段,共同的特点是:当获取失败需要将当前进程挂起,进入等待状态,进程将进入睡眠状态。

    Linux内核互斥体的定义和声明是在linux/mutex.h头文件中,主要包括如下接口:

// 初始化:void mutex_init(struct mutex *lock);// 定义并初始化互斥体变量lockDEFINE_MUTEX(lock); // 获取mutexvoid mutex_lock(struct mutex *lock);int mutex_lock_interruptible(struct mutex *lock);int mutex_lock_killable(strut mutex *lock);int mutex_trylock(struct mutex *lock); // 释放mutexvoid mutex_unlock(struct mutex *lock); // 销毁mutexvoid mutex_destroy(struct mutex *lock)

看到这些接口,会发现与我们编写应用程序使用的mutex接口很类似,用法也很类似。

        mutex_lock()如果不能够获取mutex,则当前进程会进入睡眠,直至有其他的进程释放这个mutex,才会被唤醒。函数返回时说明互斥体已经获取成功。如果希望进程在等待互斥体时任然可以响应信号,则应使用mutex_lock_interruptible()

    在使用的过程中要注意如下几点:

    1) mutex的获取有可能导致进程睡眠,所以不能够用于中断上下文中,只可以用在进程上下文中;

    2) mutex的获取与释放必须是同一个进程,不能够在A进程获取mutex,然后在B进程释放mutex

    下面简单的对其实现源码逻辑大致的走一下:

    首先看看mutex的定义:

struct mutex {    /* 1: unlocked, 0: locked, negative: locked, possible waiters */     atomic_t  count;     spinlock_t  wait_lock;     struct list_head wait_list;#ifdef CONFIG_DEBUG_MUTEXES    struct thread_info *owner;    const char   *name;    void   *magic;#endif#ifdef CONFIG_DEBUG_LOCK_ALLOC    struct lockdep_map dep_map;#endif};

        如果我们不进行调试,那么就只有count、wait_lock和wait_list成员需要关注的。

    可以发现一个mutex就是一个原子计数器count(保存互斥体的状态),以及一个用于存放获取Mutex失败的进程链表wait_listwali_lock是为了保证原子操作wait_list。所以说互斥体是在自旋锁的基础上实现的!

    然后看看mutex的初始化,主要是初始化mutex对象的成员,将原子计数器初始化为1,表示处于unlocked状态。

# define mutex_init(mutex) \do {       \static struct lock_class_key __key;  \\__mutex_init((mutex), #mutex, &__key);  \} while (0) void  __mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key){atomic_set(&lock->count, 1); // 原子计数器初始化为1,表示处于unlocked状态spin_lock_init(&lock->wait_lock);INIT_LIST_HEAD(&lock->wait_list); debug_mutex_init(lock, name, key);}

        接着再看看互斥体的获取mutex_lock()的实现,是如何让当前进程在获取mutex失败的情况下进入睡眠等待的:

void inline fastcall __sched mutex_lock(struct mutex *lock){    might_sleep();/* * The locking fastpath is the 1->0 transition from * 'unlocked' into 'locked' state. */    __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);}// __mutex_fastpath_lock()的作用是对原子计数器进行减1,并判断原子计数器是否为0,如果为0则直接返回,表示获取锁成功,否则表示获取失败,会调用__mutex_lock_slowpath()进行后续的阻塞睡眠处理。static void fastcall noinline __sched__mutex_lock_slowpath(atomic_t *lock_count){    struct mutex *lock = container_of(lock_count, struct mutex, count);     __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0);}// 最后还是通过__mutex_lock_common()函数来完成阻塞睡眠处理:static inline int __sched__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass){    struct task_struct *task = current;    struct mutex_waiter waiter;    unsigned int old_val;    unsigned long flags;     spin_lock_mutex(&lock->wait_lock, flags);     debug_mutex_lock_common(lock, &waiter);    mutex_acquire(&lock->dep_map, subclass, 0, _RET_IP_);    debug_mutex_add_waiter(lock, &waiter, task->thread_info);     /* add waiting tasks to the end of the waitqueue (FIFO): */     list_add_tail(&waiter.list, &lock->wait_list);     waiter.task = task;     for (;;) {    /*     * Lets try to take the lock again - this is needed even if     * we get here for the first time (shortly after failing to     * acquire the lock), to make sure that we get a wakeup once     * it's unlocked. Later on, if we sleep, this is the     * operation that gives us the lock. We xchg it to -1, so     * that when we release the lock, we properly wake up the     * other waiters:     */    old_val = atomic_xchg(&lock->count, -1);      if (old_val == 1)       break;     /*     * got a signal? (This code gets eliminated in the     * TASK_UNINTERRUPTIBLE case.)     */    if (unlikely(state == TASK_INTERRUPTIBLE &&        signal_pending(task))) {        mutex_remove_waiter(lock, &waiter, task->thread_info);        mutex_release(&lock->dep_map, 1, _RET_IP_);        spin_unlock_mutex(&lock->wait_lock, flags);         debug_mutex_free_waiter(&waiter);        return -EINTR;    }    __set_task_state(task, state);     /* didnt get the lock, go to sleep: */    spin_unlock_mutex(&lock->wait_lock, flags);    schedule();    spin_lock_mutex(&lock->wait_lock, flags);  }     /* got the lock - rejoice! */     mutex_remove_waiter(lock, &waiter, task->thread_info);    debug_mutex_set_owner(lock, task->thread_info);     /* set it to 0 if there are no waiters left: */     if (likely(list_empty(&lock->wait_list)))      atomic_set(&lock->count, 0);     spin_unlock_mutex(&lock->wait_lock, flags);    debug_mutex_free_waiter(&waiter);    return 0;}

    __mutex_lock_common()函数代码比较长,大致的代码逻辑是:

    1)先将当前进程current放入到mutex的wait_list链表中;

    2)然后是执行schedule(),执行进程切换调度,CPU就会从run queue中选取一个优先级最高的任务来运行;

    这个时候,获取mutex的进程就已经挂起了suspend,需要有其他的进程调用mutex_unlock(),才能将此进程唤醒,并重新加入到run queue中继续运行。

    上面的代码会把schedule()放在一个for循环内部,这是因为进程被唤醒后,需要先检查条件是否满足,如果不满足,则会再次挂起。

    下面就分析下时如何唤醒等待在此mutex上的进程的: mutex_unlock()

void fastcall __sched mutex_unlock(struct mutex *lock){/* * The unlocking fastpath is the 0->1 transition from 'locked' * into 'unlocked' state: */    __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);}// __mutex_fastpath_unlock会对原子计数器count进行加1,然后调用__mutex_unlock_slowpath()来唤醒等待此mutex的进程。static fastcall inline void__mutex_unlock_common_slowpath(atomic_t *lock_count, int nested){    struct mutex *lock = container_of(lock_count, struct mutex, count);    unsigned long flags;     spin_lock_mutex(&lock->wait_lock, flags);    mutex_release(&lock->dep_map, nested, _RET_IP_);    debug_mutex_unlock(lock); /* * some architectures leave the lock unlocked in the fastpath failure * case, others need to leave it locked. In the later case we have to * unlock it here */    if (__mutex_slowpath_needs_to_unlock())        atomic_set(&lock->count, 1);      if (!list_empty(&lock->wait_list)) {        /* get the first entry from the wait-list: */        struct mutex_waiter *waiter = list_entry(lock->wait_list.next, struct mutex_waiter, list);         debug_mutex_wake_waiter(lock, waiter);        wake_up_process(waiter->task);     }     debug_mutex_clear_owner(lock);    spin_unlock_mutex(&lock->wait_lock, flags);}

        从上面的代码可以看出,释放mutex的过程就是把等待链表的第一个进程取出来,然后将其放入run-queue运行队列中,这样就能够被调度器调度,得到运行。其中wake_up_process()函数的作用就是把一个指定的进程放入到run queue中,并将进程的状态修改为TASK_RUNNING.

    代码分析到这里,应该基本上就清楚了mutex的实现逻辑了。使用mutex就是有一点需要特别注意的,必须同一个进程对其lock和unlock操作,仔细分析下代码,就可以发现了,这里就不再说明了。

三、信号量:

信号量与互斥体都是作为同步的手段,共同点是当获取失败时会导致进程睡眠,所以都是不可以用在中断上下文中,只可以用在进程上下文中。

不同点是:

1) mutex的获取与释放必须是由同一个进程执行,而semaphore则可以跨进程使用,即可以在A进程获取信号量,在B进程释放信号量。

2) mutex只有lockedunloked两种状态值,semaphore的计数器可以大于1.

简单的看一下信号量的使用接口:

#include 
 // 初始化信号量对象struct semaphore sem;void sema_init(struct semapthore *sem, int val); // 获取信号量void down(struct semaphore *sem); //如果获取失败,进程将进入不可被信号打断的睡眠状态int down_interruptible(struct semaphore *sem); //如果获取失败,进程将进入可被信号打断的睡眠状态int down_killable(strut semaphore *sem); // 如果获取失败,进程在睡眠等待的过程中,只响应致命信号int down_timeout(struct semaphore *sem,  long jiffies); // 进程对信号量的等待操作有时间限制 // 释放信号量void up(struct semaphpre *sem);

down_interruptible()down_killable()的返回值说明,0表示获取成功,-EINTR表示在等待信号量的过程中被信号打断,信号量获取失败。

down_timeout()的返回值说明,0表示成功,-ETIME表示在等待信号量的过程中超时,获取信号量失败。

转载于:https://my.oschina.net/kaedehao/blog/631349

你可能感兴趣的文章
安装NTFS For Mac时显示文件已损坏怎么办
查看>>
-webkit-line-clamp实现多行文字溢出隐藏显示省略号
查看>>
配置sunspot tomcat结合sunspot_rails
查看>>
飞信系统4月29日升级后飞信机器人无法使用的解决办法
查看>>
Canonical今天宣布推出Plex Media Server作为Snap Store中的Snap应用程序
查看>>
Font Awesome
查看>>
Dubbo消费者
查看>>
虚拟化中虚拟机处理器核数与物理主机cpu的关系
查看>>
org.codehaus.jackson.map.JsonMappingException: No suitable constructor found for type
查看>>
MYSQL: mysqlbinlog读取二进制文件报错read_log_event()
查看>>
随机产生由特殊字符,大小写字母以及数字组成的字符串,且每种字符都至少出现一次...
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
java21:捕鱼达人
查看>>
Zabbix 服务端搭建
查看>>
Java - 一个单例
查看>>
学习JAVA 持续更新
查看>>
Spring propertyConfigurer类
查看>>
Linux系统分析工具之uptime,top(一)
查看>>
EIGRP之DUAL(扩散更新算法)
查看>>