[关闭]
@SovietPower 2022-05-17T14:06:10.000000Z 字数 8314 阅读 730

MIT 6.824 Distributed Systems Lab2

Study



schedule:https://pdos.csail.mit.edu/6.824/schedule.html
中文视频:https://www.bilibili.com/video/BV1x7411M7Sf

Lab2参考:
实验介绍:
https://pdos.csail.mit.edu/6.824/labs/lab-raft.html
实现建议:
https://pdos.csail.mit.edu/6.824/labs/raft-structure.txt
https://thesquareplanet.com/blog/students-guide-to-raft/
关于锁:
https://pdos.csail.mit.edu/6.824/labs/raft-locking.txt
可能可参考:
https://zhuanlan.zhihu.com/p/264448558
https://zhuanlan.zhihu.com/p/476644274
https://zhuanlan.zhihu.com/p/388523882
https://github.com/Toma62299781/MIT-6.824-spring2021


Raft

介绍:
https://zhuanlan.zhihu.com/p/32052223
https://blog.csdn.net/qq_40378034/article/details/117404484
论文:https://pdos.csail.mit.edu/6.824/papers/raft-extended.pdf
论文翻译:https://zhuanlan.zhihu.com/p/343560811


实验准备

代码可以用原始代码,也可用Lab1完成后的代码。要修改的内容在raft文件夹中。

deadlockMutex代替sync,可在运行时进行死锁检测。
只给一两个int变量用的锁,可以换成atomic

测试不过时,可以看看test-test.go的逻辑。


Lab2

测试方式

src/raft文件夹下:

部分测试
go test -run [测试部分] [-race]
测试部分如:2A 2B TestReElection2A...

全部测试
go test

多次测试
可以用go-test-many
.\go-test-many.sh <测试次数> <并行数> <测试部分>

2A

2A要完成选举,以及实现使用AppendEntries的心跳。

Add the Figure 2 state for leader election to the Raft struct in raft.go. You'll also need to define a struct to hold information about each log entry.

首先根据论文的Figure 2完善Raft的定义,然后定义LogEntry
Make中初始化Raft,然后把GetState()写完。


It's easiest to use time.Sleep() with a small constant argument to
drive the periodic checks. Don't use time.Ticker and time.Timer;
they are tricky to use correctly.

对于每个server,起一个线程,根据其当前的状态,执行三种可能的死循环(状态改变则退出):一种用来发送心跳(leader);一种定期检查是否有一段时间未收到心跳(follower),可能要进行状态转换;一种发起选举,等待结果(candidate),若有结果则改变状态,若超时则重新选举。
三个循环每次循环均要Sleep某段时间。

但是这样写,到后面会不太方便:

  1. // mainloop
  2. func (rf *Raft) mainloop() {
  3. for !rf.killed() {
  4. switch rf.getState() {
  5. case Follower:
  6. rf.followerLoop()
  7. break
  8. case Candidate:
  9. rf.candidateLoop()
  10. break
  11. case Leader:
  12. rf.leaderLoop()
  13. break
  14. default:
  15. println("!! Unknown state:", rf.state)
  16. }
  17. }
  18. }
  19. func (rf *Raft) followerLoop() {
  20. rf.resetHBTimeoutAt()
  21. for rf.getState() == Follower {
  22. time.Sleep(HBCheckPeriod)
  23. if rf.checkHBTimeout() {
  24. if !rf.killed() {
  25. rf.stateMu.Lock()
  26. rf.becomeCandidate()
  27. rf.stateMu.Unlock()
  28. }
  29. return
  30. }
  31. }
  32. }
  33. func (rf *Raft) candidateLoop() {
  34. // become candidate just now.
  35. for rf.getState() == Candidate {
  36. time.Sleep(ElectionTimeoutPeriod)
  37. rf.startElection()
  38. }
  39. }

Follower做的事也可以看做:等待心跳超时,超时后尝试成为Candidate、发起选举,如果等待期间或选举时收到heartbeat(或term更高的请求选票),则放弃,重新等待超时后再次尝试。
也就是将三个Loop写在一起,用"一段时间内是否收到heartbeat"替换超时时间。

因为是一个mainLoop、异步投票,可能要sleep一个完整的心跳超时时间,mainLoop才会退出candidateLoop、执行leaderLoop并发送心跳。
所以为了方便,在成为leader后就起一个线程,死循环发送心跳(直到不是leader)。但mainLoop要一直无事可做。


You must pick election timeouts (and thus heartbeat intervals) that are short enough that it's very likely that an election will complete in less than five seconds even if it requires multiple rounds.
The paper's Section 5.2 mentions election timeouts in the range of 150 to 300 milliseconds. Such a range only makes sense if the leader sends heartbeats considerably more often than once per 150 milliseconds. Because the tester limits you to 10 heartbeats per second, you will have to use an election timeout larger than the paper's 150 to 300 milliseconds, but not too large, because then you may fail to elect a leader within five seconds.

时间暂时设为:
leader发送心跳间隔100ms,心跳超时开始选举时间(election timeout)为600~1000ms,选举超时时间也即心跳超时时间。


根据Figure 2,完善RequestVote RPCAppendEntries RPC及其相关结构体。
基本跟着流程写就好了。

注意:


根据锁的种类和常用地方的不同,使用了多个锁,而不是只用一个rf.mu

  1. {
  2. state State
  3. stateMu deadlock.Mutex // Lock to protect shared access to this peer's state. A mutex mainly for status
  4. // persistent state on all servers
  5. currentTerm int
  6. votedFor int
  7. logs []LogEntry
  8. perStateMu deadlock.Mutex // A mutex mainly for persistent state(currentTerm, votedFor and logs)
  9. // Be careful that stateMu MUST lock before perStateMu and unlock after perStateMu if used to avoid deadlock.
  10. // volatile state on leaders
  11. nextIndex []int // index of the next log entry to send to tha server (initialized to leader last log index+1)
  12. matchIndex []int // index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)
  13. leaderMu deadlock.Mutex // used for nextIndex, matchIndex
  14. voteMu deadlock.Mutex // prevent tryElection while processing RequestVote() and vice versa
  15. // voteMu MUST lock before mu, perStateMu and unlock after mu, perStateMu if used to avoid deadlock.
  16. }

这样会比较非常麻烦,但应该性能更好?(一般都是靠rf.mu锁整个结构体)

但是这样在2B有其它的问题:
尝试开始选举tryElection()、正在给他人投票RequestVote(),这两个之间是原子性操作,不能重合(因为在RequestVote()结束前不能确定是否会投出票、重置超时时间),需要用锁voteMu包住整个函数。

然后会有死锁(不太明白,已经保证了锁的先后顺序),虽然有deadlock检测,但还是很难查,所以还是放弃了,只用一个锁。


2B

在之前的基础上,需要:

因为介绍很少,可以看看test_test.go的测试:
TestBasicAgree2B中用到三个config.go函数:
make_config():初始化。
nCommitted():计算某个index的log被多少server提交了。会检查该log内容是否正确。
one():让leader发送一条日志,并检查其它服务器是否能同步commit。具体:枚举所有server,调用其rf.Start(cmd)。若找到leader(Start()返回true,并提交、向其它server同步日志cmd),则不断检查该日志cmd是否已被足够数量(expectedServers)的服务器提交。若2s内未满足,则测试失败(若one的参数retry为true,则允许尝试寻找下一个可能的leader,不会立刻失败,但要在10s内完成)。

所以可知:TestBasicAgree2B运行3个server,依次发送3个log,每次检查log是否被所有服务器提交(且log内容不能变,index要正确)。

one()可知,Start()的参数为要提交的指令,返回值分别为:当前指令提交后的index(若为leader)(即lastLogIndex+1)、当前term、当前server是否是leader。

  1. func (rf *Raft) Start(command interface{}) (int, int, bool) {
  2. // Your code here (2B).
  3. rf.perStateMu.Lock()
  4. defer rf.perStateMu.Unlock()
  5. index := -1
  6. term := rf.currentTerm
  7. isLeader := rf.getState() == Leader
  8. if isLeader {
  9. // new log
  10. index = rf.lastLogIndex + 1
  11. rf.logs = append(rf.logs, LogEntry{index, rf.currentTerm, command})
  12. rf.updateLastLog()
  13. }
  14. return index, term, isLeader
  15. }

通过Start()传递命令后:
下次心跳时,leader将新命令异步传给follower。leader需要检查传输结果,当大多数传输成功时,进行commit,更新commitIndex
在下一次心跳时,leader将新的LeaderCommit传给follower,使follower也进行commit。
所以一个命令需要至少两次心跳,才可保证其被大多数server commit。

根据Figure 2的Rules for leader,继续完善“检查传输结果”部分:

  • If last log index≥nextIndex for a follower: send AppendEntries RPC with log entries starting at nextIndex
    • If successful: update nextIndex and matchIndex for follower
    • If AppendEntries fails because of log inconsistency: decrement nextIndex and retry
  • If there exists an N such that N > commitIndex, a majority of matchIndex[i]≥N, and log[N].term == currentTerm: set commitIndex = N

a majority of matchIndex[i]≥NN,可以给matchIndex排序(如从小到大),取较小的中位数matchIndex[(n+1)/2]即为N
但要注意me是最新的,但保证matchIndex[me]=0上式就是正确的,,让它排到temp[0]可在初始化时将matchIndex[me]设为math.MaxInt,让它排到temp[0]

  1. // If there exists an N such that N > commitIndex, a majority of matchIndex[i]≥N, and log[N].term == currentTerm: set commitIndex = N
  2. temp := rf.matchIndex
  3. sort.Ints(temp)
  4. N := temp[(len(temp)-1)/2]
  5. if N < len(rf.logs) && rf.logs[N].Term == rf.getCurrentTerm() {
  6. rf.commit(N)
  7. }

还有非leader的commit:

If commitIndex > lastApplied: increment lastApplied, apply log[lastApplied] to state machine.
Send each newly committed entry on applyCh(defined in Make) on each peer.

两个都可在每次修改commitIndex时进行,即放到rf.commit()中。

  1. // Update CommitIndex and commit.
  2. func (rf *Raft) commit(index int) {
  3. rf.commitIndex = max(rf.commitIndex, index)
  4. // If commitIndex > lastApplied: increment lastApplied, apply log[lastApplied] to state machine.
  5. for rf.lastApplied < rf.commitIndex {
  6. rf.lastApplied++
  7. log := &rf.logs[rf.lastApplied]
  8. rf.applyCh <- ApplyMsg{
  9. CommandValid: true,
  10. Command: log.Command,
  11. CommandIndex: log.Index,
  12. }
  13. }
  14. }

一个冲突问题:
sendAppendEntries()的这条语句:
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
会读取args.Entries
而handler AppendEntries()中的这句:
rf.logs = append(rf.logs[:conflictID], args.Entries[i:]...)
会写args.Entries,导致race?

开始想的是在AppendEntriesArgs加锁,但是Mutex的实现里有小写的结构体成员,不能用做RPC参数。。用atomic int32又是忙等影响效率。
所以得copy一个??

  1. temp := make([]LogEntry, len(args.Entries))
  2. copy(temp, args.Entries)
  3. ...
  4. rf.logs = append(rf.logs, temp[i:]...)
  5. // Wrong: append(rf.logs, args.Entries[i:]...)

#

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注