@SovietPower
2022-05-18T14:58:43.000000Z
字数 12766
阅读 2087
Study
schedule:https://pdos.csail.mit.edu/6.824/schedule.html
中文视频:https://www.bilibili.com/video/BV1x7411M7Sf
视频翻译:https://www.zhihu.com/column/c_1273718607160393728
(MapReduce论文翻译:https://zhuanlan.zhihu.com/p/122571315)
一个教程:https://zhuanlan.zhihu.com/p/260470258
Lab1参考:https://zhuanlan.zhihu.com/p/425093684
MapReduce
一个Map或Reduce任务,称作task;所有task,称作job。
一个worker通常执行多个task(运行多个进程)。
Reduce Worker通过RPC处理Map Worker输出的数据,然后发送最终结果。总是在本地运行,所以通信代价不高。
Map
处理每个键值对,根据key划分为r部分,分别存到r个mr-X-Y文件中。
Reduce
Worker进行Reduce前,要进行shuffle(按key划分),然后对于每一个key和相应的value集合,执行Reduce。
shuffle
将每个Map Worker生成的数据(包含多个不同的key)按照key重新分组(以便能将数据转移到Reduce Worker)。可以在Map中将键值对按key排序,提高此处的效率。
需要在Linux下进行。windows上可以安装WSL使用Ubuntu环境(默认为WSL2)。
https://docs.microsoft.com/zh-cn/windows/wsl/install
(随便可以安装Windows Terminal:https://github.com/microsoft/terminal/releases)
WSL配置,并使用VSCode:
https://docs.microsoft.com/zh-cn/windows/wsl/setup/environment#set-up-your-linux-username-and-password
然后在wsl上安装go:sudo apt install golang-go(项目不能用windows上的go编译,因为-buildmode=plugin not supported on windows/amd64)。
获取代码:
git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824lab
-race启用并发访问冲突检测。race detector通过在程序运行时探测所有的内存访问,可监控对共享变量的非同步访问,在冲突时警告。
只有实际运行时,该工具才可能检测到问题。
一般只在开发测试时启用,在正式环境中会影响性能。
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html 中有要求及提示。
代码中提供了非分布式实现的mrsequential.go,使用
go build -buildmode=plugin ../mrapps/wc.gogo run mrsequential.go wc.so pg-*.txt
即可运行,并得到标准结果mr-out-0。
该文件的代码都可以作为参考。
其中第一句是将map(), reduce()通过go的插件包(.so文件)动态装载。
如果修改了mr/中的内容,运行前都需要重新编译wc.so:go build -buildmode=plugin ../mrapps/wc.go。
main/中的mrmaster.go, mrworker.go启动进程,加载map(), reduce()动态库,并调用mr/下的master, worker。
我们要修改的就是mr/下的master, worker。
以下均main目录下。
自测
重新编译wc.go:go build -race -buildmode=plugin ../mrapps/wc.go
运行master:
$ rm mr-out*$ go run -race mrmaster.go pg-*.txt
(pg-*.txt是要传给Map Worker的、分割后的输入文件)
在不同终端运行多个worker:go run -race mrworker.go wc.so,
最终测试
直接运行./test-mr.sh。它会重新编译文件并运行测试。
在文件中使用RACE=-race打开冲突检测。
注
master需运行若干Map Worker(每个执行一个或多个Map Task),每个Worker生成给定的nReduce个mr-X-Y输出文件(X为Map Task编号,Y为Reduce Task编号)。
然后,master需运行nReduce个Reduce Worker(每个执行一个Reduce Task),并生成nReduce个mr-out-*输出文件。
测试脚本test-mr.sh会将这些输出文件mr-out-*按key排序、合并成一个文件mr-wc-all,并与mr-correct-wc.txt进行比较。
通过cat mr-out-* | sort | more可查看排序后的输出。
类似rpc.Register: method "Done" has 1 input parameters; needs exactly three的错误可以忽略。
Ignore these messages; registering the coordinator as an RPC server checks if all its methods are suitable for RPCs (have 3 inputs); we know that Done is not called via RPC.
类似dialing:dial unix /var/tmp/824-mr-0: connect: connection refused的错误应该也可以忽略。
代码中已经基本实现了 利用RPC实现Worker与Master的通信。
在Worker中声明CallExample()。
进行测试,即可看到worker通过RPC调用了master的Example(),传入了ExampleArgs,并接受了master的输出ExampleReply即100。
先简单实现Map任务,然后补充细节,最后比着实现Reduce任务。
Task
修改ExampleReply为要传递的Task(若一个Worker执行多个Task,可多次请求Task)。
Task为需要下面元素的struct:
int。[]string。Map Worker需要若干pg-*.txt;Reduce Worker需要Map生成的w*r个中间文件,用mr-X-Y命名(X为Map Task编号,Y为Reduce Task编号)。
// tasktype Task struct {TaskType TaskTypeTaskID int // 用于生成对应ID的文件NReduce int // 用于取模InputFiles []string}// type of tasktype TaskType intconst (MapTask = iotaReduceTaskWait // no available task for nowKill // all tasks done)
Master
通过go的channel可方便地存储可用的task,在需要时取出task传给Worker。
所以Master为如下struct:
type Master struct {MapTaskChannel chan *TaskReduceTaskChannel chan *TaskNMap int // number of Map tasksNReduce int // number of Reduce tasksNTask int // number of current tasksStatus MasterStatus}// status of mastertype MasterStatus intconst (MapPhase = iotaReducePhaseAllDone)
然后实现创建Master的函数func MakeMaster(files []string, nReduce int) *Master {}。
生成Map Task
Master先生成需要的Map Task,并存到Master的chan中:
// The start of Map phase. Generate Map Tasks and store them in MapTaskChannelfunc (m *Master) MakeMapTask(files []string) {for _, v := range files {id := m.GetTaskID()task := Task{TaskType: MapTask,TaskID: id,InputFiles: []string{v},}m.MapTaskChannel <- &task}println("finish MakeMapTask")}// Get an ID for a new taskfunc (m *Master) GetTaskID() int {m.NTask++return m.NTask - 1}
分发Task
类似master.go中的Example():
// When Worker is started or finishes its task, it calls DistributeTask() to get a new task.// When there is no task available, Master sends a Wait task to tell Worker to wait for a few seconds before calling again.// When everything is finished, Master sends a Kill task and the Worker then finish.func (m *Master) DistributeTask(args *Request, reply *Task) error {fmt.Println("DistributeTask")switch m.Status {case MapPhase:if len(m.MapTaskChannel) > 0 {*reply = *<-m.MapTaskChannel// reply = <-m.MapTaskChannel // 错。reply为传值引用,修改reply不会影响worker处的reply} else {reply.TaskType = Wait}case ReducePhase:if len(m.ReduceTaskChannel) > 0 {*reply = *<-m.ReduceTaskChannel} else {reply.TaskType = Wait}case AllDone:reply.TaskType = Kill}return nil}
然后在worker.go中的Worker()调用CallDistributeTask()(同CallExample()),给出参数并获取task。
检查
测试一遍,3个worker可分别输出获得的task:
Get Task: &{0 0 10 [pg-being_ernest.txt]}Get Task: &{0 1 10 [pg-dorian_gray.txt]}Get Task: &{0 2 10 [pg-frankenstein.txt]}
在Worker()中调用DoMapTask(),然后实现DoMapTask():
首先与mrsequential.go相同,读入task中的文件,并将mapf处理后的结果存入中间值intermediate。
然后将intermediate中的键值对,根据key划分为r部分(用ihash(Key)%r),分别存到r个mr-X-Y文件中。
存储用 json.Encoder 编码。为方便此处处理,先将属于同一部分的键值对分别存储,再对每部分键值对一起编码、存储到指定文件。
注意,论文中Worker执行完Map后需对数据排序,在数据量大的情况下会使Reduce处的排序更快。但此处不需要。
func DoMapTask(mapf func(string, string) []KeyValue, task *Task) {// read each input file, pass it to Mapf and accumulate the intermediate Mapf output.filename := task.InputFiles[0]file, err := os.Open(filename)if err != nil {log.Fatalf("cannot open %v", filename)}content, err := ioutil.ReadAll(file)if err != nil {log.Fatalf("cannot read %v", filename)}file.Close()intermediate := mapf(filename, string(content))// divide the output into r parts and store them in mr-X-Y individuallyr := task.NReduceprefix := "mr-" + strconv.Itoa(task.TaskID) + "-"// divide them in advance to encode them by json.Encoder conveniently// the KVs with the same key need to be encoded and stored togetherdividedKV := make([][]KeyValue, r)for _, kv := range intermediate {hs := ihash(kv.Key) % rdividedKV[hs] = append(dividedKV[hs], kv)}for i := 0; i < r; i++ {oname := prefix + strconv.Itoa(i)ofile, _ := os.Create(oname)enc := json.NewEncoder(ofile)for _, kv := range dividedKV[i] {enc.Encode(kv)}ofile.Close()}fmt.Println("finish DoMapTask")}
测试
运行1个Worker,会发现有10个输出文件mr-0-0, mr-0-1, ..., mr-0-9,且这些文件包含的 key 都不相同。
设计task信息结构
Master需要记录每个task的起始时间,以便在task超时时重新分发该task。
Master还需记录每个task的状态(未分配,已分配,已完成)。
task是传给Worker的结构,没必要在task中加这些信息,所以定义TaskInfo结构表示这些信息(也需记录指向的Task):
// info of tasktype TaskInfo struct {TaskStatus TaskStatusExpiredAt time.TimeTask *Task}// status of tasktype TaskStatus intconst (TaskWaiting = iotaTaskRunningTaskFinished)
Master需维护TaskInfoMap map[int]*TaskInfo来存储所有task对应的TaskInfo(以 TaskID 为key)。
注意,要在MakeMapTask和MakeReduceTask中初始化map。
type Master struct {MapTaskChannel chan *TaskReduceTaskChannel chan *TaskNMap int // number of Map tasksNReduce int // number of Reduce tasksNTask int // number of current tasksStatus MasterStatusTaskInfoMap map[int]*TaskInfo}
记录并更新task信息
在MakeMapTask()中生成task后,更新master的TaskInfoMap。
{...taskInfo := TaskInfo{TaskStatus: TaskWaiting,Task: &task,}m.NewTaskInfo(&taskInfo)}// Store a taskInfo in Master.TaskInfoMap.func (m *Master) NewTaskInfo(taskInfo *TaskInfo) {id := taskInfo.Task.TaskIDvalue, _ := m.TaskInfoMap[id]if value != nil {fmt.Println("TaskInfo conflicted:", id, value, taskInfo)} else {m.TaskInfoMap[id] = taskInfo}}
在DistributeTask()分发task后,更新TaskInfo的过期时间。
{...switch m.Status {case MapPhase:if len(m.MapTaskChannel) > 0 {*reply = *<-m.MapTaskChannel// reply = <-m.MapTaskChannel // 错。reply为传值引用,修改reply不会影响worker处的replyif !m.UpdateTaskInfo(reply.TaskID) {fmt.Println("No such TaskInfo or Task", reply.TaskID, "runs again.")}} else {reply.TaskType = Wait}case ReducePhase:if len(m.ReduceTaskChannel) > 0 {*reply = *<-m.ReduceTaskChannelif !m.UpdateTaskInfo(reply.TaskID) {fmt.Println("No such TaskInfo or Task", reply.TaskID, "runs again.")}} else {reply.TaskType = Wait}case Done:reply.TaskType = Kill}}// Update TaskStatus and ExpiredAt.func (m *Master) UpdateTaskInfo(taskID int) bool {taskInfo, ok := m.GetTaskInfo(taskID)if !ok || taskInfo.TaskStatus != TaskWaiting {return false}taskInfo.TaskStatus = TaskRunningtaskInfo.ExpiredAt = time.Now().Add(ExpireTime)return true}
task超时检测及处理
用协程周期运行一个周期性超时检测函数,遇到超时任务则将其放入队列等待重新分配。应该不需要通知正在执行的worker?因为使用生成临时文件(随机名字),再改文件名的方式,不会冲突。
// main/mrmaster.go calls TimeoutHandler() periodically to detect time-outs and redistribute these tasks.func (m *Master) TimeoutHandler() {for {time.Sleep(WaitPeriod)if m.Status == AllDone {return}now := time.Now()for _, v := range m.TaskInfoMap {if v.TaskStatus == TaskRunning && v.ExpiredAt.Before(now) {v.TaskStatus = TaskWaitingswitch v.Task.TaskType {case MapTask:m.MapTaskChannel <- v.Taskbreakcase ReduceTask:m.ReduceTaskChannel <- v.Taskbreak}}}}}
生成Reduce Task
该函数在所有Map Task完成后执行。
// The start of Reduce phase. Generate Reduce Tasks and store them in ReduceTaskChannel.func (m *Master) MakeReduceTask() {for i := 0; i < m.NReduce; i++ {id := m.GetTaskID()task := Task{TaskType: ReduceTask,TaskID: id,InputFiles: m.GetReduceInputFiles(i),}taskInfo := TaskInfo{TaskStatus: TaskWaiting,Task: &task,}m.NewTaskInfo(&taskInfo)m.ReduceTaskChannel <- &taskfmt.Println("Generate Reduce Task:", task)}fmt.Println("finish MakeReduceTask")}// Generate the file names that the reduce worker needs (mr-*-y).func (m *Master) GetReduceInputFiles(rid int) []string {var s []stringsuffix := "-" + strconv.Itoa(rid)for i := 0; i < m.NMap; i++ {s = append(s, "mr-"+strconv.Itoa(i)+suffix) // mr-*-rid// todo: need to validate the validity of the file}return s}
实现DoReduceWork()
DoReduceWork()需要从Map生成的若干个文件中读入,然后对这些文件中的每一个key,将所有的value保存为一个集合。对于每一个key和相应的value集合,执行reducef。
按照mrsequential实现即可,只是需要先从多个文件中读入获取intermediate,并使用临时文件输出。
func DoReduceTask(reducef func(string, []string) []KeyValue, task *Task) {// Read each input file and get the intermediate (all key/value pairs). Need to decode them from json.var tempKV KeyValuevar intermediate []KeyValuefor _, filename := range task.InputFiles {file, err := os.Open(filename)if err != nil {log.Fatalf("cannot open %v", filename)}dec := json.NewDecoder(file)for {if dec.Decode(&tempKV) != nil {break}intermediate = append(intermediate, tempKV)}file.Close()}sort.Sort(ByKey(intermediate))...}
修改Work()使Worker按照任务类型执行不同函数
// main/mrworker.go calls this function.func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {done := falsefor !done {// use RPC to the master and get the tasktask := CallDistributeTask()fmt.Println("Get Task:", task)// worker implementation hereswitch task.TaskType {case MapTask:DoMapTask(mapf, task)CallTaskDone(task)case ReduceTask:DoReduceTask(reducef, task)CallTaskDone(task)case Wait:time.Sleep(TaskWaitPeriod)case Kill:done = truefmt.Println("A worker finished.")default:fmt.Println("Worker(): Invalid TaskType:", task.TaskType)}// Request another task.}}
worker在任务完成时调用CallTaskDone(task),从而调用master的TaskDone(),表示一个任务完成。
在Master中加一个NUnfinishedTask int // number of unfinished tasks,然后在TaskDone中减少这个值,用来检测当前阶段所有task是否完成。
// A task is finished.func (m *Master) TaskDone(task *Task, reply *string) error {taskStatus_mutex.Lock()defer taskStatus_mutex.Unlock()info, ok := m.GetTaskInfo(task.TaskID)if !ok {fmt.Println("Invalid TaskID:", task.TaskID)} else if info.TaskStatus != TaskFinished {// Be aware that the TaskStatus of an undone task may be TaskWaiting or TaskRunning.m.NUnfinishedTask--info.TaskStatus = TaskFinishedfmt.Println("Task", task.TaskID, "finished.")if m.CurrentPhaseDone() {m.NextPhase()}} else {fmt.Println("Task", task.TaskID, "got a timeout and finished again.")}return nil}
每当一个task完成时(TaskDone()被调用),检查Master是否已完成当前阶段,然后进行下一阶段(MapPhase->ReducePhase,或ReducePhase->Done)。
// Whether all the tasks are finished or not.func (m *Master) CurrentPhaseDone() bool {return m.NUnfinishedTask == 0}// Change status from MapPhase to ReducePhase, or from ReducePhase to All Done.func (m *Master) NextPhase() {switch m.Status {case MapPhase:m.Status = ReducePhasem.MakeReduceTask()case ReducePhase:m.Status = AllDone}}
到这所有基本功能应该已经完成了。
-race会在运行时检测可能的冲突。
简单自测可发现,master在对m.Status修改时出现了冲突访问。
所以对所有修改/访问m.Status的函数需要加锁。
但是只有在NextPhase()中会对该值有写操作,且该函数一共也只执行两次,其它都是对m.Status的读。所以用read_count+两个互斥锁限制写,允许同步读。
var mutex sync.Mutex // a mutex for m.Statusvar rc_mutex sync.Mutex // a mutex for read_countvar read_count int // a read_count for m.Status// Lockfunc ReaderLock() {rc_mutex.Lock()if read_count++; read_count == 1 {mutex.Lock()}rc_mutex.Unlock()}// Unlockfunc ReaderUnlock() {rc_mutex.Lock()if read_count--; read_count == 0 {mutex.Unlock()}rc_mutex.Unlock()}
taskInfo.TaskStatus也会出现冲突。
主要在UpdateTaskInfo()、TaskDone()和TimeoutHandler()中冲突。因为和m.Status的情况不太一样,所以直接再加一个锁。
var taskStatus_mutex sync.Mutex // a mutex for TaskStatus
上面的实现中有个小问题:DoMapTask()也需使用输出临时文件,并在完成时重命名的方式。
任务完成、检查是否进入下一阶段的判定是:一个任务的状态变为Finished。但在这时这个任务可能已因超时被重复执行,但在重复执行完成前,该任务可能导致整个阶段已完成。此时重复执行的Map Worker仍在更新文件,会导致Reduce阶段使用的输入文件有问题。
所以在Map阶段的DoMapTask()也使用临时文件输出、然后更名即可。
gxb@GXB:/mnt/e/GitHub/6.824lab/src/main$ ./test-mr.sh*** Starting wc test.--- wc test: PASS*** Starting indexer test.--- indexer test: PASS*** Starting map parallelism test.--- map parallelism test: PASS*** Starting reduce parallelism test.--- reduce parallelism test: PASS*** Starting crash test.--- crash test: PASS*** PASSED ALL TESTS
完成。
Lab1总体不是很难,想清楚就好,要写的比较多。