6.S081-Lab 6 Multithreading

前言

本篇博客是 6.S801 Lab系列的第 6 篇博客,这次Lab:Multithreading,是多线程相关的实验。实际上Xv6操作系统中并不支持用户级线程,每个用户进程只运行一个线程,用户进程(线程)之间调度的过程不可能是一个直接的过程,而是通过内核来间接实现的:

  1. 首先进程1 (线程)由于某种原因(定时器中断或系统调用或异常等)陷入内核,此时执行进程 1 对应的内核线程1
  2. 内核线程此时决定让出(yield)CPU,然后调用 sched 并通过 swtch 函数切换到调度器线程,每个CPU独有一个调度器线程,它运行在自己独有的栈中。
  3. 调度器线程通过轮询(Round Robin)找出另一个可以运行(Runnable)的进程2,并通过 swtch 切换到它对应的内核线程2,然后内核线程2从内核中返回用户进程2。这样就完成了两个进程之间的切换。

需要注意的是,这期间每一次线程切换都需要保存相应的寄存器,其中从用户线程陷入内核转换为内核线程时,需要将所有寄存器保存至进程的 trapframe 页面中,这一点之前 trap 实验时就已经体会过;在内核线程和调度器线程之间切换的时候,也需要保存相应的上下文(context),这个上下文包括所有的 callee saved 寄存器(caller saved 寄存器不需要保存,因为调用swtch函数前会被维护在运行时栈中),以及 ra 寄存器以及 sp 寄存器。详细见代码,还是非常tricky的。

Uthread: switching between threads

就是要求实现一个简单的用户级线程的功能,框架已经写好了,只需要填一些关键的代码就行,原理和内核中的原理基本是一致的,需要保存相应的寄存器。

首先定义用户线程的上下文,并将其添加到线程结构中,在user/uthread.c中添加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct ucontext
{
uint64 ra;
uint64 sp;

// callee-saved
uint64 s0;
uint64 s1;
uint64 s2;
uint64 s3;
uint64 s4;
uint64 s5;
uint64 s6;
uint64 s7;
uint64 s8;
uint64 s9;
uint64 s10;
uint64 s11;
};

struct thread {
char stack[STACK_SIZE]; /* the thread's stack */
int state; /* FREE, RUNNING, RUNNABLE */
struct ucontext context; // user thread context
};

我们要让每个线程在自己的栈上运行,所以需要在线程创建的时候将其上下文的 sp 指针设置到自己的栈,为了使调度器第一次切换到线程的时候能够运行相应的线程函数,所以还需要将返回地址寄存器 ra 设置到函数地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
void 
thread_create(void (*func)())
{
struct thread *t;

for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
if (t->state == FREE) break;
}
t->state = RUNNABLE;
// YOUR CODE HERE
t->context.ra = (uint64)func;
t->context.sp = (uint64)t->stack + STACK_SIZE;
}

user/uthread_switch.S中填写和kernel/swtch.S相同的代码,以便在线程切换的时候,保存线程1的所有寄存器至其上下文中,并恢复线程2上下文中的所有寄存器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
thread_switch:
/* YOUR CODE HERE */
sd ra, 0(a0)
sd sp, 8(a0)
sd s0, 16(a0)
sd s1, 24(a0)
sd s2, 32(a0)
sd s3, 40(a0)
sd s4, 48(a0)
sd s5, 56(a0)
sd s6, 64(a0)
sd s7, 72(a0)
sd s8, 80(a0)
sd s9, 88(a0)
sd s10, 96(a0)
sd s11, 104(a0)

ld ra, 0(a1)
ld sp, 8(a1)
ld s0, 16(a1)
ld s1, 24(a1)
ld s2, 32(a1)
ld s3, 40(a1)
ld s4, 48(a1)
ld s5, 56(a1)
ld s6, 64(a1)
ld s7, 72(a1)
ld s8, 80(a1)
ld s9, 88(a1)
ld s10, 96(a1)
ld s11, 104(a1)

ret /* return to ra */

最后不要忘了在user/uthread.cthread_schedule函数中调用上面的thread_switch函数:

1
2
3
4
5
/* YOUR CODE HERE
* Invoke thread_switch to switch from t to next_thread:
* thread_switch(??, ??);
*/
thread_switch((uint64)&t->context, (uint64)&current_thread->context);

Using threads

这个实验和 xv6 以及 qemu 无关,是使用自己设备上的编译器进行编译,也就是跑在自己设备上的程序。题目提供了一份多线程并行存取哈希表的程序,但是没有做同步处理,所以当运行的线程大于 1 时会出现 race condition并出错,题目要求的是使用 pthread_mutex加锁来消除竞争。如果熟悉 POSIX pthread库的话十分简单。

首先定义 NBUCKET 把锁(每个桶一把锁,当不同线程存不同桶的时候不会发生竞争,达到并行的效果),并在 mian中初始化锁:

1
2
3
4
5
6
7
8
pthread_mutex_t locks[NBUCKET];  // one lock per bucket

int main() {
...
for(int i=0; i<NBUCKET; ++i)
pthread_mutex_init(&locks[i], NULL);
...
}

然后不要忘记在临界区加锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static 
void put(int key, int value)
{
int i = key % NBUCKET;

pthread_mutex_lock(&locks[i]); /* held the mutex */
// is the key already present?
struct entry *e = 0;
for (e = table[i]; e != 0; e = e->next) {
if (e->key == key)
break;
}
if(e){
// update the existing key.
e->value = value;
} else {
// the new is new.
insert(key, value, &table[i], table[i]);
}
pthread_mutex_unlock(&locks[i]); /* release mutex */

}

Barrier

这个题和上一个一样,在自己的电脑上跑,内容是条件变量的使用,实际上,(或者说互斥量)和条件变量是现代多线程程序常用的两种同步方式。同样,稍微熟悉一点相关用法的话很简单,当然多线程程序调试起来还是有点痛苦,我先把代码贴出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void 
barrier()
{
// YOUR CODE HERE
//
// Block until all threads have called barrier() and
// then increment bstate.round.
//
pthread_mutex_lock(&bstate.barrier_mutex);
++bstate.nthread;
if (bstate.nthread % nthread == 0) {
++bstate.round;
pthread_cond_broadcast(&bstate.barrier_cond);
pthread_mutex_unlock(&bstate.barrier_mutex);
return;
} else {
pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
}
pthread_mutex_unlock(&bstate.barrier_mutex);
}

当线程进入 barrier 时,我们先加锁,然后将线程计数加 1,若当前线程不是最后一个到达的则让其等待(wait),最后一个到达的将 round 加 1,然后叫醒其它所有的线程。

测试结果

2022年6月29日测试通过:

参考文献

  1. https://pdos.csail.mit.edu/6.828/2021/schedule.html