MIT6.824Lab2B


Lab 2B

大体流程思路

首先我们在$Lab1$里面是实现了相对应的选举的问题,然后我们在$Lab2$中需要实现的是对客户端发送日志给$Leader$之后,然后$Leader$再把这条日志数据发送给各个$slave$节点,如果半数的$slave$统一更行之后$leader$再返回更新。

发送ticker

因为$ticker$实际上是每个$Raft$的心脏,根据论文以及$2A$的睡觉哦下面我们也是在$ticker$中调用。首先是对于$Leader$的情况,我们很显然需要进行心跳/日志同步,那么我们进去之后构造相对应的$msg$

args := AppendEntriesArgs{
						Term:         rf.currentTerm,
						LeaderId:     rf.me,
						PrevLogIndex: 0,
						PrevLogTerm:  0,
						Entries:      nil,
						LeaderCommit: rf.commitIndex, // commitIndex为大多数log所认可的commitIndex
					}

然后考虑到有些$follower$节点的$log$可能并不完善,但是我们有$nextIndex$数组,我们将这一个区间的信息作为$Entry$一起传进去即可。然后我们就可以启动一个$go$协程来进行$AppendEntries$的动作了。

func (rf *Raft) ticker() {

	for rf.killed() == false {

		// Your code here to check if a leader election should
		// be started and to randomize sleeping time using

		// 当定时器结束进行超时选举
		select {

		case <-rf.timer.C:
			if rf.killed() {
				return
			}
			rf.mu.Lock()
			// 根据自身的status进行一次ticker
			switch rf.status {

			// follower变成竞选者
			case Follower:
				rf.status = Candidate
				fallthrough
			case Candidate:

				// 初始化自身的任期、并把票投给自己
				rf.currentTerm += 1
				rf.votedFor = rf.me
				votedNums := 1 // 统计自身的票数
				rf.persist()

				// 每轮选举开始时,重新设置选举超时
				rf.overtime = time.Duration(150+rand.Intn(200)) * time.Millisecond // 随机产生200-400ms
				rf.timer.Reset(rf.overtime)

				// 对自身以外的节点进行选举
				for i := 0; i < len(rf.peers); i++ {
					if i == rf.me {
						continue
					}

					voteArgs := RequestVoteArgs{
						Term:         rf.currentTerm,
						CandidateId:  rf.me,
						LastLogIndex: len(rf.logs),
						LastLogTerm:  0,
					}
					if len(rf.logs) > 0 {
						voteArgs.LastLogTerm = rf.logs[len(rf.logs)-1].Term
					}

					voteReply := RequestVoteReply{}

					go rf.sendRequestVote(i, &voteArgs, &voteReply, &votedNums)
				}
			case Leader:
				// 进行心跳/日志同步
				appendNums := 1 // 对于正确返回的节点数量
				rf.timer.Reset(HeartBeatTimeout)
				// 构造msg
				for i := 0; i < len(rf.peers); i++ {
					if i == rf.me {
						continue
					}
					args := AppendEntriesArgs{
						Term:         rf.currentTerm,
						LeaderId:     rf.me,
						PrevLogIndex: 0,
						PrevLogTerm:  0,
						Entries:      nil,
						LeaderCommit: rf.commitIndex, // commitIndex为大多数log所认可的commitIndex
					}

					reply := AppendEntriesReply{}

					// 如果nextIndex[i]长度不等于rf.logs,代表与leader的log entries不一致,需要附带过去
					args.Entries = rf.logs[rf.nextIndex[i]-1:]

					// 代表已经不是初始值0
					if rf.nextIndex[i] > 0 {
						args.PrevLogIndex = rf.nextIndex[i] - 1
					}

					if args.PrevLogIndex > 0 {
						//fmt.Println("len(rf.log):", len(rf.logs), "PrevLogIndex):", args.PrevLogIndex, "rf.nextIndex[i]", rf.nextIndex[i])
						args.PrevLogTerm = rf.logs[args.PrevLogIndex-1].Term
					}

					//fmt.Printf("[	ticker(%v) ] : send a election to %v\n", rf.me, i)
					go rf.sendAppendEntries(i, &args, &reply, &appendNums)
				}
			}

			rf.mu.Unlock()
		}

	}
}

在进行添加日志的协程中我们需要注意论文中有一点就是如果$append$失败那么我们应该不断的$retries$,要么这个$log$成功被存储,要么其中的一个$rf$已经超时

然后我们先看一下添加日志的$RPC$,我们可以对当前节点的情况分类讨论一下:

  1. 如果当前节点已经$crash$掉了那么肯定无法添加日志因此我们结束;
  2. 当前节点存活,但是当前节点的任期比传过来的任期要大,可能出现网络分区的情况
  3. 如果传进来的添加日志起点比当前日志节点的最新的日志还要大,拒绝添加日志
  4. 传进来的节点下标在当前raft的任期和传进来的任期不同,拒绝添加日志
  5. 无上述情况则可以进行添加日志

在添加日志的时候我们需要对处理到的$rf$进行一个$ticker$吃哦内阁制,然后增加返回即可

// AppendEntries 建立心跳、同步日志RPC
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	//fmt.Printf("[	AppendEntries func-rf(%v) 	] arg:%+v,------ rf.logs:%v \n", rf.me, args, rf.logs)

	// 节点crash
	if rf.killed() {
		reply.AppState = AppKilled
		reply.Term = -1
		reply.Success = false
		return
	}

	//  args.Term < rf.currentTerm:出现网络分区,args的任期,比当前raft的任期还小,说明args之前所在的分区已经OutOfDate 2A
	if args.Term < rf.currentTerm {
		reply.AppState = AppOutOfDate
		reply.Term = rf.currentTerm
		reply.Success = false
		return
	}

	// 出现conflict的情况
	// paper:Reply false if log doesn’t contain an entry at prevLogIndex,whose term matches prevLogTerm (§5.3)
	// 首先要保证自身len(rf)大于0否则数组越界
	// 1、 如果preLogIndex的大于当前日志的最大的下标说明跟随者缺失日志,拒绝附加日志
	// 2、 如果preLog出的任期和preLogIndex处的任期和preLogTerm不相等,那么说明日志存在conflict,拒绝附加日志
	if args.PrevLogIndex > 0 && (len(rf.logs) < args.PrevLogIndex || rf.logs[args.PrevLogIndex-1].Term != args.PrevLogTerm) {
		reply.AppState = Mismatch
		reply.Term = rf.currentTerm
		reply.Success = false
		reply.UpNextIndex = rf.lastApplied + 1
		return
	}

	// 如果当前节点提交的Index比传过来的还高,说明当前节点的日志已经超前,需返回过去
	if args.PrevLogIndex != -1 && rf.lastApplied > args.PrevLogIndex {
		reply.AppState = AppCommitted
		reply.Term = rf.currentTerm
		reply.Success = false
		reply.UpNextIndex = rf.lastApplied + 1
		return
	}

	// 对当前的rf进行ticker重置
	rf.currentTerm = args.Term
	rf.votedFor = args.LeaderId
	rf.status = Follower
	rf.timer.Reset(rf.overtime)

	// 对返回的reply进行赋值
	reply.AppState = AppNormal
	reply.Term = rf.currentTerm
	reply.Success = true

	// 如果存在日志包那么进行追加
	if args.Entries != nil {
		rf.logs = rf.logs[:args.PrevLogIndex]
		rf.logs = append(rf.logs, args.Entries...)

	}
	rf.persist()

	// 将日志提交至与Leader相同
	for rf.lastApplied < args.LeaderCommit {
		rf.lastApplied++
		applyMsg := ApplyMsg{
			CommandValid: true,
			CommandIndex: rf.lastApplied,
			Command:      rf.logs[rf.lastApplied-1].Command,
		}
		rf.applyChan <- applyMsg
		rf.commitIndex = rf.lastApplied
		//fmt.Printf("[	AppendEntries func-rf(%v)	] commitLog  \n", rf.me)
	}

	return
}

然后我们返回到之前的协程之中,对于上述的情况我们应该如何解决呢?

  1. 对于超时了的节点我们需要将他重新变成$follower$,然后重置他现在的任期是$reply$的
  2. 如果出现不匹配的情况比如当前任期比较大的,也让他重新回到$follower$
  3. 对于目标节点正常的,我们需要记录一个同意$append$的数字,然后记录即可
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply, appendNums *int) {

	if rf.killed() {
		return
	}

	// paper中5.3节第一段末尾提到,如果append失败应该不断的retries ,直到这个log成功的被store
	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
	for !ok {

		if rf.killed() {
			return
		}
		ok = rf.peers[server].Call("Raft.AppendEntries", args, reply)

	}

	// 必须在加在这里否则加载前面retry时进入时,RPC也需要一个锁,但是又获取不到,因为锁已经被加上了
	rf.mu.Lock()
	defer rf.mu.Unlock()

	//fmt.Printf("[	sendAppendEntries func-rf(%v)	] get reply :%+v from rf(%v)\n", rf.me, reply, server)
	// 对reply的返回状态进行分支
	switch reply.AppState {

	// 目标节点crash
	case AppKilled:
		{
			return
		}

	// 目标节点正常返回
	case AppNormal:
		{

			// 2A的test目的是让Leader能不能连续任期,所以2A只需要对节点初始化然后返回就好
			// 2B需要判断返回的节点是否超过半数commit,才能将自身commit
			if reply.Success && reply.Term == rf.currentTerm && *appendNums <= len(rf.peers)/2 {
				*appendNums++
			}

			// 说明返回的值已经大过了自身数组
			if rf.nextIndex[server] > len(rf.logs)+1 {
				return
			}
			rf.nextIndex[server] += len(args.Entries)
			if *appendNums > len(rf.peers)/2 {
				// 保证幂等性,不会提交第二次
				*appendNums = 0

				if len(rf.logs) == 0 || rf.logs[len(rf.logs)-1].Term != rf.currentTerm {
					return
				}

				for rf.lastApplied < len(rf.logs) {
					rf.lastApplied++
					applyMsg := ApplyMsg{
						CommandValid: true,
						Command:      rf.logs[rf.lastApplied-1].Command,
						CommandIndex: rf.lastApplied,
					}
					rf.applyChan <- applyMsg
					rf.commitIndex = rf.lastApplied
					//fmt.Printf("[	sendAppendEntries func-rf(%v)	] commitLog  \n", rf.me)
				}

			}
			//fmt.Printf("[	sendAppendEntries func-rf(%v)	] rf.log :%+v  ; rf.lastApplied:%v\n",
			//	rf.me, rf.logs, rf.lastApplied)

			return
		}

	case Mismatch, AppCommitted:
		if reply.Term > rf.currentTerm {
			rf.status = Follower
			rf.votedFor = -1
			rf.timer.Reset(rf.overtime)
			rf.currentTerm = reply.Term
			rf.persist()
		}
		rf.nextIndex[server] = reply.UpNextIndex
	//If AppendEntries RPC received from new leader: convert to follower(paper - 5.2)
	//reason: 出现网络分区,该Leader已经OutOfDate(过时),term小于发送者
	case AppOutOfDate:

		// 该节点变成追随者,并重置rf状态
		rf.status = Follower
		rf.votedFor = -1
		rf.timer.Reset(rf.overtime)
		rf.currentTerm = reply.Term
		rf.persist()

	}

	return
}

文章作者: Treasure
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Treasure !
  目录