[关闭]
@SovietPower 2022-05-18T14:58:43.000000Z 字数 12766 阅读 1149

MIT 6.824 Distributed Systems Lab1

Study



schedule:https://pdos.csail.mit.edu/6.824/schedule.html
中文视频:https://www.bilibili.com/video/BV1x7411M7Sf
视频翻译:https://www.zhihu.com/column/c_1273718607160393728
(MapReduce论文翻译:https://zhuanlan.zhihu.com/p/122571315
一个教程:https://zhuanlan.zhihu.com/p/260470258
Lab1参考:https://zhuanlan.zhihu.com/p/425093684

MapReduce
一个Map或Reduce任务,称作task;所有task,称作job。
一个worker通常执行多个task(运行多个进程)。
Reduce Worker通过RPC处理Map Worker输出的数据,然后发送最终结果。总是在本地运行,所以通信代价不高。

Map
处理每个键值对,根据key划分为r部分,分别存到r个mr-X-Y文件中。

Reduce
Worker进行Reduce前,要进行shuffle(按key划分),然后对于每一个key和相应的value集合,执行Reduce。

shuffle
将每个Map Worker生成的数据(包含多个不同的key)按照key重新分组(以便能将数据转移到Reduce Worker)。可以在Map中将键值对按key排序,提高此处的效率。


实验准备

需要在Linux下进行。windows上可以安装WSL使用Ubuntu环境(默认为WSL2)。
https://docs.microsoft.com/zh-cn/windows/wsl/install
(随便可以安装Windows Terminal:https://github.com/microsoft/terminal/releases

WSL配置,并使用VSCode:
https://docs.microsoft.com/zh-cn/windows/wsl/setup/environment#set-up-your-linux-username-and-password

然后在wsl上安装go:sudo apt install golang-go(项目不能用windows上的go编译,因为-buildmode=plugin not supported on windows/amd64)。

获取代码:

  1. git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824lab

-race启用并发访问冲突检测。race detector通过在程序运行时探测所有的内存访问,可监控对共享变量的非同步访问,在冲突时警告。
只有实际运行时,该工具才可能检测到问题。
一般只在开发测试时启用,在正式环境中会影响性能。


Lab1

https://pdos.csail.mit.edu/6.824/labs/lab-mr.html 中有要求及提示。

代码中提供了非分布式实现的mrsequential.go,使用

  1. go build -buildmode=plugin ../mrapps/wc.go
  2. go run mrsequential.go wc.so pg-*.txt

即可运行,并得到标准结果mr-out-0
该文件的代码都可以作为参考。

其中第一句是将map(), reduce()通过go的插件包(.so文件)动态装载。
如果修改了mr/中的内容,运行前都需要重新编译wc.sogo build -buildmode=plugin ../mrapps/wc.go

main/中的mrmaster.go, mrworker.go启动进程,加载map(), reduce()动态库,并调用mr/下的master, worker
我们要修改的就是mr/下的master, worker

测试方式

以下均main目录下。

自测
重新编译wc.gogo build -race -buildmode=plugin ../mrapps/wc.go
运行master:

  1. $ rm mr-out*
  2. $ go run -race mrmaster.go pg-*.txt

pg-*.txt是要传给Map Worker的、分割后的输入文件)
在不同终端运行多个worker:go run -race mrworker.go wc.so

最终测试
直接运行./test-mr.sh。它会重新编译文件并运行测试。
在文件中使用RACE=-race打开冲突检测。


master需运行若干Map Worker(每个执行一个或多个Map Task),每个Worker生成给定的nReducemr-X-Y输出文件(X为Map Task编号,Y为Reduce Task编号)。
然后,master需运行nReduce个Reduce Worker(每个执行一个Reduce Task),并生成nReducemr-out-*输出文件。
测试脚本test-mr.sh会将这些输出文件mr-out-*按key排序、合并成一个文件mr-wc-all,并与mr-correct-wc.txt进行比较。
通过cat mr-out-* | sort | more可查看排序后的输出。

类似rpc.Register: method "Done" has 1 input parameters; needs exactly three的错误可以忽略。

Ignore these messages; registering the coordinator as an RPC server checks if all its methods are suitable for RPCs (have 3 inputs); we know that Done is not called via RPC.

类似dialing:dial unix /var/tmp/824-mr-0: connect: connection refused的错误应该也可以忽略。

实现Worker与Master的通信

代码中已经基本实现了 利用RPC实现Worker与Master的通信。
在Worker中声明CallExample()
进行测试,即可看到worker通过RPC调用了master的Example(),传入了ExampleArgs,并接受了master的输出ExampleReply100

Master分发Map任务

先简单实现Map任务,然后补充细节,最后比着实现Reduce任务。

Task
修改ExampleReply为要传递的Task(若一个Worker执行多个Task,可多次请求Task)。
Task为需要下面元素的struct:

  1. // task
  2. type Task struct {
  3. TaskType TaskType
  4. TaskID int // 用于生成对应ID的文件
  5. NReduce int // 用于取模
  6. InputFiles []string
  7. }
  8. // type of task
  9. type TaskType int
  10. const (
  11. MapTask = iota
  12. ReduceTask
  13. Wait // no available task for now
  14. Kill // all tasks done
  15. )

Master
通过go的channel可方便地存储可用的task,在需要时取出task传给Worker。
所以Master为如下struct:

  1. type Master struct {
  2. MapTaskChannel chan *Task
  3. ReduceTaskChannel chan *Task
  4. NMap int // number of Map tasks
  5. NReduce int // number of Reduce tasks
  6. NTask int // number of current tasks
  7. Status MasterStatus
  8. }
  9. // status of master
  10. type MasterStatus int
  11. const (
  12. MapPhase = iota
  13. ReducePhase
  14. AllDone
  15. )

然后实现创建Master的函数func MakeMaster(files []string, nReduce int) *Master {}

生成Map Task
Master先生成需要的Map Task,并存到Master的chan中:

  1. // The start of Map phase. Generate Map Tasks and store them in MapTaskChannel
  2. func (m *Master) MakeMapTask(files []string) {
  3. for _, v := range files {
  4. id := m.GetTaskID()
  5. task := Task{
  6. TaskType: MapTask,
  7. TaskID: id,
  8. InputFiles: []string{v},
  9. }
  10. m.MapTaskChannel <- &task
  11. }
  12. println("finish MakeMapTask")
  13. }
  14. // Get an ID for a new task
  15. func (m *Master) GetTaskID() int {
  16. m.NTask++
  17. return m.NTask - 1
  18. }

分发Task
类似master.go中的Example()

  1. // When Worker is started or finishes its task, it calls DistributeTask() to get a new task.
  2. // When there is no task available, Master sends a Wait task to tell Worker to wait for a few seconds before calling again.
  3. // When everything is finished, Master sends a Kill task and the Worker then finish.
  4. func (m *Master) DistributeTask(args *Request, reply *Task) error {
  5. fmt.Println("DistributeTask")
  6. switch m.Status {
  7. case MapPhase:
  8. if len(m.MapTaskChannel) > 0 {
  9. *reply = *<-m.MapTaskChannel
  10. // reply = <-m.MapTaskChannel // 错。reply为传值引用,修改reply不会影响worker处的reply
  11. } else {
  12. reply.TaskType = Wait
  13. }
  14. case ReducePhase:
  15. if len(m.ReduceTaskChannel) > 0 {
  16. *reply = *<-m.ReduceTaskChannel
  17. } else {
  18. reply.TaskType = Wait
  19. }
  20. case AllDone:
  21. reply.TaskType = Kill
  22. }
  23. return nil
  24. }

然后在worker.go中的Worker()调用CallDistributeTask()(同CallExample()),给出参数并获取task。

检查
测试一遍,3个worker可分别输出获得的task:

  1. Get Task: &{0 0 10 [pg-being_ernest.txt]}
  2. Get Task: &{0 1 10 [pg-dorian_gray.txt]}
  3. Get Task: &{0 2 10 [pg-frankenstein.txt]}

Map Worker执行Task

Worker()中调用DoMapTask(),然后实现DoMapTask()
首先与mrsequential.go相同,读入task中的文件,并将mapf处理后的结果存入中间值intermediate
然后将intermediate中的键值对,根据key划分为r部分(用ihash(Key)%r),分别存到r个mr-X-Y文件中。
存储用 json.Encoder 编码。为方便此处处理,先将属于同一部分的键值对分别存储,再对每部分键值对一起编码、存储到指定文件。
注意,论文中Worker执行完Map后需对数据排序,在数据量大的情况下会使Reduce处的排序更快。但此处不需要。

  1. func DoMapTask(mapf func(string, string) []KeyValue, task *Task) {
  2. // read each input file, pass it to Mapf and accumulate the intermediate Mapf output.
  3. filename := task.InputFiles[0]
  4. file, err := os.Open(filename)
  5. if err != nil {
  6. log.Fatalf("cannot open %v", filename)
  7. }
  8. content, err := ioutil.ReadAll(file)
  9. if err != nil {
  10. log.Fatalf("cannot read %v", filename)
  11. }
  12. file.Close()
  13. intermediate := mapf(filename, string(content))
  14. // divide the output into r parts and store them in mr-X-Y individually
  15. r := task.NReduce
  16. prefix := "mr-" + strconv.Itoa(task.TaskID) + "-"
  17. // divide them in advance to encode them by json.Encoder conveniently
  18. // the KVs with the same key need to be encoded and stored together
  19. dividedKV := make([][]KeyValue, r)
  20. for _, kv := range intermediate {
  21. hs := ihash(kv.Key) % r
  22. dividedKV[hs] = append(dividedKV[hs], kv)
  23. }
  24. for i := 0; i < r; i++ {
  25. oname := prefix + strconv.Itoa(i)
  26. ofile, _ := os.Create(oname)
  27. enc := json.NewEncoder(ofile)
  28. for _, kv := range dividedKV[i] {
  29. enc.Encode(kv)
  30. }
  31. ofile.Close()
  32. }
  33. fmt.Println("finish DoMapTask")
  34. }

测试
运行1个Worker,会发现有10个输出文件mr-0-0, mr-0-1, ..., mr-0-9,且这些文件包含的 key 都不相同。

Master管理任务

设计task信息结构
Master需要记录每个task的起始时间,以便在task超时时重新分发该task。
Master还需记录每个task的状态(未分配,已分配,已完成)。
task是传给Worker的结构,没必要在task中加这些信息,所以定义TaskInfo结构表示这些信息(也需记录指向的Task):

  1. // info of task
  2. type TaskInfo struct {
  3. TaskStatus TaskStatus
  4. ExpiredAt time.Time
  5. Task *Task
  6. }
  7. // status of task
  8. type TaskStatus int
  9. const (
  10. TaskWaiting = iota
  11. TaskRunning
  12. TaskFinished
  13. )

Master需维护TaskInfoMap map[int]*TaskInfo来存储所有task对应的TaskInfo(以 TaskID 为key)。
注意,要在MakeMapTaskMakeReduceTask中初始化map。

  1. type Master struct {
  2. MapTaskChannel chan *Task
  3. ReduceTaskChannel chan *Task
  4. NMap int // number of Map tasks
  5. NReduce int // number of Reduce tasks
  6. NTask int // number of current tasks
  7. Status MasterStatus
  8. TaskInfoMap map[int]*TaskInfo
  9. }

记录并更新task信息
MakeMapTask()中生成task后,更新master的TaskInfoMap

  1. {
  2. ...
  3. taskInfo := TaskInfo{
  4. TaskStatus: TaskWaiting,
  5. Task: &task,
  6. }
  7. m.NewTaskInfo(&taskInfo)
  8. }
  9. // Store a taskInfo in Master.TaskInfoMap.
  10. func (m *Master) NewTaskInfo(taskInfo *TaskInfo) {
  11. id := taskInfo.Task.TaskID
  12. value, _ := m.TaskInfoMap[id]
  13. if value != nil {
  14. fmt.Println("TaskInfo conflicted:", id, value, taskInfo)
  15. } else {
  16. m.TaskInfoMap[id] = taskInfo
  17. }
  18. }

DistributeTask()分发task后,更新TaskInfo的过期时间。

  1. {
  2. ...
  3. switch m.Status {
  4. case MapPhase:
  5. if len(m.MapTaskChannel) > 0 {
  6. *reply = *<-m.MapTaskChannel
  7. // reply = <-m.MapTaskChannel // 错。reply为传值引用,修改reply不会影响worker处的reply
  8. if !m.UpdateTaskInfo(reply.TaskID) {
  9. fmt.Println("No such TaskInfo or Task", reply.TaskID, "runs again.")
  10. }
  11. } else {
  12. reply.TaskType = Wait
  13. }
  14. case ReducePhase:
  15. if len(m.ReduceTaskChannel) > 0 {
  16. *reply = *<-m.ReduceTaskChannel
  17. if !m.UpdateTaskInfo(reply.TaskID) {
  18. fmt.Println("No such TaskInfo or Task", reply.TaskID, "runs again.")
  19. }
  20. } else {
  21. reply.TaskType = Wait
  22. }
  23. case Done:
  24. reply.TaskType = Kill
  25. }
  26. }
  27. // Update TaskStatus and ExpiredAt.
  28. func (m *Master) UpdateTaskInfo(taskID int) bool {
  29. taskInfo, ok := m.GetTaskInfo(taskID)
  30. if !ok || taskInfo.TaskStatus != TaskWaiting {
  31. return false
  32. }
  33. taskInfo.TaskStatus = TaskRunning
  34. taskInfo.ExpiredAt = time.Now().Add(ExpireTime)
  35. return true
  36. }

task超时检测及处理
用协程周期运行一个周期性超时检测函数,遇到超时任务则将其放入队列等待重新分配。应该不需要通知正在执行的worker?因为使用生成临时文件(随机名字),再改文件名的方式,不会冲突。

  1. // main/mrmaster.go calls TimeoutHandler() periodically to detect time-outs and redistribute these tasks.
  2. func (m *Master) TimeoutHandler() {
  3. for {
  4. time.Sleep(WaitPeriod)
  5. if m.Status == AllDone {
  6. return
  7. }
  8. now := time.Now()
  9. for _, v := range m.TaskInfoMap {
  10. if v.TaskStatus == TaskRunning && v.ExpiredAt.Before(now) {
  11. v.TaskStatus = TaskWaiting
  12. switch v.Task.TaskType {
  13. case MapTask:
  14. m.MapTaskChannel <- v.Task
  15. break
  16. case ReduceTask:
  17. m.ReduceTaskChannel <- v.Task
  18. break
  19. }
  20. }
  21. }
  22. }
  23. }

完善Reduce Worker

生成Reduce Task
该函数在所有Map Task完成后执行。

  1. // The start of Reduce phase. Generate Reduce Tasks and store them in ReduceTaskChannel.
  2. func (m *Master) MakeReduceTask() {
  3. for i := 0; i < m.NReduce; i++ {
  4. id := m.GetTaskID()
  5. task := Task{
  6. TaskType: ReduceTask,
  7. TaskID: id,
  8. InputFiles: m.GetReduceInputFiles(i),
  9. }
  10. taskInfo := TaskInfo{
  11. TaskStatus: TaskWaiting,
  12. Task: &task,
  13. }
  14. m.NewTaskInfo(&taskInfo)
  15. m.ReduceTaskChannel <- &task
  16. fmt.Println("Generate Reduce Task:", task)
  17. }
  18. fmt.Println("finish MakeReduceTask")
  19. }
  20. // Generate the file names that the reduce worker needs (mr-*-y).
  21. func (m *Master) GetReduceInputFiles(rid int) []string {
  22. var s []string
  23. suffix := "-" + strconv.Itoa(rid)
  24. for i := 0; i < m.NMap; i++ {
  25. s = append(s, "mr-"+strconv.Itoa(i)+suffix) // mr-*-rid
  26. // todo: need to validate the validity of the file
  27. }
  28. return s
  29. }

实现DoReduceWork()
DoReduceWork()需要从Map生成的若干个文件中读入,然后对这些文件中的每一个key,将所有的value保存为一个集合。对于每一个key和相应的value集合,执行reducef
按照mrsequential实现即可,只是需要先从多个文件中读入获取intermediate,并使用临时文件输出。

  1. func DoReduceTask(reducef func(string, []string) []KeyValue, task *Task) {
  2. // Read each input file and get the intermediate (all key/value pairs). Need to decode them from json.
  3. var tempKV KeyValue
  4. var intermediate []KeyValue
  5. for _, filename := range task.InputFiles {
  6. file, err := os.Open(filename)
  7. if err != nil {
  8. log.Fatalf("cannot open %v", filename)
  9. }
  10. dec := json.NewDecoder(file)
  11. for {
  12. if dec.Decode(&tempKV) != nil {
  13. break
  14. }
  15. intermediate = append(intermediate, tempKV)
  16. }
  17. file.Close()
  18. }
  19. sort.Sort(ByKey(intermediate))
  20. ...
  21. }

修改Work()使Worker按照任务类型执行不同函数

  1. // main/mrworker.go calls this function.
  2. func Worker(mapf func(string, string) []KeyValue,
  3. reducef func(string, []string) string) {
  4. done := false
  5. for !done {
  6. // use RPC to the master and get the task
  7. task := CallDistributeTask()
  8. fmt.Println("Get Task:", task)
  9. // worker implementation here
  10. switch task.TaskType {
  11. case MapTask:
  12. DoMapTask(mapf, task)
  13. CallTaskDone(task)
  14. case ReduceTask:
  15. DoReduceTask(reducef, task)
  16. CallTaskDone(task)
  17. case Wait:
  18. time.Sleep(TaskWaitPeriod)
  19. case Kill:
  20. done = true
  21. fmt.Println("A worker finished.")
  22. default:
  23. fmt.Println("Worker(): Invalid TaskType:", task.TaskType)
  24. }
  25. // Request another task.
  26. }
  27. }

Worker任务完成时通知Master

worker在任务完成时调用CallTaskDone(task),从而调用master的TaskDone(),表示一个任务完成。
Master中加一个NUnfinishedTask int // number of unfinished tasks,然后在TaskDone中减少这个值,用来检测当前阶段所有task是否完成。

  1. // A task is finished.
  2. func (m *Master) TaskDone(task *Task, reply *string) error {
  3. taskStatus_mutex.Lock()
  4. defer taskStatus_mutex.Unlock()
  5. info, ok := m.GetTaskInfo(task.TaskID)
  6. if !ok {
  7. fmt.Println("Invalid TaskID:", task.TaskID)
  8. } else if info.TaskStatus != TaskFinished {
  9. // Be aware that the TaskStatus of an undone task may be TaskWaiting or TaskRunning.
  10. m.NUnfinishedTask--
  11. info.TaskStatus = TaskFinished
  12. fmt.Println("Task", task.TaskID, "finished.")
  13. if m.CurrentPhaseDone() {
  14. m.NextPhase()
  15. }
  16. } else {
  17. fmt.Println("Task", task.TaskID, "got a timeout and finished again.")
  18. }
  19. return nil
  20. }

Master状态检查

每当一个task完成时(TaskDone()被调用),检查Master是否已完成当前阶段,然后进行下一阶段(MapPhase->ReducePhase,或ReducePhase->Done)。

  1. // Whether all the tasks are finished or not.
  2. func (m *Master) CurrentPhaseDone() bool {
  3. return m.NUnfinishedTask == 0
  4. }
  5. // Change status from MapPhase to ReducePhase, or from ReducePhase to All Done.
  6. func (m *Master) NextPhase() {
  7. switch m.Status {
  8. case MapPhase:
  9. m.Status = ReducePhase
  10. m.MakeReduceTask()
  11. case ReducePhase:
  12. m.Status = AllDone
  13. }
  14. }

到这所有基本功能应该已经完成了。

加锁

-race会在运行时检测可能的冲突。

简单自测可发现,master在对m.Status修改时出现了冲突访问。
所以对所有修改/访问m.Status的函数需要加锁。
但是只有在NextPhase()中会对该值有写操作,且该函数一共也只执行两次,其它都是对m.Status的读。所以用read_count+两个互斥锁限制写,允许同步读。

  1. var mutex sync.Mutex // a mutex for m.Status
  2. var rc_mutex sync.Mutex // a mutex for read_count
  3. var read_count int // a read_count for m.Status
  4. // Lock
  5. func ReaderLock() {
  6. rc_mutex.Lock()
  7. if read_count++; read_count == 1 {
  8. mutex.Lock()
  9. }
  10. rc_mutex.Unlock()
  11. }
  12. // Unlock
  13. func ReaderUnlock() {
  14. rc_mutex.Lock()
  15. if read_count--; read_count == 0 {
  16. mutex.Unlock()
  17. }
  18. rc_mutex.Unlock()
  19. }

taskInfo.TaskStatus也会出现冲突。
主要在UpdateTaskInfo()TaskDone()TimeoutHandler()中冲突。因为和m.Status的情况不太一样,所以直接再加一个锁。

  1. var taskStatus_mutex sync.Mutex // a mutex for TaskStatus

小问题

上面的实现中有个小问题:DoMapTask()也需使用输出临时文件,并在完成时重命名的方式。
任务完成、检查是否进入下一阶段的判定是:一个任务的状态变为Finished。但在这时这个任务可能已因超时被重复执行,但在重复执行完成前,该任务可能导致整个阶段已完成。此时重复执行的Map Worker仍在更新文件,会导致Reduce阶段使用的输入文件有问题。
所以在Map阶段的DoMapTask()也使用临时文件输出、然后更名即可。

测试结果

  1. gxb@GXB:/mnt/e/GitHub/6.824lab/src/main$ ./test-mr.sh
  2. *** Starting wc test.
  3. --- wc test: PASS
  4. *** Starting indexer test.
  5. --- indexer test: PASS
  6. *** Starting map parallelism test.
  7. --- map parallelism test: PASS
  8. *** Starting reduce parallelism test.
  9. --- reduce parallelism test: PASS
  10. *** Starting crash test.
  11. --- crash test: PASS
  12. *** PASSED ALL TESTS

完成。
Lab1总体不是很难,想清楚就好,要写的比较多。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注