Rustin Liu

Raft 领导人选举实现

12 April 2020

大家好,我是 Rustin。最近在做 6.824 的实验 lab2,今天想写一篇文章记录一下前段时间做的 Raft 领导人选举机制的实现。

此博客在 GitHub 上公开发布. 如果您有任何问题或疑问,请在此处打开一个 issue

简介

6.824 是 MIT 的分布式系统公开课,今年还有官方的视频资料放出。 目前也有国内的同学正在进行翻译工作,可以参考一下。

课程质量非常高,国内也有一些整理好的相关资料可以参考。

下面我就简单描述一下 Raft 领导人选举机制实现过程和一些踩过的坑,我会尽可能的聚焦在实现实验的细节上,因为我可能对 Raft 目前的理解也比较浅显。所以如果您对这篇文章感兴趣,请您先阅读论文和观看课程视频并尝试实现。

领导人选举规则分析

在 Raft 中分别有三种角色分别是:Leader, Follower 和 Candidate。在整个系统正常运行过程中,只会有一个 Leader 节点,其他的都是 Follower 节点。只有在需要重新选举的阶段,节点会尝试将自己转换为 Candidate,然后向所有的节点发出投票 RPC 消息展开选举。

我觉得而一个比较好的切入思路就是搞明白何时需要进行 Leader 的选举?我们阅读论文会发现主要有两种情况需要进行选举,一种就是当 Raft 集群启动之后开始第一次选举,另外一种就是 Leader 出现故障,无法使用心跳机制维持自己的统治, 导致选举超时机制触发,节点开始尝试新的一轮的选举(需要注意的是,选举有可能在一个 Term 中没有结果,那么就在下个 Term 中继续选举直到选出 Leader)。

搞清楚何时进行选举,我们再来看看选举的 RPC 请求和回复的数据结构:

Request 参数 解释
term 候选人的任期号
candidateId 请求选票的候选人的 Id
lastLogIndex 候选人的最后日志条目的索引值
lastLogTerm 候选人最后日志条目的任期号


Reply 返回值 解释
term 当前任期号,以便于候选人去更新自己的任期号
voteGranted 候选人赢得了此张选票时为真


从数据结构中我们可以看到能够影响到选举的主要有两个部分,一个是 Term,另外一个就是候选人的最后一条日志条目。那我们就尝试从这两点方面去消化和理解选举的规则:

主要的规则就是这两条,第一条还是很好理解的,我们不应该投票给 Term 落后的节点。日志这一条规则出自论文 5.4.1 领导选举限制,我们必须保证我们选出的领导拥有系统中绝大部分节点承认的最新最长的日志记录。 所幸这个规则是为了保证日志的安全性,对我们实现实验的 A 部分还不影响。如果你对这条规则的设置很迷惑,可以先继续实验,等到做完日志复制部分实验之后,可能就会对这个规则有更深更清晰的理解。

当我们成为领导人之后需要通过心跳机制来维持自己的统治,也就是不停的发空日志给 Follower,然后通过这种方式是刷新该 Follower 的选举超时时间,让它不要发起选举。所以在开始实验之前我们还需要简单看一下附加日志的 PRC 请求和回复的数据结构:

参数 解释
term 领导人的任期号
leaderId 领导人的 Id,以便于跟随者重定向请求
prevLogIndex 新的日志条目紧随之前的索引值
prevLogTerm prevLogIndex 条目的任期号
entries[] 准备存储的日志条目(表示心跳时为空;一次性发送多个是为了提高效率)
leaderCommit 领导人已经提交的日志的索引值


返回值 解释
term 当前的任期号,用于领导人去更新自己
success 跟随者包含了匹配上 prevLogIndex 和 prevLogTerm 的日志时为真


这些数据结构的大多数字段都是为了附加日志设计的,所以我们可以尽量的屏蔽掉无用的字段,目前对实验 A 有影响的就只有 Term 字段,所以我们在接下来的实现中确保 Term 处理逻辑正确即可。

测试用例分析

在开始实现之前我们先来看看实验的测试用例。领导人实验实际上很简单,所有它的测试用例也很简单,主要有两个测试:

这些测试用例都很简单,其中测试用例用到一个函数:

// 检查是否确实仅存在一个领导人。
// 为了测试重新选举,所以会多尝试几次查找。
func (cfg *config) checkOneLeader() int {
	// 多迭代几次尝试寻找领导人。
	for iters := 0; iters < 10; iters++ {
		ms := 450 + (rand.Int63() % 100)
		// 随机尝试。
		time.Sleep(time.Duration(ms) * time.Millisecond)
		// 获取领导人和对应上任 Term。
		leaders := make(map[int][]int)
		for i := 0; i < cfg.n; i++ {
			if cfg.connected[i] {
				if term, leader := cfg.rafts[i].GetState(); leader {
					leaders[term] = append(leaders[term], i)
				}
			}
		}
		// 校验没有在一个周期内选出两个领导人,找到最后新的Term。
		lastTermWithLeader := -1
		for term, leaders := range leaders {
			if len(leaders) > 1 {
				cfg.t.Fatalf("term %d has %d (>1) leaders", term, len(leaders))
			}
			if term > lastTermWithLeader {
				lastTermWithLeader = term
			}
		}
		// 找到最新的领导人。
		if len(leaders) != 0 {
			return leaders[lastTermWithLeader][0]
		}
	}
	cfg.t.Fatalf("expected one leader, got none")
	return -1
}

启动 go 程,开始选举

我们读完测试用例之后,就可以尝试开始实现领导人选举。我们需要在 Make 函数中为该节点启动一个后台常驻 go 程,在里面发起选举。

func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me
	// 代表无领导人。
	rf.leaderId = -1
	rf.currentTerm = 0
	// 代表还未投票。
	rf.votedFor = -1
	rf.commitIndex = 0
	rf.lastAppliedIndex = 0
	rf.state = Follower
	rf.lastReceiveTime = time.Now()
	// 开始选举。
	go rf.startLeaderElection()
	rf.readPersist(persister.ReadRaftState())
	return rf
}

// 下面附带一些会用到的一些角色转换的函数。
func (rf *Raft) convertToCandidate() {
	rf.state = Candidate
	rf.currentTerm++
	rf.votedFor = rf.me
	rf.lastReceiveTime = time.Now()
}

func (rf *Raft) convertToFollower(newTerm int) {
	rf.state = Follower
	rf.currentTerm = newTerm
	rf.votedFor = -1
	rf.lastReceiveTime = time.Now()
}

func (rf *Raft) convertToLeader() {
	rf.state = Leader
	rf.leaderId = rf.me
	rf.lastReceiveTime = time.Now()
}

在 Make 函数中,我初始化了节点的基本信息,需要注意的是我为每个节点设置了一个叫做 lastReceiveTime 的字段,这个字段可以用作表示上次收到有效的 RPC 附加日志(在试验 A 中用作心跳)的时间, 当我发现我们接受到有效心跳的时间到现在超过了选举超时时间限制时,我们就将其转换为候选人发起一轮新的选举。

关于选举我选择直接在我们的选举 go 程中开 for 循环去执行,没有再使用额外的 go 程和 channel 来处理选举超时机制,因为我觉得对我来说那样可能会有滥用 channel 的嫌疑,造成不必要的心智负担:

// 开始选举领导人。
// 当发生一次选举超时时,节点就开始一轮新的选举。
func (rf *Raft) startLeaderElection() {
	for {
		electionTimeout := rand.Intn(150)
		startTime := time.Now()
		time.Sleep(time.Duration(HeartbeatInterval+electionTimeout) * time.Millisecond)
		rf.mu.Lock()
		if atomic.LoadInt32(&rf.dead) == Dead {
			rf.mu.Unlock()
			return
		}
		// 在这个地方判断是否应该发起选举。
		if rf.lastReceiveTime.Before(startTime) {
			if rf.state != Leader {
				DPrintf("%d kicks off election on term: %d", rf.me, rf.currentTerm)
				go rf.kickOffElection()
			}
		}
		rf.mu.Unlock()
	}
}

我们可以看到真实的选举实际上被我放在了 kickOffElection 中:

// 触发选举,向所有节点发出投票请求。
func (rf *Raft) kickOffElection() {
	rf.mu.Lock()
	// 将自己转换为候选人。
	rf.convertToCandidate()
	lastLogEntry := rf.getLastLogEntry()
	args := RequestVoteArgs{
		Term:         rf.currentTerm,
		CandidateId:  rf.me,
		LastLogIndex: lastLogEntry.Index,
		LastLogTerm:  lastLogEntry.Term,
	}
	numVote := 1
	rf.mu.Unlock()
	// 开始向每个节点发送投票请求。
	for i := 0; i < len(rf.peers); i++ {
		if i != rf.me {
			go func(peerId int) {
				reply := RequestVoteReply{}
				DPrintf("%d send vote request to %d", rf.me, peerId)
				// 发出 RPC 消息,注意消息和回复字段要求大写。
				ok := rf.sendRequestVoteRPC(peerId, &args, &reply)
				if !ok {
					return
				}
				rf.mu.Lock()
				defer rf.mu.Unlock()
				// 如果投票人的 Term 比当前节点的 Term 大,那将节点转换为 Follower。
				if reply.Term > rf.currentTerm {
					rf.convertToFollower(reply.Term)
					return
				}
				// 收到了有效投票,统计结果,如果获取系统中一半以上的选票,就将该节点转换为领导人,并且开始放发出心跳维持自己的统治。
				if reply.VoteGranted {
					numVote++
					if numVote > len(rf.peers)/2 && rf.state == Candidate {
						rf.convertToLeader()
						DPrintf("%d become the leader on term: %d", rf.me, rf.currentTerm)
						for j := 0; j < len(rf.peers); j++ {
							if j != rf.me {
								go rf.replicaLogToPeer(j)
							}
						}
					}
				}
			}(i)
		}
	}
}

我们来简单梳理一下这部分逻辑:

在开始查看如何发送心跳之前,我们先来看看作为其他节点我们应该如何依照上面分析的规则进行投票,我们的投票 RPC 都是交给了 RequestVote 函数去处理:

//
// 投票请求 RPC 处理.
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	DPrintf("%d start process vote request", rf.me)
	// 设置回复消息的 Term。
	reply.Term = rf.currentTerm
	// 对应我上面说的关于 Term 的那一条规则。
	if args.Term < rf.currentTerm {
		reply.VoteGranted = false
		DPrintf("%d refuse vote for %d", rf.me, args.CandidateId)
	} else {
		// 如果候选人的 Term 比我们的大,那我们应该将自己转换为 Follower。
		if args.Term > rf.currentTerm {
			rf.convertToFollower(args.Term)
		}
		lastEntry := rf.getLastLogEntry()
		// 对应我上面说的关于日志的那一条规则。
		logUpToDate := LogEntry{Command: nil, Index: args.LastLogIndex, Term: args.LastLogTerm}.isMoreUpToDate(lastEntry)
		if rf.votedFor == -1 && logUpToDate {
			rf.votedFor = args.CandidateId
			reply.VoteGranted = true
			DPrintf("%d vote for %d", rf.me, args.CandidateId)
		}
	}
}

// 是否更新更长.
func (l LogEntry) isMoreUpToDate(r LogEntry) bool {
	return (l.Term == r.Term && l.Index >= r.Index) || l.Term > r.Term
}

在上面的 RequestVote 函数中,我们主要就是根据分析中描述的规则进行投票。接下来我们看看当节点顺利成为领导人之后如何用心跳来维持自己的统治。领导人需要正确的发送心跳,不停的刷新 Follower 的 lastReceiveTime:

// 发送日志到附加节点,还是采用 for 循环的形式。
func (rf *Raft) replicaLogToPeer(peerId int) {
	DPrintf("%d start sync log to %d", rf.me, peerId)
	for {
		rf.mu.Lock()
		if rf.state != Leader {
			rf.mu.Unlock()
			DPrintf("%d stop sends append entries to %d", rf.me, peerId)
			return
		}
		rf.mu.Unlock()
		go rf.sendAppendEntry(peerId)
		// 设置一定心跳间隔。
		time.Sleep(HeartbeatInterval * time.Millisecond)
	}
}

// 发送日志给节点.
func (rf *Raft) sendAppendEntry(peerId int) {
	rf.mu.Lock()
	lastLogEntry := rf.getLastLogEntry()
	args := AppendEntriesArgs{
		Term:         rf.currentTerm, // 我们需要填好这个值,在其他节点处理该请求的时候会用到这个值。
		LeaderId:     rf.me, // 设置自己为的编号。
		PrevLogIndex: lastLogEntry.Index, // 这个值对我们目前实验无影响。
		Entries:      nil, // 表示心跳。
		LeaderCommit: -1, // 这个值对我们目前实验无影响,瞎填就可以.
	}
	reply := AppendEntriesReply{}
	rf.mu.Unlock()
	// 发送日志 RPC。
	ok := rf.sendAppendEntryRPC(peerId, &args, &reply)
	if !ok {
		log.Printf("%d send a append PRC to %d failed", rf.me, peerId)
		return
	}
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// 如果发现回复的 Term 比我们自己的高,我们就将自己转换为 Follower。
	if reply.Term > rf.currentTerm {
		rf.convertToFollower(reply.Term)
		return
	}
	log.Printf("%d success send a append to %d", rf.me, peerId)
}

上面的函数 replicaLogToPeer 和 sendAppendEntry 完成心跳 RPC 不停发送和回复处理。我们再来看一看其他节点收到心跳之后该如何处理:

//
// 附加日志 RPC 处理.
//
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	log.Printf("%d start process append entries request from %d, args term: %d node term: %d", rf.me, args.LeaderId, args.Term, rf.currentTerm)
	// 如果 Term 比改节点的小,那我们就认为这是一条无效的心跳,直接返回。
	if args.Term < rf.currentTerm {
		DPrintf("get append entries form %d, but the term less than %d", args.LeaderId, rf.me)
		reply.Term, reply.Success = rf.currentTerm, false
		return
	}
	// 刷新该节点的 lastReceiveTime.
	reply.Term = rf.currentTerm
	rf.leaderId = args.LeaderId
	rf.lastReceiveTime = time.Now()

	if args.Term > rf.currentTerm {
		DPrintf("%d convert itself as follower", rf.me)
		rf.convertToFollower(args.Term)
	}
}

上面的 AppendEntries 函数主要的作用就是去刷新 Follower 的最近一次接受到有效心跳的时间,阻止该节点发生选举超时,从而维持自己的统治。

到此为止,我们就实现完了一个简单的领导人选举,思路其实还是很清晰,主要就是完全遵照论文的数据结构和规则来按部就班的实现。由于课程要求不能公开源码,所以我就不提供实现链接啦, 希望这篇文章能帮助你梳理和理解实验 A 的实现思路。最近我也在尝试实现 Rust 版本的 Raft,到时候再来跟大家分享交流源码吧😁

踩坑记录

看上去领导人选举的思路非常简单,但是我在实现的过程中也踩了不少的坑:


参考链接

MIT 6.824 2020 Raft 实现细节汇总

Raft 课程视频

6.824 社区实现

Raft 论文中文翻译

文章链接

文章首发于: Rustin 的博客

同步更新:

知乎

简书

掘金

segmentfault

— Rustin Liu