现在的位置: 首页 > 综合 > 正文

linux内核学习(19)内核编程基本功之内核同步与互斥锁mutex

2013年09月18日 ⁄ 综合 ⁄ 共 7107字 ⁄ 字号 评论关闭

Pro-II、内核同步与互斥锁:

1、理解互斥锁?

互斥锁的使用也是保持内核临界区的同步的,互斥锁可以说源于信号量,信号量设置计数器可以容许n个进程并发的访问临界区,而互斥锁不行,只能容许每次一个进程访问,也就是计数器值为1的信号量,可以这么理解。互斥锁和自旋锁有不同(显然的),互斥锁在中断处理程序中和可延迟函数中都不能使用,因为它是可以睡眠的,只能在进程上下文或者软中断上下文才能使用。


2、如何实现互斥锁?


2-1、结构体:


2-1-1、mutex结构体:

struct mutex {

/* 1: unlocked, 0: locked, negative: locked, possible waiters */

atomic_t count;

spinlock_t wait_lock;

struct list_head wait_list;

//我们关心上面三个变量

#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_SMP)

struct thread_info *owner;

#endif

#ifdef CONFIG_DEBUG_MUTEXES

const char *name;

void *magic;

#endif

#ifdef CONFIG_DEBUG_LOCK_ALLOC

struct lockdep_map dep_map;

#endif

};


2-1-2、mutex_waiter结构体:

struct mutex_waiter {

struct list_head list;

struct task_struct *task;

#ifdef CONFIG_DEBUG_MUTEXES

void *magic;

#endif

};


2-2、方法:


2-2-1、初始化:


# define mutex_init(mutex) /

do { /

static struct lock_class_key __key; /

/

__mutex_init((mutex), #mutex, &__key); /

} while (0)


void

__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)

{

atomic_set(&lock->count, 1); //设置计数器

spin_lock_init(&lock->wait_lock); //初始化自旋锁

INIT_LIST_HEAD(&lock->wait_list); //初始化等待队列

mutex_clear_owner(lock);


debug_mutex_init(lock, name, key);

}


2-2-2、锁住:


void __sched mutex_lock(struct mutex *lock)

{

might_sleep();

/*

* The locking fastpath is the 1->0 transition from

* 'unlocked' into 'locked' state.

*/

__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);

mutex_set_owner(lock);

}


#define __mutex_fastpath_lock(count, fail_fn) /

do { /

unsigned int dummy; /

/

typecheck(atomic_t *, count); /

typecheck_fn(void (*)(atomic_t *), fail_fn); /

/

asm volatile(LOCK_PREFIX " decl (%%eax)/n" /

" jns 1f /n" /

//如果count>=0,即之前的count>0(其实也只有一种情况的count==1)

//说明资源空闲着,可以直接使用

" call " #fail_fn "/n" /

//如果之前的count<=0,说明资源忙或者已经有进程在等待

//于是我们要睡眠,也就是将本进程加入该互斥锁等待队列中

//调用的是fail_fn这个函数

"1:/n" /

: "=a" (dummy) /

: "a" (count) /

: "memory", "ecx", "edx"); /

} while (0)


static __used noinline void __sched

__mutex_lock_slowpath(atomic_t *lock_count)

{

struct mutex *lock = container_of(lock_count, struct mutex, count);


__mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0, _RET_IP_);

}


static inline int __sched

__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,

unsigned long ip)

{

struct task_struct *task = current;

struct mutex_waiter waiter;

unsigned long flags;


preempt_disable(); //禁止内核抢占

mutex_acquire(&lock->dep_map, subclass, 0, ip);

//忽略下面的定义

#ifdef CONFIG_MUTEX_SPIN_ON_OWNER

/*

* Optimistic spinning.

*

* We try to spin for acquisition when we find that there are no

* pending waiters and the lock owner is currently running on a

* (different) CPU.

*

* The rationale is that if the lock owner is running, it is likely to

* release the lock soon.

*

* Since this needs the lock owner, and this mutex implementation

* doesn't track the owner atomically in the lock field, we need to

* track it non-atomically.

*

* We can't do this for DEBUG_MUTEXES because that relies on wait_lock

* to serialize everything.

*/


for (;;) {

struct thread_info *owner;


/*

* If we own the BKL, then don't spin. The owner of

* the mutex might be waiting on us to release the BKL.

*/

if (unlikely(current->lock_depth >= 0))

break;


/*

* If there's an owner, wait for it to either

* release the lock or go to sleep.

*/

owner = ACCESS_ONCE(lock->owner);

if (owner && !mutex_spin_on_owner(lock, owner))

break;


if (atomic_cmpxchg(&lock->count, 1, 0) == 1) {

lock_acquired(&lock->dep_map, ip);

mutex_set_owner(lock);

preempt_enable();

return 0;

}


/*

* When there's no owner, we might have preempted between the

* owner acquiring the lock and setting the owner field. If

* we're an RT task that will live-lock because we won't let

* the owner complete.

*/

if (!owner && (need_resched() || rt_task(task)))

break;


/*

* The cpu_relax() call is a compiler barrier which forces

* everything in this loop to be re-loaded. We don't need

* memory barriers as we'll eventually observe the right

* values at the cost of a few extra spins.

*/

cpu_relax();

}

#endif

spin_lock_mutex(&lock->wait_lock, flags); //自旋锁一下


debug_mutex_lock_common(lock, &waiter);

debug_mutex_add_waiter(lock, &waiter, task_thread_info(task));


/* add waiting tasks to the end of the waitqueue (FIFO): */

list_add_tail(&waiter.list, &lock->wait_list);

//将本进程放入互斥锁FIFO队列中

waiter.task = task;


if (atomic_xchg(&lock->count, -1) == 1)

goto done;

//有可能在调用这个函数的途中互斥锁释放了,在进入循环前检查一遍

//如果果真释放了,就直接进入临界区

lock_contended(&lock->dep_map, ip);


for (;;) {

/*

* Lets try to take the lock again - this is needed even if

* we get here for the first time (shortly after failing to

* acquire the lock), to make sure that we get a wakeup once

* it's unlocked. Later on, if we sleep, this is the

* operation that gives us the lock. We xchg it to -1, so

* that when we release the lock, we properly wake up the

* other waiters:

*/

if (atomic_xchg(&lock->count, -1) == 1)

break;

//如果发现互斥锁被释放了,则终止循环


/*

* got a signal? (This code gets eliminated in the

* TASK_UNINTERRUPTIBLE case.)

*/

if (unlikely(signal_pending_state(state, task))) {

mutex_remove_waiter(lock, &waiter,

task_thread_info(task));

mutex_release(&lock->dep_map, 1, ip);

spin_unlock_mutex(&lock->wait_lock, flags);


debug_mutex_free_waiter(&waiter);

preempt_enable();

return -EINTR;

}

//执行的可能行极小

__set_task_state(task, state);

//设置本进程状态为不可中断(TASK_UNINTERRUPTIBLE)

/* didnt get the lock, go to sleep: */

spin_unlock_mutex(&lock->wait_lock, flags);

//释放自旋锁

preempt_enable_no_resched();

schedule();

//现在切换进程,进入睡眠阶段

//一旦被唤醒

preempt_disable();

spin_lock_mutex(&lock->wait_lock, flags);

}


done:

lock_acquired(&lock->dep_map, ip);

/* got the lock - rejoice! */

mutex_remove_waiter(lock, &waiter, current_thread_info());

//将本进程从互斥锁FIFO队列中移除

mutex_set_owner(lock);


/* set it to 0 if there are no waiters left: */

if (likely(list_empty(&lock->wait_list)))

atomic_set(&lock->count, 0);

//注意,如果互斥锁等待队列中没有等待的进程,则将count置0

//表示资源正忙,但是没有进程等待

//这里解释一下count的取值

//1:表示资源空闲,0:表示资源忙,但没有等待资源的进程,-1:表示资源忙,且有等待资源的进程


spin_unlock_mutex(&lock->wait_lock, flags);


debug_mutex_free_waiter(&waiter);

preempt_enable();

//恢复一切,正常执行代码

return 0;

}


2-2-3、释放锁:


void __sched mutex_unlock(struct mutex *lock)

{

/*

* The unlocking fastpath is the 0->1 transition from 'locked'

* into 'unlocked' state:

*/

#ifndef CONFIG_DEBUG_MUTEXES

/*

* When debugging is enabled we must not clear the owner before time,

* the slow path will always be taken, and that clears the owner field

* after verifying that it was indeed current.

*/

mutex_clear_owner(lock);

#endif

__mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);

}


#define __mutex_fastpath_unlock(count, fail_fn) /

do { /

unsigned int dummy; /

/

typecheck(atomic_t *, count); /

typecheck_fn(void (*)(atomic_t *), fail_fn); /

/

asm volatile(LOCK_PREFIX " incl (%%eax)/n" /

" jg 1f/n" /

//如果发现count>0,即之前的count>=0,说明没有等待该资源的进程

//那么无需去唤醒另外的进程

" call " #fail_fn "/n" /

//否则唤醒另外的进程

"1:/n" /

: "=a" (dummy) /

: "a" (count) /

: "memory", "ecx", "edx"); /

} while (0)


static __used noinline void

__mutex_unlock_slowpath(atomic_t *lock_count)

{

__mutex_unlock_common_slowpath(lock_count, 1);

}


static inline void

__mutex_unlock_common_slowpath(atomic_t *lock_count, int nested)

{

struct mutex *lock = container_of(lock_count, struct mutex, count);

unsigned long flags;


spin_lock_mutex(&lock->wait_lock, flags);

mutex_release(&lock->dep_map, nested, _RET_IP_);

debug_mutex_unlock(lock);


/*

* some architectures leave the lock unlocked in the fastpath failure

* case, others need to leave it locked. In the later case we have to

* unlock it here

*/

if (__mutex_slowpath_needs_to_unlock())

atomic_set(&lock->count, 1);

//这里在唤醒的时候将count置1,表示资源空闲

//这里看似与count值的说法有错,因为现在可能还有别的进程等待该资源

//但是当唤醒到另外的进程中会看到,结合mutex_lock中 atomic_xchg函数

//而解锁函数如何知道还有没有等待的进程,判断等待队列是否为空就搞定了


if (!list_empty(&lock->wait_list)) {

/* get the first entry from the wait-list: */

struct mutex_waiter *waiter =

list_entry(lock->wait_list.next,

struct mutex_waiter, list);

//找到队列中排在前面的那个进程

debug_mutex_wake_waiter(lock, waiter);


wake_up_process(waiter->task); //唤醒那个进程

}


spin_unlock_mutex(&lock->wait_lock, flags);

}


#define __mutex_slowpath_needs_to_unlock() 1


互斥锁是比较难理解的,希望多花点时间,多看点资料,结合一些具体情况。

抱歉!评论已关闭.