MIT6.824 Lab-2A
前言
根据查阅资料以及自己查看之后发现对于$2A$中调用的测试$func$都是调用了$make_config$方法。然后因为测试代码中会调用$raft.go$中的$getstate$方法,判断当前的任期以及是否是领导人,然后我们来实现我们的代码,其中借鉴了以下的资料:
https://zhuanlan.zhihu.com/p/524341057
选举思路
首先选举实际上是可以分为两个部分,一个是首次选举,另外一个是再次选举。
首次选举
若定时器超时,说明一段时间内没有收到$Leader$的消息,那么这个节点需要转换为$Candiate$,准备竞选$Leader$
然后我们让这个$Candiate$向其他发送请求,如果超过半数以上的票发送给他那么他就会成为$Leader$,在成功之后会发送一条心跳消息
如果没有成功那么会隔一段时间之后选举,直到这个节点成为$Leader$或者其他的节点成为$Leader$
首次选举代码实现
仔细阅读$Test$代码之后我们可以发现是通过$Make$函数去调用的,那么首先我们需要先对所有节点进行一个初始化,对于初始的状态都先暂时设定他们为$Candiate$。然后对于每个节点的话我们可以启动一个$go$协程去开始计时并且开始选举。
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
// Your initialization code here (2A, 2B, 2C).
rf.applyChan = applyCh //2B
rf.currentTerm = 0
rf.votedFor = -1
rf.logs = make([]LogEntry, 0)
rf.commitIndex = 0
rf.lastApplied = 0
rf.nextIndex = make([]int, len(peers))
rf.matchIndex = make([]int, len(peers))
rf.status = Candidate
rf.overtime = time.Duration(150+rand.Intn(200)) * time.Millisecond
rf.timer = time.NewTicker(rf.overtime)
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
// start ticker goroutine to start elections
go rf.ticker()
return rf
}
首先我们这个节点一定是需要还存活的,因此我们可以使用一个$select$语句(因为一个$Raft$的计时器是放在一个$channel$中的,$select$语句专门进行处理这个问题),然后我们对目前如果已经超时的节点我们就将其丢出。然后我们开始进行选举,首先对于每一个已经开始选举的候选人设定一个计时器,当然为了防止出现多个候选人并且投票时间相同的情况,我们可以使用$rand$来控制相对应的时间。然后我们就暴力的去寻找其他的每个节点向他们发送一个投票请求。
在这里在进行RPC请求的时候其实还需要两个结构体,一个是$RequestVoteArgs$,另外一个是$RequestVoteReply$,对于这两个部分我们直接按照论文中的定义即可
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
type RequestVoteArgs struct {
// Your data here (2A, 2B).
Term int // 需要竞选的人的任期
CandidateId int // 需要竞选的人的Id
LastLogIndex int // 竞选人日志条目最后索引
LastLogTerm int // 候选人最后日志条目的任期号
}
// example RequestVote RPC reply structure.
// field names must start with capital letters!
// 如果竞选者任期比自己的任期还短,那就不投票,返回false
// 如果当前节点的votedFor为空,且竞选者的日志条目跟收到者的一样新则把票投给该竞选者
type RequestVoteReply struct {
// Your data here (2A).
Term int // 投票方的term,如果竞选者比自己还低就改为这个
VoteGranted bool // 是否投票给了该竞选人
VoteState VoteState // 投票状态
}
如果我们这个节点已经是$Leader$了,那么对于其他节点我们是要去更新剩下的日志数量的。
func (rf *Raft) ticker() {
for rf.killed() == false {
// Your code here to check if a leader election should
// be started and to randomize sleeping time using
// time.Sleep().
// 当定时器结束进行超时选举
select {
case <-rf.timer.C:
if rf.killed() {
return
}
rf.mu.Lock()
if rf.status == Follower {
rf.status = Candidate
}
if rf.status == Candidate {
rf.currentTerm += 1
rf.votedFor = rf.me
votedNums := 1
rf.overtime = time.Duration(150+rand.Intn(200)) * time.Millisecond
rf.timer.Reset(rf.overtime)
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
voteArgs := RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: len(rf.logs) - 1,
LastLogTerm: 0,
}
if len(rf.logs) > 0 {
voteArgs.LastLogTerm = rf.logs[len(rf.logs)-1].Term
}
voteReply := RequestVoteReply{}
go rf.sendRequestVote(i, &voteArgs, &voteReply, &votedNums)
}
} else if rf.status == Leader {
appendNums := 1
rf.timer.Reset(HeartBeatTimeout)
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
appendEntriesArgs := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogTerm: 0,
PrevLogIndex: 0,
Entries: nil,
LeaderCommit: rf.commitIndex,
}
appendEntriesReply := AppendEntriesReply{}
go rf.sendAppendEntries(i, &appendEntriesArgs, &appendEntriesReply, &appendNums)
}
}
rf.mu.Unlock()
}
}
}
然后我们接着来考虑刚刚输送投票请求的函数,我们传递的是三个参数一个是投票的$Args$,一个是投票的$Reply$,另外一个就是当前这个候选人已经收到了多少票,然后我们再次开启一个$go$协程去输送每个投票请求。然后对于每个投票请求,我们先不断使用$RPC$去进行请求,根据论文里面所描述的,如果当前这个节点仍然存在那么我们就一直向他发送请求。
接下来我们来看一下$RPC$请求,如果当前节点$crash$了之后,那么我们就设置相对应的$Reply$的状态是不可投递的;
然后如果竞选者的任期实际上是要比这个节点要小的,那么我们也不给他投票,并且在$Reply$中打上一个$Expire$(过期)的标签,这里需要提一嘴的是过期可能是因为出现网络分区或者其他原因导致的;然后考虑还没有投票的情况:然后如果传进来的任期确实比本身这个节点任期要大的话,那么我们就更新这个节点成为$Foller$,并且我们在这里考虑第二个匹配条件判断最后一个$Log$日志是否比候选人的最后一个$Log$日志要大。
如果这个已经投票了,来自同一轮但是是不同竞选者,告诉她自己已经没票了;要么可能因为网络问题导致的,那就$continue$掉
// RequestVote
// example RequestVote RPC handler.
// 个人认为定时刷新的地方应该是别的节点与当前节点在数据上不冲突时就要刷新
// 因为如果不是数据冲突那么定时相当于防止自身去选举的一个心跳
// 如果是因为数据冲突,那么这个节点不用刷新定时是为了当前整个raft能尽快有个正确的leader
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
//defer fmt.Printf("[ func-RequestVote-rf(%+v) ] : return %v\n", rf.me, reply)
rf.mu.Lock()
defer rf.mu.Unlock()
// 当前节点crash
if rf.killed() {
reply.VoteState = Killed
reply.Term = -1
reply.VoteGranted = false
return
}
//reason: 出现网络分区,该竞选者已经OutOfDate(过时)
if args.Term < rf.currentTerm {
reply.VoteState = Expire
reply.Term = rf.currentTerm
reply.VoteGranted = false
return
}
if args.Term > rf.currentTerm {
// 重置自身的状态
rf.status = Follower
rf.currentTerm = args.Term
rf.votedFor = -1
}
//fmt.Printf("[ func-RequestVote-rf(%+v) ] : rf.voted: %v\n", rf.me, rf.votedFor)
// 此时比自己任期小的都已经把票还原
if rf.votedFor == -1 {
currentLogIndex := len(rf.logs) - 1
currentLogTerm := 0
// 如果currentLogIndex下标不是-1就把term赋值过来
if currentLogIndex >= 0 {
currentLogTerm = rf.logs[currentLogIndex].Term
}
// If votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
// 论文里的第二个匹配条件,当前peer要符合arg两个参数的预期
if args.LastLogIndex < currentLogIndex || args.LastLogTerm < currentLogTerm {
reply.VoteState = Expire
reply.VoteGranted = false
reply.Term = rf.currentTerm
return
}
// 给票数,并且返回true
rf.votedFor = args.CandidateId
reply.VoteState = Normal
reply.Term = rf.currentTerm
reply.VoteGranted = true
rf.timer.Reset(rf.overtime)
//fmt.Printf("[ func-RequestVote-rf(%v) ] : voted rf[%v]\n", rf.me, rf.votedFor)
} else { // 只剩下任期相同,但是票已经给了,此时存在两种情况
reply.VoteState = Voted
reply.VoteGranted = false
// 1、当前的节点是来自同一轮,不同竞选者的,但是票数已经给了(又或者它本身自己就是竞选者)
if rf.votedFor != args.CandidateId {
// 告诉reply票已经没了返回false
return
} else { // 2. 当前的节点票已经给了同一个人了,但是由于sleep等网络原因,又发送了一次请求
// 重置自身状态
rf.status = Follower
}
rf.timer.Reset(rf.overtime)
}
return
}
然后我们继续看$sendVoted$的请求,因为我们已经放出去了,那么实际上返回的可能有如下的结果
const (
Normal VoteState = iota //投票过程正常
Killed //Raft节点已终止
Expire //投票(消息\竞选者)过期
Voted //本Term内已经投过票
)
然后我们来思考怎么处理这些结果,首先如果是消息已经过期了,那么需要将自己编程追随和并且重新设定时间;如果是正常或者确认投票的话,我们统计相对应的票数,如果最后的票数超过半数的话,那么我们可以清空相对应的票数并且在$nextIndex$中放入接下来需要的日志信息
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply, voteNums *int) bool {
if rf.killed() {
return false
}
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
for !ok {
// 失败重传
if rf.killed() {
return false
}
ok = rf.peers[server].Call("Raft.RequestVote", args, reply)
}
rf.mu.Lock()
defer rf.mu.Unlock()
//fmt.Printf("[ sendRequestVote(%v) ] : send a election to %v\n", rf.me, server)
// 由于网络分区,请求投票的人的term的比自己的还小,不给予投票
if args.Term < rf.currentTerm {
return false
}
// 对reply的返回情况进行分支处理
switch reply.VoteState {
// 消息过期有两种情况:
// 1.是本身的term过期了比节点的还小
// 2.是节点日志的条目落后于节点了
case Expire:
{
rf.status = Follower
rf.timer.Reset(rf.overtime)
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.votedFor = -1
}
}
case Normal, Voted:
//根据是否同意投票,收集选票数量
if reply.VoteGranted && reply.Term == rf.currentTerm && *voteNums <= (len(rf.peers)/2) {
*voteNums++
}
// 票数超过一半
if *voteNums >= (len(rf.peers)/2)+1 {
*voteNums = 0
// 本身就是leader在网络分区中更具有话语权的leader
if rf.status == Leader {
return ok
}
// 本身不是leader,那么需要初始化nextIndex数组
rf.status = Leader
rf.nextIndex = make([]int, len(rf.peers))
for i, _ := range rf.nextIndex {
rf.nextIndex[i] = len(rf.logs) + 1
}
rf.timer.Reset(HeartBeatTimeout)
//fmt.Printf("[ sendRequestVote-func-rf(%v) ] be a leader\n", rf.me)
}
case Killed:
return false
}
return ok
}
再次选举
再次选举已经是后面的东西啦~