0%

实验名称: Lab6 并发与锁机制

学生姓名: 庄云皓

学生学号: 22336327

1. 实验要求

Assignment 1 代码复现题

1.1 代码复现

在本章中,我们已经实现了自旋锁和信号量机制。现在,同学们需要复现教程中的自旋锁和信号量的实现方法,分别使用二者解决一个同步互斥问题,如消失的芝士汉堡问题。最后,将结果截图并说说你是怎么做的。

1.2 锁机制的实现

我们使用了原子指令 xchg来实现自旋锁。但是,这种方法并不是唯一的。例如,x86指令中提供了另外一个原子指令 btslock前缀等,这些指令也可以用来实现锁机制。现在,同学们需要结合自己所学的知识,实现一个与本教程的实现方式不完全相同的锁机制。最后,测试你实现的锁机制,将结果截图并说说你是怎么做的。

提示:在 asm_utils.asm中实现你自己的原子操作 your_asm_atomic_exchange,并在 sync.cpp中做相应修改。

Assignment 2 生产者-消费者问题

2.1 Race Condition

同学们可以任取一个生产者-消费者问题,然后在lab6的代码环境下创建多个线程来模拟这个问题。在2.1中,我们不使用任何同步互斥的工具。因此,这些线程可能会产生冲突,进而无法产生我们预期的结果。同学们需要将这个产生错误的场景呈现出来,将结果截图并说说你是怎么做的。

2.2 信号量解决方法

使用信号量解决上述你模拟的生产者-消费者问题。将结果截图并说说你是怎么做的。

提示:

①经典的生产者-消费者问题有读者-写者问题、有界缓冲区问题等,可任取一个来模拟。

②模拟生产者-消费者问题的步骤:1、在代码中创建多个线程,分别代表生产者和消费者。2、编写生产者和消费者的线程函数。在这些函数中,根据问题的具体场景实现生产数据和消费数据的逻辑。3、创建并启动生产者和消费者线程。观察线程之间的交互并记录结果。4、展示没有使用同步互斥工具(如信号量)时,线程之间可能产生的冲突。5、使用信号量解决同步问题,并展示解决后的结果。

③样例视频模拟了读者-写者问题的场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//模拟读错误
//创建线程读第1-4条记录
programManager.executeThread(readFirstQuote, nullptr, "second thread", 1);
programManager.executeThread(readSecondQuote, nullptr, "third thread", 1);
programManager.executeThread(readThirdQuote, nullptr, "fourth thread", 1);
programManager.executeThread(readFourthQuote, nullptr, "fifth thread", 1);
//创建线程,修改第2条和第4条记录为较长内容
//由于写时间较长,写线程运行时间大于RRschedule的time quantum
programManager.executeThread(writeSecondQuote, nullptr, "sixth thread", 1);
programManager.executeThread(writeFourthQuote, nullptr, "seventh thread", 1);
//创建线程读第2条和第4条记录
//发现没有读到修改后的项,而是输出了初始项
programManager.executeThread(readSecondQuote, nullptr, "eighth thread", 1);
programManager.executeThread(readFourthQuote, nullptr, "ninth thread", 1);

而使用信号量后,可以读到修改后的项。

Assignment 3 哲学家就餐问题

假设有 5 个哲学家,他们的生活只是思考和吃饭。这些哲学家共用一个圆桌,每位都有一把椅子。在桌子中央有一碗米饭,在桌子上放着 5 根筷子。

哲学家就餐问题

当一位哲学家思考时,他与其他同事不交流。时而,他会感到饥饿,并试图拿起与他相近的两根筷子(筷子在他和他的左或右邻居之间)。一个哲学家一次只能拿起一根筷子。显然,他不能从其他哲学家手里拿走筷子。当一个饥饿的哲学家同时拥有两根筷子时,他就能吃。在吃完后,他会放下两根筷子,并开始思考。

3.1 初步解决方法

同学们需要在本教程的代码环境下,创建多个线程来模拟哲学家就餐的场景。然后,同学们需要结合信号量来实现理论课教材中给出的关于哲学家就餐问题的方法。最后,将结果截图并说说你是怎么做的。

该方案可能导致死锁,请举例出现死锁的场景和原因,并提出一种解决死锁的方案。

3.2 死锁解决方法(选做)

虽然3.1的解决方案保证两个邻居不能同时进食,但是它可能导致死锁。现在,同学们需要想办法将死锁的场景演示出来。提出一种解决死锁的方法并实现。最后,将结果截图并说说你是怎么做的。

说明:

①为演示死锁场景,可以在哲学家进食的线程中添加等待时间,使哲学家们的操作更接近于同时进行。

②样例视频演示了等待时间较少——正常、等待时间较长——出现死锁。

2. 实验过程

Assignment1

1.1.1自旋锁实现方法

spinlock中private变量是两个线程有公用的变量bolt,初始化为0

  • 局部变量key初始化为1
  • 当第一个线程请求进入临界区时,将bolt和key交换,bolt为1,key为0,跳出循环进入临界区。
  • 此时其他线程如果想要进入临界区,它的key初始化为1,bolt为1,交换后key仍然为1,循环等待(自旋)

交换的过程见关键代码部分。

观察是否解决了消失的芝士汉堡问题

1.1.1信号量实现方法

在include/sync.h下定义信号量Semaphore类。类中变量couter.表示已有资源数

  • P():只要counter>0则把counter-=1;如果county已经=0则把当前线程放进等待队列,将线程状态置为blockecd
  • V():counter++,获取等待队列队首元素并唤醒。

在在first_thread里将semaphore 初始化为1,然后在调用 mother函数在mycheese_burger+=10 前P,在print汉堡剩余量之后V防止在这段时间内汉堡被偷吃。

1.2 锁机制的实现

bts指令(bit test and set)作用就是设置内容中的一个位为1, 会首先判断该位是否已经为1,是就返回1,不是则置为1,然后返回0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
my_spinlock_lock:
push ebp
mov ebp, esp
pushad
.loop:
mov ebx, [ebp + 4 * 2]
mov edx, 0
lock bts dword [ebx], edx;test and set 第零位;若为0则将其置1,并将原来的值保存进CF,
;第零位若为1则 CF = 1

jc .loop ;CF为1则跳转

popad
pop ebp
ret

观察这段上锁的函数,我们 .loop循环中,[ebx]的值是传进函数的参数,即 bolt变量。bts语句的作用是首先判断 bolt是否为0,如果是的话就将其改为1,将CF寄存器置0,如果是1则bolt不变,CF = 1

这段代码能实现互斥访问的原因是:初始化bolt为0,如果一个进程在进入临界区时会将bolt改为1,如果此时被另一个进程抢占的话,bolt就仍然为1,循环等待。直到原进程调用 unlock函数将bolt置为0,另一个进程才能跳出.loop循环,进入临界区域。bts指令保证了操作的原子性。

Assignment2

Assignment 2.1

一个错误场景:

观察下面这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void producer(void* arg){
while(true){
if(cnt < MAX_BUFFER_SIZE){
int delay = 0xffffff;
while(delay){
delay--;
}
buffer[cnt] = 1;
cnt++;
printf("produced an item, cnt = %d\n",cnt);
if(cnt > MAX_BUFFER_SIZE){
printf("overflow\n");
}
}
}
}

没有实现PV操作时的错误场景:

可以看到当前buffer数组中的元素已经有MAX_SIZE-1个时,一个线程判断if(cnt<MAX_SIZE)符合条件进入,在cnt++之前被调度下处理器。

另外一个线程调度上处理器,同样满足if(cnt<MAX_SIZE)的条件,进入临界区域,cnt++。

之前被调度下来的进程重新进入running状态时cnt也++,导致cnt>MAX_SIZE,出现溢出。

underflow的情况以此类推。

Assignment 2.2

信号量解决方案:

三个信号量

  • empty_slots:初始化值为MAX_SIZE
  • full_slots: 初始化值为0
  • mutex:初始化值为1

每次进入临界区之前,empty_slots.P()使empty_slots.counter–, mutex.P()保证互斥访问。如果不加 mutex.P()的话,可能出现的情况是:MAX_SIZE大于等于2时,那么就可能有多个线程进入product的核心代码段,产生常见的资源共享问题。

从临界区出来时,要把full_slots++,mutex解锁。

这里要注意的是:empty_slots.P()mutex.P()不能交换顺序,这是因为,如果交换顺序,buffer为满时会死锁,因为当一个线程进入了临界区,mutex上锁,empty_slots->counter=MAX_SIZE-1,循环等待,此时切换到另一个线程,因为已经上锁的缘故无法进行生产的.

同理 full_slots.P()mutex.P()也不能交换顺序

1
2
3
4
5
6
empty_slots.P();
mutex.P();
buffer[cnt] = 1;
cnt++;
mutex.V();
full_slots.V();

Assignment 3

3.1

五个线程模拟五个哲学家,设置容量为5的信号量数组模拟筷子。对哲学家按顺序从 0~4编号,哲学家i左边的筷子的编号为i,哲学家右边的筷子的编号为 (i+l)%5

算法存在以下问题:当五个哲学家都想要进餐,分别拿起他们左边筷子的时候(都恰好执行完 chopstick[i].wait();)筷子已经被拿光了,等到他们再想拿右边的筷子的时候(执行 wait(chopstick[(i+l)%5].wait())就全被阻塞了,这就出现了死锁。

3.2

  • solution1

一共有五个哲学家,只允许四位哲学家同时处于拿起筷子的状态,这样就不会出现有五个哲学家全都拿起左边的筷子导致死锁或者都拿起右边的筷子的情形。

需要设置一个信号量num,初始化为4,在拿起筷子前num.P(),放下筷子之后num.V()

  • solution2

解决这个问题有个办法是在拿起筷子前先判断左右两个筷子是否可用,可用才能拿,而且是同时拿,这样不相邻的哲学家就可以吃上饭,不会造成死锁。

实现这个思路又有两种具体的策略

    • i 条件 (chopsticks[lhs].getcounter() &&chopsticks[rhs].getcounter())成立才把左边和右边的筷子进行P操作。
    • ii采用另一个信号量 mutex进行控制,保证拿起左右手的筷子这一操作变为原子的,不能多个线程同时进行拿起左右手筷子的操作

3. 关键代码

Assignment 1

1.1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
; void asm_atomic_exchange(uint32 *register, uint32 *memeory);
asm_atomic_exchange:
push ebp
mov ebp, esp
pushad

mov ebx, [ebp + 4 * 2] ; register
mov eax, [ebx] ; 取出register指向的变量的值
mov ebx, [ebp + 4 * 3] ; memory
xchg [ebx], eax ; 原子交换指令
mov ebx, [ebp + 4 * 2] ; memory
mov [ebx], eax ; 将memory指向的值赋值给register指向的变量

popad
pop ebp
ret

信号量解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Semaphore semaphore;

int cheese_burger;

void a_mother(void *arg)
{
semaphore.P();
int delay = 0;

printf("mother: start to make cheese burger, there are %d cheese burger now\n", cheese_burger);
// make 10 cheese_burger
cheese_burger += 10;

printf("mother: oh, I have to hang clothes out.\n");
// hanging clothes out
delay = 0xfffffff;
while (delay)
--delay;
// done

printf("mother: Oh, Jesus! There are %d cheese burgers\n", cheese_burger);
semaphore.V();
}

void a_naughty_boy(void *arg)
{
semaphore.P();
printf("boy : Look what I found!\n");
// eat all cheese_burgers out secretly
cheese_burger -= 10;
// run away as fast as possible
semaphore.V();
}

1.2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
my_spinlock_lock:
push ebp
mov ebp, esp
pushad
.loop:
mov ebx, [ebp + 4 * 2]
mov eax, 1
mov edx, 0
lock bts dword [ebx], edx;test and set 第零位;若为0则将其置1,并将原来的值保存进CF,
;第零位若为1则 CF = 1

jc .loop ;CF为1则跳转

popad
pop ebp
ret

Assignment2

2.1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# define MAX_BUFFER_SIZE 1
int buffer[MAX_BUFFER_SIZE];
int cnt = 0;
void producer(void* arg){
while(true){
if(cnt < MAX_BUFFER_SIZE){


int delay = 0xffffff;
while(delay){
delay--;
}
buffer[cnt] = 1;
cnt++;
printf("produced an item, cnt = %d\n",cnt);
if(cnt > MAX_BUFFER_SIZE){
printf("overflow\n");
}
}
}
}
void consumer(void* arg){
while(true){
if(cnt > 0){

int delay = 0xffffff;
while(delay){
delay--;
}

cnt--;
printf("consumerd an item, cnt = %d\n",cnt);
if(cnt<0){
printf("underflow\n");
}
buffer[cnt] = 0;
}
}
}

2.2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# define MAX_BUFFER_SIZE 1
int buffer[MAX_BUFFER_SIZE];
int cnt = 0;
void producer(void* arg){
while(true){
//if(cnt < MAX_BUFFER_SIZE){
int delay = 0xffffff;
while(delay){
delay--;
}
empty_slots.P();
mutex.P();
buffer[cnt] = 1;
cnt++;
mutex.V();
full_slots.V();
printf("produced an item, cnt = %d\n",cnt);
if(cnt > MAX_BUFFER_SIZE){
printf("overflow\n");
}
// }
}
}
void consumer(void* arg){
while(true){
//if(cnt > 0){
full_slots.P();// 等待缓冲区有空闲位置, 在使用PV操作时,条件变量需要在互斥锁之前
mutex.P(); // 保证在product时不会有其他线程访问缓冲区
int delay = 0xffffff;
while(delay){
delay--;
}
cnt--;
buffer[cnt] = 0;
mutex.V();
empty_slots.V(); // 通知consumer缓冲区有资源可以取走
printf("consumerd an item, cnt = %d\n",cnt);
if(cnt<0){
printf("underflow\n");
}
//}
}
}

Assignment3

3.2-solution1

num初始化为4,在拿起筷子前num.P(),放下筷子之后num.V()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Semaphore eaters;
int cnt = 0;
void philosopher(int id){
while(true){
int delay =0xffffff;
num.P();
printf("philosopher%d is hungry ",id);

chopsticks[id].P();
while(delay>0){
delay--;
}
//printf("philosopher%d picked up chopstick%d\n",id,id);
chopsticks[(id+1)%MAX_SIZE].P();

//printf("philosopher%d picked up chopstick%d\n",id,(id+1)%MAX_SIZE);
printf("philosopher%d is eating\n",id);
chopsticks[(id+1)%MAX_SIZE].V();
//printf("philosopher%d lowered chopstick%d\n",id,id);
chopsticks[id].V();
num.V();
//printf("philosopher%d lowered chopstick%d\n",id,(id+1)%MAX_SIZE);
}
}

3.2-solution2

mutex->counter初始化为1,保证一个哲学家左右两根筷子都会被拿起

1
2
3
4
5
6
7
8
mutex.P();
chopsticks[id].P();
while(delay>0){
delay--;
}
//printf("philosopher%d picked up chopstick%d\n",id,id);
chopsticks[(id+1)%MAX_SIZE].P();
mutex.V();

4. 实验结果

Assignment1 代码复现题

1.1.1使用自旋锁解决消失的芝士汉堡问题

1.1.2使用信号量解决消失的芝士汉堡问题

1714983997122

1.2使用bts实现自旋锁

1716042645677

Assignment2 生产者-消费者问题

2.1Race Condition

1715744966610

2.2采用信号量的解决方案MAX_SIZE = 1时

1716042964914

Assignment3 哲学家就餐问题

出现死锁情况:

1715774700068

避免死锁情况:

1716035988152

5. 总结

在这次实验中通过改变几个经典同步问题,我学习了自旋锁的实现,信号量的使用等基本知识,进一步了解了互斥访问的问题,使用SpinLock和信号量来给出实现线程互斥的解决方案。加深了对理论课知识的理解。