0%

Linux 内核同步机制之 Semaphore

Linux 内核同步机制之 Semaphore

1 Semaphore 的使用

1.1 竞争条件

  • 竞争条件是指多个线程或者进程在读写一个共享数据时结果依赖于它们执行的相对时间的情形
  • 举例:
1
2
3
4
5
6
7
8
9
10
int open_count = 0;

int hello_open(struct inode *p, struct file *f) {
if (open_count) {
return -EBUSY;
}
++open_count;
printk(KERN_INFO "hello_open ok\r\n");
return 0;
}

上面代码的作用是确保同一时间只有1个进程能够访问 hello_dev 驱动设备,但是由于 CPU 的指令调度,可能存在如下的执行顺序,

img

初始情况,open_count 为 0,进程 A 执行到 if 语句,条件为假,不执行分支内部的语句,此时,由于 CPU 调度(可能是因为进程 A 时间片到期或者被更高优先级的进程 B 抢占),进程 B 也执行到 if 判断,条件也为假,不执行分支内部的语句,然后进程 A 和 B 都获得了设备的使用权

1.2 使用 Semaphore 实现互斥

1.2.1 声明

1
struct semaphore sema;
  • 在 linux 内核中,信号量为结构体类型 struct semaphore,在 hello_dev 字符设备驱动中,将信号量声明成 struct semaphore 类型的全局变量 sema

1.2.2 初始化

  • 信号量在使用之前需要初始化,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int hello_init(void) {
devNum = MKDEV(reg_major, reg_minor);
if(OK == register_chrdev_region(devNum, subDevNum, "helloworld")) {
printk(KERN_INFO "register_chrdev_region ok \n");
} else {
printk(KERN_INFO "register_chrdev_region error n");
return ERROR;
}
printk(KERN_INFO " hello driver init \n");
gDev = kzalloc(sizeof(struct cdev), GFP_KERNEL);
gFile = kzalloc(sizeof(struct file_operations), GFP_KERNEL);
gFile->open = hello_open;
gFile->read = hello_read;
gFile->write = hello_write;
gFile->owner = THIS_MODULE;
cdev_init(gDev, gFile);
cdev_add(gDev, devNum, 1);

sema_init(&sema, 1);
return 0;
}
  • 在 hello_dev 字符设备的初始化函数中,调用 sema_init 函数将信号量 sema 初始化为 1,
1
2
3
4
5
6
static inline void sema_init(struct semaphore *sem, int val)
{
static struct lock_class_key __key;
*sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
lockdep_init_map(&sem->lock.dep_map, "semaphore->lock", &__key, 0);
}
  • 将信号量初始化为 1,是因为 hello_dev 字符设备需要保证互斥访问,即同一时间只有1个进程或线程能够访问这个设备

1.2.3 使用信号量解决上述的竞争条件问题

1
2
3
4
5
6
7
8
9
10
11
int hello_open(struct inode *p, struct file *f) {
down(&sema);
if (open_count >= 1) {
printk(KERN_INFO "device is busy, hello open failed");
return -EBUSY;
}
++open_count;
up(&sema);
printk(KERN_INFO "hello_open ok\r\n");
return 0;
}
  • 之前的 hello_open 函数代码之所以存在竞争条件问题,是因为打开设备的过程被分为两个步骤:判断使用量、增加使用量,不具有原子性
  • 将判断、增加使用量的代码称为临界区,在进入临界区之前,先调用 down 函数,将信号量 sema 的值减 1;离开临界区时,调用 up 函数,对信号量 sema 的值加 1

1.2.4 原理浅析

1.2.4.1 基本原理

  • 当对某个信号量调用 down 函数,会对信号量的值减 1,如果得到的结果不小于 0,则可以继续执行临界区代码;如果得到的结果小于 0,则会导致进程被阻塞休眠
  • 当对某个信号量调用 up 函数,会对信号量的值加 1,如果得到的结果不大于0,就会从进程阻塞队列中根据某种策略选取 1 个进程唤出,执行临界区代码

1.2.4.2 案例分析

  • 在初始情况下,信号量 sema 被初始化为 1,此时进程 A 调用 down 函数,将信号量的值减 1,得到的结果为 0,可以继续执行临界区代码
  • 在进程 A 调用 up 函数前,进程 B 也想使用同一设备,于是调用 down 函数,对信号量 sema 的值减 1,得到结果为 -1,从而被阻塞
  • 当进程 A 执行完临界区代码后,调用 up 函数,将信号量 sema 的值加 1,得到结果为 0,于是将进程 B 从进程阻塞队列唤出,于是 B 也能执行临界区代码
  • 进程 B 执行完临界区代码后,也调用 up 函数,将信号量 sema 的值加 1,又回到初始状态 1

2 Semaphore 源码浅析

2.1 数据结构

1
2
3
4
5
6
/* Please don't access any members of this structure directly */
struct semaphore {
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};
  • 在 semaphore 结构体内部有 3 个成员变量,分别是:自旋锁 lock、计数器 count、双向链表 wait_list
  • 在 linux 内核中,已经实现了双向链表,类型为 struct list_head,无需再自己实现

2.2 初始化信号量的过程

2.2.1 宏初始化

1
2
3
4
5
6
#define __SEMAPHORE_INITIALIZER(name, n)                \
{ \
.lock = __RAW_SPIN_LOCK_UNLOCKED((name).lock), \
.count = n, \
.wait_list = LIST_HEAD_INIT((name).wait_list), \
}

以宏定义的形式初始化信号量结构体内部的 3 个成员:锁、计数器和双向链表

2.2.2 函数初始化

1
2
3
4
5
6
static inline void sema_init(struct semaphore *sem, int val)
{
static struct lock_class_key __key;
*sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
lockdep_init_map(&sem->lock.dep_map, "semaphore->lock", &__key, 0);
}

2.3 获取信号量的过程

2.3.1 down 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* down - acquire the semaphore
* @sem: the semaphore to be acquired
*
* Acquires the semaphore. If no more tasks are allowed to acquire the
* semaphore, calling this function will put the task to sleep until the
* semaphore is released.
*
* Use of this function is deprecated, please use down_interruptible() or
* down_killable() instead.
*/
void down(struct semaphore *sem)
{
unsigned long flags;

raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(sem->count > 0))
sem->count--;
else
__down(sem);
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
  • 因为对信号量 sem 内部成员的访问可能是并发的,所以使用自旋锁 sem->lock 确保共享变量的互斥访问
  • 如果此时计数器 sem->count 大于 0,则将其减 1,然后释放自旋锁 sem-> lock,退出 down 函数
  • 如果此时计数器 sem->count 小于或等于 0,则调用 __down 函数

2.3.2 __down 函数

1
2
3
4
static noinline void __sched __down(struct semaphore *sem)
{
__down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
  • down 函数内部又调用 __down_common 函数
  • 宏 TASK_UNINTERRUPTIBLE 表示进程不可中断的睡眠状态,宏 MAX_SCHEDULE_TIMEOUT 被定义成 LONG_MAX

2.3.3 __down_common 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static inline int __sched __down_common(struct semaphore *sem, long state, long timeout)
{
struct task_struct *task = current;
struct semaphore_waiter waiter;

list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = task;
waiter.up = false;

for (;;) {
if (signal_pending_state(state, task))
goto interrupted; //因中断信号而结束
if (unlikely(timeout <= 0))
goto timed_out; // //因超时而结束
__set_task_state(task, state);
raw_spin_unlock_irq(&sem->lock);
timeout = schedule_timeout(timeout);
raw_spin_lock_irq(&sem->lock);
if (waiter.up)
return 0;
}

timed_out:
list_del(&waiter.list);
return -ETIME;

interrupted:
list_del(&waiter.list);
return -EINTR;
}
  • 在 linux 内核源码中,task_struct 类型是进程的抽象,current 指针始终指向当前正在执行中的进程,在 __down_common 函数的第一行定义 task_struct 类型指针 task 指向当前进程
  • 将当前进程加入到信号量 sem 的等待队列 wait_list 中,这里用双向链表模拟队列,添加元素到表尾即为入队,从表头取出元素即为出队
  • 进入 for 循环内部,signal_pending_state 函数判断当前是否存在中断信号,如果有就跳出死循环,将进程从阻塞队列中移除,__down_common 函数返回 -EINTR
  • 如果 timeout 小于等于 0,则因为超时跳出死循环,将进程从阻塞队列中移除,__down_common 函数返回 -ETIME
  • schedule_timeout 函数将当前进程置于休眠状态,等待 timeout 时间后继续执行
  • 如果其他进程对这个信号量调用了 up 函数,那么 waiter.up 就置为 1,__down_common 函数返回 0

2.4 释放信号量的过程

2.4.1 up 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* up - release the semaphore
* @sem: the semaphore to release
*
* Release the semaphore. Unlike mutexes, up() may be called from any
* context and even by tasks which have never called down().
*/
void up(struct semaphore *sem)
{
unsigned long flags;

raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(list_empty(&sem->wait_list)))
sem->count++;
else
__up(sem);
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
  • 因为对信号量 sem 内部成员的访问可能是并发的,所以使用自旋锁 sem->lock 确保共享变量的互斥访问
  • 如果此时等待队列是空的,则将 count 加 1,然后释放自旋锁 sem-> lock,退出 up 函数
  • 如果此时等待队列不为空,调用 __up 函数

2.4.2 __up 函数

1
2
3
4
5
6
7
8
static noinline void __sched __up(struct semaphore *sem)
{
struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
struct semaphore_waiter, list);
list_del(&waiter->list);
waiter->up = true;
wake_up_process(waiter->task);
}
  • 取出信号量 sem 等待队列的第一个进程,调用 wake_up_process 函数唤醒这个处于睡眠状态的进程,使其由睡眠状态变为RUNNING状态,从而能够被CPU重新调度执行