Lab 2B
大体流程思路
首先我们在$Lab1$里面是实现了相对应的选举的问题,然后我们在$Lab2$中需要实现的是对客户端发送日志给$Leader$之后,然后$Leader$再把这条日志数据发送给各个$slave$节点,如果半数的$slave$统一更行之后$leader$再返回更新。
发送ticker
因为$ticker$实际上是每个$Raft$的心脏,根据论文以及$2A$的睡觉哦下面我们也是在$ticker$中调用。首先是对于$Leader$的情况,我们很显然需要进行心跳/日志同步,那么我们进去之后构造相对应的$msg$
args := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: 0,
PrevLogTerm: 0,
Entries: nil,
LeaderCommit: rf.commitIndex, // commitIndex为大多数log所认可的commitIndex
}
然后考虑到有些$follower$节点的$log$可能并不完善,但是我们有$nextIndex$数组,我们将这一个区间的信息作为$Entry$一起传进去即可。然后我们就可以启动一个$go$协程来进行$AppendEntries$的动作了。
func (rf *Raft) ticker() {
for rf.killed() == false {
// Your code here to check if a leader election should
// be started and to randomize sleeping time using
// 当定时器结束进行超时选举
select {
case <-rf.timer.C:
if rf.killed() {
return
}
rf.mu.Lock()
// 根据自身的status进行一次ticker
switch rf.status {
// follower变成竞选者
case Follower:
rf.status = Candidate
fallthrough
case Candidate:
// 初始化自身的任期、并把票投给自己
rf.currentTerm += 1
rf.votedFor = rf.me
votedNums := 1 // 统计自身的票数
rf.persist()
// 每轮选举开始时,重新设置选举超时
rf.overtime = time.Duration(150+rand.Intn(200)) * time.Millisecond // 随机产生200-400ms
rf.timer.Reset(rf.overtime)
// 对自身以外的节点进行选举
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
voteArgs := RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: len(rf.logs),
LastLogTerm: 0,
}
if len(rf.logs) > 0 {
voteArgs.LastLogTerm = rf.logs[len(rf.logs)-1].Term
}
voteReply := RequestVoteReply{}
go rf.sendRequestVote(i, &voteArgs, &voteReply, &votedNums)
}
case Leader:
// 进行心跳/日志同步
appendNums := 1 // 对于正确返回的节点数量
rf.timer.Reset(HeartBeatTimeout)
// 构造msg
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
args := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: 0,
PrevLogTerm: 0,
Entries: nil,
LeaderCommit: rf.commitIndex, // commitIndex为大多数log所认可的commitIndex
}
reply := AppendEntriesReply{}
// 如果nextIndex[i]长度不等于rf.logs,代表与leader的log entries不一致,需要附带过去
args.Entries = rf.logs[rf.nextIndex[i]-1:]
// 代表已经不是初始值0
if rf.nextIndex[i] > 0 {
args.PrevLogIndex = rf.nextIndex[i] - 1
}
if args.PrevLogIndex > 0 {
//fmt.Println("len(rf.log):", len(rf.logs), "PrevLogIndex):", args.PrevLogIndex, "rf.nextIndex[i]", rf.nextIndex[i])
args.PrevLogTerm = rf.logs[args.PrevLogIndex-1].Term
}
//fmt.Printf("[ ticker(%v) ] : send a election to %v\n", rf.me, i)
go rf.sendAppendEntries(i, &args, &reply, &appendNums)
}
}
rf.mu.Unlock()
}
}
}
在进行添加日志的协程中我们需要注意论文中有一点就是如果$append$失败那么我们应该不断的$retries$,要么这个$log$成功被存储,要么其中的一个$rf$已经超时
然后我们先看一下添加日志的$RPC$,我们可以对当前节点的情况分类讨论一下:
- 如果当前节点已经$crash$掉了那么肯定无法添加日志因此我们结束;
- 当前节点存活,但是当前节点的任期比传过来的任期要大,可能出现网络分区的情况
- 如果传进来的添加日志起点比当前日志节点的最新的日志还要大,拒绝添加日志
- 传进来的节点下标在当前raft的任期和传进来的任期不同,拒绝添加日志
- 无上述情况则可以进行添加日志
在添加日志的时候我们需要对处理到的$rf$进行一个$ticker$吃哦内阁制,然后增加返回即可
// AppendEntries 建立心跳、同步日志RPC
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
//fmt.Printf("[ AppendEntries func-rf(%v) ] arg:%+v,------ rf.logs:%v \n", rf.me, args, rf.logs)
// 节点crash
if rf.killed() {
reply.AppState = AppKilled
reply.Term = -1
reply.Success = false
return
}
// args.Term < rf.currentTerm:出现网络分区,args的任期,比当前raft的任期还小,说明args之前所在的分区已经OutOfDate 2A
if args.Term < rf.currentTerm {
reply.AppState = AppOutOfDate
reply.Term = rf.currentTerm
reply.Success = false
return
}
// 出现conflict的情况
// paper:Reply false if log doesn’t contain an entry at prevLogIndex,whose term matches prevLogTerm (§5.3)
// 首先要保证自身len(rf)大于0否则数组越界
// 1、 如果preLogIndex的大于当前日志的最大的下标说明跟随者缺失日志,拒绝附加日志
// 2、 如果preLog出的任期和preLogIndex处的任期和preLogTerm不相等,那么说明日志存在conflict,拒绝附加日志
if args.PrevLogIndex > 0 && (len(rf.logs) < args.PrevLogIndex || rf.logs[args.PrevLogIndex-1].Term != args.PrevLogTerm) {
reply.AppState = Mismatch
reply.Term = rf.currentTerm
reply.Success = false
reply.UpNextIndex = rf.lastApplied + 1
return
}
// 如果当前节点提交的Index比传过来的还高,说明当前节点的日志已经超前,需返回过去
if args.PrevLogIndex != -1 && rf.lastApplied > args.PrevLogIndex {
reply.AppState = AppCommitted
reply.Term = rf.currentTerm
reply.Success = false
reply.UpNextIndex = rf.lastApplied + 1
return
}
// 对当前的rf进行ticker重置
rf.currentTerm = args.Term
rf.votedFor = args.LeaderId
rf.status = Follower
rf.timer.Reset(rf.overtime)
// 对返回的reply进行赋值
reply.AppState = AppNormal
reply.Term = rf.currentTerm
reply.Success = true
// 如果存在日志包那么进行追加
if args.Entries != nil {
rf.logs = rf.logs[:args.PrevLogIndex]
rf.logs = append(rf.logs, args.Entries...)
}
rf.persist()
// 将日志提交至与Leader相同
for rf.lastApplied < args.LeaderCommit {
rf.lastApplied++
applyMsg := ApplyMsg{
CommandValid: true,
CommandIndex: rf.lastApplied,
Command: rf.logs[rf.lastApplied-1].Command,
}
rf.applyChan <- applyMsg
rf.commitIndex = rf.lastApplied
//fmt.Printf("[ AppendEntries func-rf(%v) ] commitLog \n", rf.me)
}
return
}
然后我们返回到之前的协程之中,对于上述的情况我们应该如何解决呢?
- 对于超时了的节点我们需要将他重新变成$follower$,然后重置他现在的任期是$reply$的
- 如果出现不匹配的情况比如当前任期比较大的,也让他重新回到$follower$
- 对于目标节点正常的,我们需要记录一个同意$append$的数字,然后记录即可
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply, appendNums *int) {
if rf.killed() {
return
}
// paper中5.3节第一段末尾提到,如果append失败应该不断的retries ,直到这个log成功的被store
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
for !ok {
if rf.killed() {
return
}
ok = rf.peers[server].Call("Raft.AppendEntries", args, reply)
}
// 必须在加在这里否则加载前面retry时进入时,RPC也需要一个锁,但是又获取不到,因为锁已经被加上了
rf.mu.Lock()
defer rf.mu.Unlock()
//fmt.Printf("[ sendAppendEntries func-rf(%v) ] get reply :%+v from rf(%v)\n", rf.me, reply, server)
// 对reply的返回状态进行分支
switch reply.AppState {
// 目标节点crash
case AppKilled:
{
return
}
// 目标节点正常返回
case AppNormal:
{
// 2A的test目的是让Leader能不能连续任期,所以2A只需要对节点初始化然后返回就好
// 2B需要判断返回的节点是否超过半数commit,才能将自身commit
if reply.Success && reply.Term == rf.currentTerm && *appendNums <= len(rf.peers)/2 {
*appendNums++
}
// 说明返回的值已经大过了自身数组
if rf.nextIndex[server] > len(rf.logs)+1 {
return
}
rf.nextIndex[server] += len(args.Entries)
if *appendNums > len(rf.peers)/2 {
// 保证幂等性,不会提交第二次
*appendNums = 0
if len(rf.logs) == 0 || rf.logs[len(rf.logs)-1].Term != rf.currentTerm {
return
}
for rf.lastApplied < len(rf.logs) {
rf.lastApplied++
applyMsg := ApplyMsg{
CommandValid: true,
Command: rf.logs[rf.lastApplied-1].Command,
CommandIndex: rf.lastApplied,
}
rf.applyChan <- applyMsg
rf.commitIndex = rf.lastApplied
//fmt.Printf("[ sendAppendEntries func-rf(%v) ] commitLog \n", rf.me)
}
}
//fmt.Printf("[ sendAppendEntries func-rf(%v) ] rf.log :%+v ; rf.lastApplied:%v\n",
// rf.me, rf.logs, rf.lastApplied)
return
}
case Mismatch, AppCommitted:
if reply.Term > rf.currentTerm {
rf.status = Follower
rf.votedFor = -1
rf.timer.Reset(rf.overtime)
rf.currentTerm = reply.Term
rf.persist()
}
rf.nextIndex[server] = reply.UpNextIndex
//If AppendEntries RPC received from new leader: convert to follower(paper - 5.2)
//reason: 出现网络分区,该Leader已经OutOfDate(过时),term小于发送者
case AppOutOfDate:
// 该节点变成追随者,并重置rf状态
rf.status = Follower
rf.votedFor = -1
rf.timer.Reset(rf.overtime)
rf.currentTerm = reply.Term
rf.persist()
}
return
}