现在的位置: 首页 > 综合 > 正文

【Linux 驱动】第五章 并发和竞态

2013年10月10日 ⁄ 综合 ⁄ 共 10401字 ⁄ 字号 评论关闭

一,概念

       并发(concurrency)指的是多个执行单元同时被执行

       竞态(race condition)并发的执行单元对共享资源(硬件资源和软件上的全局变量、静态变量等)的访问导致竞态

        例子:两个进程试图向同一个设备的相同位置写入数据,造成数据混乱

        解决:加锁或互斥 从而确保 同一时间 只有一个进程执行操作

二,规则

        在设计自己的驱动程序时,第一个要记住的规则是,只要可能,就应该避免资源的共享。如果没有并发的访问,也就不会有竞态的产生。因此,仔细编写的内核代码应具有最少的共享。这种思想的最明显应用就是避免使用全局变量。

但是资源的共享是不可避免的,如硬件资源的本质就是共享、指针传递等。

三,资源共享的硬规则:

      (1)在单个执行线程之外共享硬件或软件资源的任何时候,因为另外一个线程可能产生对该资源的不一致观察,因此必须显示地管理对该资源的访问。

      (2)当内核代码创建了一个可能和其他内核部分共享的对象时,该对象必须在其他组件引用自己时保持存在(并正确工作)

四,信号量和互斥体

        一个信号量(semaphore: 旗语,信号灯)本质上是一个整数值,它和一对函数联合使用,这一对函数通常称为P和V。希望进入临届区的进程将在相关信号量上调用P;如果信号量的值大于零,则该值会减小一,而进程可以继续。相反,如果信号量的值为零(或更小),进程必须等待知道其他人释放该信号。对信号量的解锁通过调用V完成;该函数增加信号量的值,并在必要时唤醒等待的进程。
        当信号量用于互斥时(即避免多个进程同是在一个临界区运行),信号量的值应初始化为1。这种信号量在任何给定时刻只能由单个进程或线程拥有。在这种使用模式下,一个信号量有事也称为一个“互斥体(mutex)”,它是互斥(mutual exclusion)的简称。Linux内核中几乎所有的信号量均用于互斥。
五,使用信号量
        1)信号量的实现也是与体系结构相关的,定义在<asm/semaphore.h>中,struct semaphore类型用来表示信号量

        2)信号量的使用

              1>定义信号量:

struct   semaphore   sem;

              2>信号量初始化:void sema_init(struct  semaphore *sem, int  val);//val是初始值

              3>由于信号量通常被用于互斥模式。所以以下是内核提供的一组辅助函数和宏: 
                /*方法一、声明+初始化宏*/
                DECLARE_MUTEX(name);    //name=1
                DECLARE_MUTEX_LOCKED(name); //name=0
                /*方法二、初始化函数*/
               void init_MUTEX(struct semaphore *sem);
               void init_MUTEX_LOCKED(struct semaphore *sem);
               

              【说明】带有“_LOCKED”的是将信号量初始化为0,即锁定,允许任何线程访问时必须先解锁。没带的为1
                4>P函数(当线程执行下面函数时候,该线程就拥有该资源)
                    void  down(struct semaphore *sem); //不推荐使用,会建立不可杀进程
                    int     down_interruptible(struct semaphore *sem);//推荐使用,使用(可以中断)        
                    int     down_trylock(struct semaphore *sem);/*带有“_trylock”的永不休眠,若信号量在调用是不可获得,会返回非零值。*/
                

                【说明】down_interruptible需要格外小心,若操作被中断,该函数会返回非零值,而调用这不会拥有该信号量。对down_interruptible的正确使用需要始终检查返回值,并做出相应的响应

                5>V函数为: 
                     void up(struct semaphore *sem);

                     任何拿到信号量的线程都必须通过一次(只有一次)对up的调用而释放该信号量。在出错时,要特别小心;若在拥有一个信号量时发生错误,必须在将错误状态返回前释放信号量。

五,在scull中使用信号量 (scull  区域装载的简单字符工具)

        1)要使用的scull_dev结构

       struct scull_dev {
              struct scull_qset *data;  /* 指向第一个scull_qset结构体 */
              int quantum;    /* 量子大小,量子也是指针,指向的内存区域大小即为quantum */
              int qset;  /* 量子集大小(指针数组元素个数),量子集即指针数组,其元素即量子 */ 
              unsigned long size;   /* 数据总量,动态量,使用时由写入数据总量决定 */
              unsigned int access_key;  /* used by sculluid and scullpriv */
              struct semaphore sem;     /* 互斥信号量 */
              struct cdev cdev;    /* 字符设备结构 */
         };
        

        2)信号量在使用前必须初始化

        for (i = 0; i scull_nr_devs; i++)

       {
           scull_devices.quantum = scull_quantum;
           scull_devices.qset = scull_qset;
           init_MUTEX(&scull_devices.sem);/* 注意顺序:先初始化好互斥信号量 ,再使scull_devices可用。*/
           scull_setup_cdev(&scull_devices, i);
       }

       而且要确保在不拥有信号量的时候不会访问scull_dev结构体。

读取者/写入者信号量

       信号量对所有的调用者互斥,而不管每个线程到底想做什么。

       允许多个并发的读取者是可能的,Linux内核为这种情形提供了一种特殊的信号量类型,称为“rwsem”(或者reader/write semaphore,读取者/写入者信号量)。使用rwsem的代码必须包含<linux/rwsem.h>,rwsem相关的数据类型是struct rw_semaphore。


       初始化: void init_rwsem(struct rw_semaphore *sem);

       只读接口: 
                   void   down_read(struct rw_semaphore *sem);
                   int      down_read_trylock(struct rw_semaphore *sem);//成功时候返回0
                   void   up_read(struct rw_semaphore *sem);
      写入接口: 
                  void    down_write(struct rw_semaphore *sem);
                  int       down_write_trylock(struct rw_semaphore *sem);
                  void    up_write(struct rw_semaphore *sem);
                  void   downgrade_write(struct rw_semaphore *sem);/*该函数用于把写者降级为读者,这有时是必要的。因为写者是排他性的,因此在写者保持读写信号量期间,任何读者或写者都将无法访问该读写信号量保护的共享资源,对于那些当前条件下不需要写访问的写者,降级为读者将,使得等待访问的读者能够立刻访问,从而增加了并发性,提高了效率。*/

              一个 rwsem 允许一个写者或无限多个读者来拥有该信号量. 写者有优先权; 当某个写者试图进入临界区, 就不会允许读者进入直到写者完成了它的工作. 如果有大量的写者竞争该信号量,则这个实现可能导致读者“饿死”,即可能会长期拒绝读者访问。因此, rwsem 最好用在很少请求写的时候, 并且写者只占用短时间.
七,completion
        completion是一种轻量级的机制,它允许一个线程告诉另一个线程某个工作已经完成。代码必须包含。使用的代码如下: 
     
       

Linux内核提供了一种更好的同步机制,即完成量(completion),完成量允许一个线程告诉另一个线程某个工作已经完成,其声明在<linux/completion.h>中。

1、创建和初始化completion

DECLARE_COMPLETION(my_completion);  // 定义+初始化

动态创建和初始化完成量

struct completion my_completion;
void init_completion(&my_completion);

2、等待完成量

void wait_for_completion(struct completion *);

执行一个非中断的等待,如果调用了wait_for_completion且没有人会完成该任务,则会产生一个不可杀的进程。

3、唤醒完成量

void complete(struct completion *); //唤醒一个等待进程
void complete_all(struct completion *); // 唤醒所有等待进程

一个completion通常是个单次(one-shot)设备,它只会被使用一次,然后被丢弃。如果没有使用completion_all,则我们可以重复使用一个completion结构,但是,如果使用了completion_all,则必须在重复使用该结构体前重新初始化它。

下面这个宏用来快速执行重新初始化:

INIT_COMPLETION(struct completion c);


八,自旋锁(spinlock)

        防止多处理器并发而引入的一种锁,它在内核中大量应用于中断处理等部分

        一个自旋锁是一个互斥设备,它只有两个值:“锁定”和“解锁”。它通常实现为某个整数值中的单个位。希望获得某特定锁的代码测试相关的位。如果锁可用,则“锁定”位被设置,而代码继续进入临界区。相反,如果锁被其他人获得,则代码进入忙循环并重复检查这个锁,直到该锁可用为止,这个循环就是自旋锁的“自旋”部分。

         自旋锁和互斥锁区别

              互斥锁,如果资源已经被占用,资源申请者只能进入睡眠状态。

              自旋锁,不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁,"自旋"一词就是因此而得名。

适用于自旋锁的核心规则是:

            1>任何拥有自旋锁的代码都必须是原子的。它不能休眠,事实上,它不能因为任何原因放弃处理器,除了服务中断以外(某些情况下也不能放弃CPU,如果在中断服务例程中,也需要该自旋锁,则会发生“死锁”,因此,在拥有自旋锁时会禁止本地CPU的中断)。任何时候,只要内核代码拥有自旋锁,在相关处理器上的抢占就会被禁止。当我们编写在自旋锁下执行的代码时,必须注意每一个所调用的函数,他们不能休眠。

            2>自旋锁必须在可能的最短时间内拥有。拥有自旋锁的时间越长,其他处理器不得不自旋的时间就越长,而它不得不自旋的可能性就越大。

          自旋锁API
             要使用自旋锁原语,需要包含头文件<linux/spinlock.h>。
             spinlock_t my_lock = SPIN_LOCK_UNLOCKED;/* 编译时初始化spinlock*/
             void spin_lock_init(spinlock_t *lock);/* 运行时初始化spinlock*/
 
             /* 所有spinlock等待本质上是不可中断的,一旦调用spin_lock,在获得锁之前一直处于自旋状态*/
            void spin_lock(spinlock_t *lock);/* 获得spinlock*/
            void spin_lock_irqsave(spinlock_t *lock, unsigned long flags);// 获得spinlock,禁止本地cpu中断,保存中断标志于flags
            void spin_lock_irq(spinlock_t *lock);/* 获得spinlock,禁止本地cpu中断*/
            void spin_lock_bh(spinlock_t *lock)/* 获得spinlock,禁止软件中断,保持硬件中断打开*/
  
            /* 以下是对应的锁释放函数*/
            void spin_unlock(spinlock_t *lock);
            void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
            void spin_unlock_irq(spinlock_t *lock);
            void spin_unlock_bh(spinlock_t *lock);
 
            /* 以下非阻塞自旋锁函数,成功获得,返回非零值;否则返回零*/
           int spin_trylock(spinlock_t *lock);
          int spin_trylock_bh(spinlock_t *lock);
 
          新内核的<linux/spinlock.h>包含了更多函数

 
       读取者/写入者自旋锁
          允许任意数量的读取者进入临界区,但写入者必须互斥访问。读取者/写入者具有rwlock_t类型,在<linux/spinkock.h>中定义。

  读取者/写入者自旋锁API

       rwlock_t    my_rwlock = RW_LOCK_UNLOCKED;/* 编译时初始化*/

       rwlock_t    my_rwlock; 

       rwlock_init(&my_rwlock); /* 运行时初始化*/ 

      void    read_lock(rwlock_t *lock); 

      void    read_lock_irqsave(rwlock_t *lock, unsigned long flags);

      void    read_lock_irq(rwlock_t *lock);

      void     read_lock_bh(rwlock_t *lock);

      void    read_unlock(rwlock_t *lock);

      void    read_unlock_irqrestore(rwlock_t *lock, unsigned long flags);

      void    read_unlock_irq(rwlock_t *lock);

      void    read_unlock_bh(rwlock_t *lock); /* 新内核已经有了read_trylock */

      void    write_lock(rwlock_t *lock);

      void    write_lock_irqsave(rwlock_t *lock, unsigned long flags);

      void    write_lock_irq(rwlock_t *lock);

      void    write_lock_bh(rwlock_t *lock);

      int       write_trylock(rwlock_t *lock);

     void    write_unlock(rwlock_t *lock);

     void    write_unlock_irqrestore(rwlock_t *lock, unsigned long flags);

      void   write_unlock_irq(rwlock_t *lock);

      void   write_unlock_bh(rwlock_t *lock); /*新内核的<linux/spinlock.h>包含了更多函数*/
 

      在Linux使用读-写自旋锁时,这种锁机制照顾照顾读比写要多一点。当读锁被持有时,写操作为了互斥访问只能等待,但是读者可以继续成功占有锁。而自旋等待的写者在所有读者释放锁之前是无法获得锁的。所以,大量读者必然使挂起的写者处于饥饿状态。

      如果加锁时间不长且代码不会睡眠(比如中断处理程序),利用自旋锁是最佳选择。如果加锁时间可能很长或者代码在持有锁时有可能睡眠,那么最好使用信号量来完成加锁功能。

九,陷阱锁

       锁定模式的设置必须在一开始就要设置好,否则其后的改进会非常困难。
       信号量和自旋锁是不可递归的。在scull中,我们的设计规则是:由系统调用直接调用的那些函数均要获得信号量,以便保护要访问的设备结构。而其他的内部函数,只会由其他的scull函数调用,则假定信号量已被正确获取。
        锁的顺序规则
             必须获取多个锁时,应该始终以相同的顺序获得。
        有帮助的两个规则是:
           1、 如果必须要获得一个局部锁(比如一个设备锁),以及一个属于内核更中心位置的锁,则应该首先获取自己的局部锁。
           2、 如果我们拥有自旋锁和信号量的组合,则必须首先获得信号量。

细粒度和粗粒度的对比

        设备驱动程序中的锁通常相对直接,可以用单个锁来处理所有的事情,或者可以为每个设备建立一个锁。作为通常规则,我们应该在最初使用粗粒度的锁。

除了锁之外的办法

        在某些情况下,原子的访问不需要完整的锁。

免锁算法

         经常用于免锁的生产者/消费者任务的数据结构之一是循环缓冲区(circular buffer)。循环缓冲区的使用在设备驱动程序中相当普遍。特别是网络适配器,经常使用循环冲区和处理器交换数据。在Linux内核中,有一个通用的循环缓冲区实现,有关其使用可参阅<linux/kfifo.h>。

原子变量

       有时,共享的资源可能恰好是一个简单的整数,完整的锁机制对一个简单的整数来讲显得有些浪费。针对这种情况,内核提供了一种原子的整数类型,称为atomic_t,定义在<ams/atomic.h>中。

一个atomic_t变量在所有内核支持的架构上保存了一个int值。但是,由于某些处理器上这种数据类型的工作方式有些限制,因此不能使用完整的整数范围,也就是说,在atomic_t变量中不能记录大于24位的整数。原子操作速度非常快,因为只要可能,它们就会被编译成单个机器指令。

原子变量操作函数:

 void atomic_set(atomic_t *v, int i); /*设置原子变量 v 为整数值 i.*/
 atomic_t v = ATOMIC_INIT(0);  /*编译时使用宏定义 ATOMIC_INIT 初始化原子值.*/
 
  int atomic_read(atomic_t *v); /*返回 v 的当前值.*/
  
  void atomic_add(int i, atomic_t *v);/*由 v 指向的原子变量加 i. 返回值是 void*/
  void atomic_sub(int i, atomic_t *v); /*从 *v 减去 i.*/
  
  void atomic_inc(atomic_t *v); 
void atomic_dec(atomic_t *v); /*递增或递减一个原子变量.*/
 
 int atomic_inc_and_test(atomic_t *v); 
 int atomic_dec_and_test(atomic_t *v); 
 int atomic_sub_and_test(int i, atomic_t *v); 
 /*进行一个特定的操作并且测试结果; 如果, 在操作后, 原子值是 0, 那么返回值是真; 否则, 它是假. 注意没有 atomic_add_and_test.*/
 
 int atomic_add_negative(int i, atomic_t *v); 
 /*加整数变量 i 到 v. 如果结果是负值返回值是真, 否则为假.*/

 
 int atomic_add_return(int i, atomic_t *v); 
 int atomic_sub_return(int i, atomic_t *v); 
 int atomic_inc_return(atomic_t *v); 
 int atomic_dec_return(atomic_t *v); 
 /*像 atomic_add 和其类似函数, 除了它们返回原子变量的新值给调用者.*/

atomic_t类型数据必须只能通过上面的函数来访问。如果将原子变量传递给了需要整型参数的函数,则会遇到编译错误。只有原子变量的数目是原子的,atomic_t变量才能正常工作,需要多个atomic_t变量的操作,仍然需要某种类型的锁。

原子位操作

为了实现位操作,内核提供了一组可原子地修改和测试单个位的函数。

原子位操作非常快,只要底层硬件允许,这种操作就可以使用单个机器指令来执行,并且不需要禁止中断。这些函数依赖于具体的架构,因此在<asm/bitops.h>中声明。即使是在SMP计算机上,这些函数也可确保为原子的,因此,能提供跨处理器的一致性。

这些函数使用的数据类型也是依赖于具体架构的。nr参数(用来描述要操作的位)通常被定义为int,但在少数架构上被定义为unsigned long。要修改的地址通常是指向unsigned long指针,但在某些架构上却使用void *来代替。

可用的位操作如下:

 void set_bit(nr, void *addr); /*设置第 nr 位在 addr 指向的数据项中。*/ 

 void clear_bit(nr, void *addr); /*清除指定位在 addr 处的无符号长型数据.*/ 

 void change_bit(nr, void *addr);/*翻转nr位.*/ 

test_bit(nr, void *addr); /*这个函数是唯一一个不需要是原子的位操作; 它简单地返回这个位的当前值.*/ 

 /*以下原子操作如同前面列出的, 除了它们还返回这个位以前的值.*/

 int test_and_set_bit(nr, void *addr);

 int test_and_clear_bit(nr, void *addr); 

 int test_and_change_bit(nr, void *addr); 
 

十,seqlock

      2.6内核包含了一对新机制打算来提供快速地,无锁地存取一个共享资源。seqlock要保护的资源小,简单,并且常常被存取,并且很少写存取但是必须要快。seqlock 通常不能用在保护包含指针的数据结构。seqlock 定义在<linux/seqlock.h> 。

/*两种初始化方法*/
 seqlock_t lock1 = SEQLOCK_UNLOCKED;
 seqlock_t lock2;
 seqlock_init(&lock2);

这个类型的锁常常用在保护某种简单计算,读存取通过在进入临界区入口获取一个(无符号的)整数序列来工作。在退出时, 那个序列值与当前值比较; 如果不匹配, 读存取必须重试。读者代码形式:

 unsigned int seq;

 do {

    seq = read_seqbegin(&the_lock);

     /* Do what you need to do */

   } while read_seqretry(&the_lock, seq);
 

  如果你的 seqlock可能从一个中断处理里存取,你应当使用IRQ安全的版本来代替:

 unsigned int read_seqbegin_irqsave(seqlock_t *lock, unsigned long flags);
 int read_seqretry_irqrestore(seqlock_t *lock, unsigned int seq, unsigned long flags);

写者必须获取一个排他锁来进入由一个seqlock保护的临界区,写锁由一个自旋锁实现,调用:

  void write_seqlock(seqlock_t *lock); 

 void write_sequnlock(seqlock_t *lock);
 
因为自旋锁用来控制写存取, 所有通常的变体都可用:

 void write_seqlock_irqsave(seqlock_t *lock, unsigned long flags);
 void write_seqlock_irq(seqlock_t *lock);
 void write_seqlock_bh(seqlock_t *lock);

 void write_sequnlock_irqrestore(seqlock_t *lock, unsigned long flags);
 void write_sequnlock_irq(seqlock_t *lock);
 void write_sequnlock_bh(seqlock_t *lock);

还有一个write_tryseqlock在它能够获得锁时返回非零。

读取-复制-更新

   读取-拷贝-更新(RCU) 是一个高级的互斥方法, 在合适的情况下能够有高效率。它在驱动中的使用很少。使用RCU的代码须包含<linux/rcupdate.h>。

 

抱歉!评论已关闭.