前一篇文章介绍了一下自己在实现Raft领导选举与日志复制的细节,这一篇文章继续介绍一下关于Raft节点的State持久化与日志压缩的细节。
往期文章:
Raft算法实现之领导选举与日志复制(MIT6.824 Lab2A、B | Golang)
为了简洁起见,本文出现的代码均删去了打印日志的相关代码,文中AppendEntries RPC简称为AE RPC。
我们知道,Raft的一大优势就是Fault Tolerance,即能够在部分节点宕机、失联或者出现网络分区的情况下依旧让系统正常运行。而为了保证这一点,除了领导选举与日志复制外,我们还需要定期将论文Figure 2中的非易失性State持久化到磁盘中。这样,便于某个server宕机重启后能够从磁盘恢复这些State,这也是Lab2C的主要内容。
在实现时课程组提供了一个structurePersister
来扮演磁盘的角色,我们对磁盘的读写操作均使用该structure进行。具体地,我们需要两个函数:persist()
与readPersist(data []byte)
。前者在任何非易失性State被改变时均需要调用,表示这些State的改变被写入了磁盘;后者只需要在启动/重启时调用,即Make()
函数里,表示从磁盘读取。具体的编码解码过程课程组在Start Code的注释里已经给我们写了例子,照着写就可以了,思路很清晰。
func (rf *Raft) persist() {
w := new(bytes.Buffer)
enc := labgob.NewEncoder(w)
if enc.Encode(rf.currentTerm) != nil ||
enc.Encode(rf.votedFor) != nil ||
enc.Encode(rf.logs) != nil {
DPrintf(dError, "S%v store persist error!", rf.me, rf.currentTerm)
}
raftSt := w.Bytes()
rf.persister.Save(raftSt, nil)
}
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 {
// bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
dec := labgob.NewDecoder(r)
var currenT int
var voteF int
var logs []LogEntry
if dec.Decode(¤T) != nil ||
dec.Decode(&voteF) != nil || dec.Decode(&logs) != nil {
DPrintf(dError, "S%v read persist error!", rf.me, rf.currentTerm)
} else {
rf.mu.Lock()
rf.currentTerm = currenT
rf.votedFor = voteF
rf.logs = make([]LogEntry, len(logs))
copy(rf.logs, logs)
rf.mu.Unlock()
}
}
完成了以上两个函数之后,在currentTerm
、votedFor
、logs
被修改过的地方调用persist()
就好了。Lab2C的主要代码就这些,不是很多。但Lab2C里个人认为最主要的地方是关于nextIndex
回退的优化。
当Leader的AE RPC的reply结果为false且返回的Term与Leader一样大时,说明Follower i的日志一致性检查没有通过,Leader需要减小nextIndex[i]
的值并在下一次AE RPC中重新尝试发送。论文里每次会回退一个nextIndex
,然而这样的回退方法在某些情况下会比较慢。在Lab2C的测试里,有些测试会在某一个Follower宕机之后,Start
大量的command给Leader。这样当这个Follower重启后,会落后Leader巨量的LogEntry
。如果依旧使用每次回退一个nextIndex
的策略,会造成测试超时。
论文里第七页的最后一部分提出了一种策略,即当出现需要回退nextIndex
的情况时,直接回退整个ConflictTerm
的LogEntry
。这样就从每次只回退一个LogEntry
变为了每次回退一整个Term的LogEntry
。这个ConflictTerm
为AE RPC reply参数中新增的一个成员,表示发生了冲突的那条LogEntry
对应的Term。这里课程组在论文的基础上,进一步改进了这个优化策略。
我自己在做的时候在reply中加了两个成员:ConflictIndex
和ConflictTerm
。分三种情况:
logs
比args中的PrevLogIndex
短,那么令ConflictTerm
为-1,ConflictIndex
为Follower最后一个LogEntry
的索引。ConflictTerm
设置为该LogEntry的Term,ConflictIndex
设置为上一个Term的最后一个LogEntry的Index。对与Leader的reply处理,做如下操作:
log[]
中包含了ConflictTerm
的LogEntry
,设置nextIndex[i]
为leader的log[]
中ConflictTerm
下一个Term的第一个LogEntry的Indexlog[]
中不包含了ConflictTerm
的LogEntry,设置nextIndex[i]
为ConflictIndex + 1
更具体的细节可以看前面浅析Raft的文章中的“优化”部分,里面包含详细的图解分析。这里放一下主要的伪代码:
// follower
func (rf *Raft) CheckIfPLIMatch(args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
...
if LenLogs <= args.PrevLogIndex || (rf.logs[args.PrevLogIndex].LogTerm != args.PreLogTerm) {
reply.ConflictIndex = args.PrevLogIndex
if LenLogs <= args.PrevLogIndex {
reply.ConflictIndex = Last index of Follower's LogEntry
reply.ConflictTerm = -1
} else {
reply.ConflictTerm = rf.logs[reply.ConflictIndex].LogTerm
}
reply.Success = false
for ; reply.ConflictIndex >= 0; reply.ConflictIndex-- {
if rf.logs[reply.ConflictIndex].LogTerm != reply.ConflictTerm {
break
}
}
rf.ResetTimer()
return false
}
return true
}
// leader
func (rf *Raft) UpdateNIAndMI(reply *AppendEntriesReply, peerId int) {
leaderHasConflictT := false
leaderLastEntryWithConfT := 0
// find if leader has conflict term
...
if !leaderHasConflictT || reply.ConflictTerm == -1 {
// decrement nextIndex, conflict index is another term, we need to set + 1
rf.nextIndex[peerId] = reply.ConflictIndex + 1
} else {
rf.nextIndex[peerId] = leaderLastEntryWithConfT + 1
}
}
除了以上方法,在课程Lecture6的视频中Robert Morris教授(教授的笑容真的非常有感染力,这里是教授的MIT主页。关于教授的其他故事可以了解一下hhh)在回答同学的提问时也讲到,nextIndex
回退的方法有很多种,包括但不限于课程组的方法、二分等,大家可以自由发挥。
这里顺便说一下,课程组的Lab2C的Handout关于回退逻辑的部分的建议与课上讲得有一点点不一样:
Case 2: leader has XTerm:
nextIndex = leader’s last entry for XTerm
这里课程上讲的是
Case 2: leader has XTerm:
nextIndex = leader’s last entry for XTerm + 1
即下一个Term的第一个LogEntry
的index。当然,实际运行起来,差一个LogEntry
,速度上的差异可以忽略不计。正确性上两者都是正确的。
完成以上优化,如果Lab2B的代码没有大问题,基本就能通过Lab2C的测试了。
Lab2C的Test比Lab2B的要严格很多,虽然2C的代码量不大,但如果Lab2B的代码有问题的话,Lab2C的debug会非常头疼。我自己在2C的Tests中遇到了这个几个主要问题。这几个问题在上一篇写Lab2B的文章中也提到过:
nextIndex
与matchIndex
的更新。在上一篇文章中,我曾在AE RPC返回success时,简单地将nextIndex
更新为Leader此时的len(logs) + 1
,matchIndex
更新为len(logs)
。这样会在2C的Tests,尤其是Figure8Test中发生apply error
的报错。原因在于Leader在初始化完AE参数,发送RPC时,会Unlock mutex。此时可能会收到新的Start
请求,使得Leader的logs
发生了改变。而在后面处理reply时,如果简单地根据logs
改变nextIndex
和matchIndex
,显然会使得这两个值大于实际我们要更新的值。解决方法就是根据我们发送的参数来更新:rf.nextIndex[peerId] = args.PrevLogIndex + len(args.Entries) + 1
rf.matchIndex[peerId] = args.PrevLogIndex + len(args.Entries)
如果不采取一些措施,当某个server从宕机中恢复时,其State Machine需要从第一条LogEntry开始恢复状态。随着日志的不断增加,server的恢复时间也会变得越来越长。因此,我们需要一种措施,将部分的logs
和State Machine的状态定期保存至磁盘,以减小server的时间,即日志压缩。
论文的第七节对这一部分做了阐述,在浅析Raft算法的那篇文章中也做了对应的介绍。显然,当某个server完成了一次日志压缩后,从宕机中恢复时,对与已经“压缩”的LogEntry
不需要进行恢复。因此,我们需要一个index,用于区分已压缩的LogEntry
与未压缩的LogEntry
,即lastIncludedIndex
。此外,对与这个index上的LogEntry
,还要用一个变量保存这条LogEntry
的Term,我们称之为lastIncludeTerm
,用于日志一致性检查。两者在server第一次Start时均为0,且均需要持久化。
type Raft struct {
...
// for snapshot
lastIncludedTerm int
lastIncludedIndex int
}
Handout里首先要求我们实现SnapShot()
函数,还是借助这幅图,我们理解一下这个函数的意思:
在每个server的key/value层,当apply的command到达一定规模时(这个“规模”在Lab3里会实现,Lab2可以暂时放一放),会将自己的当前状态存至磁盘,并通知Raft层将对应的LogEntry
也存至磁盘。这个通知使用的就是SnapShot()
函数。
因此,SnapShot()
的逻辑为首先截断参数index之前(包括index)的logs
。之后,更新lastIncludeIndex
和lastIncludeTerm
,并与第二个参数snapshot
一起调用rf.persister.Save
存储至“磁盘”。注意,如果更新后的lastIncludeIndex
大于commitIndex
或lastApplied
,那么这两者也需要更新为lastIncludeIndex
。原因是这两者对应的LogEntry
以及应用后的State Machine状态已被存储,不需要再次commit或者apply。
最后,调用persistWithSnapShot
进行持久化(下面会讲这个函数)。注意这里的参数index
不能大于logs
的最后一条LogEntry
的index,因为SnapShot()
是由上层State Machine调用的,参数index
表示这个索引之前的所有LogEntry
都已经被apply到了State Machine上了,不存在的LogEntry
显然不能被apply。
为了支持以上逻辑,我们需要修改2C中的persist()
以及readPersist()
,添加对于lastIncludeIndex
和lastIncludeTerm
的持久化支持。此外,我们还需存储SnapShot()
的第二个参数snapshot []byte
(这个参数是State Machine相关的状态,Lab2并不会实际使用。只需知道如何存取即可)。最简单的做法就是给persist()
加一个参数。为了不破坏之前的结构,我自己重新写了一个函数persistWithSnapShot
,并修改了persist()
,防止SnapShot被nil
覆盖:
func (rf *Raft) PersistWithSnapShot(snapshot []byte) {
// get raftSt here
...
rf.persister.Save(raftSt, snapshot)
}
func (rf *Raft) persist() {
// get raftSt here
...
// rf.persister.Save(raftSt, nil) change to below:
rf.persister.mu.Lock()
rf.persister.raftstate = func(orig []byte) []byte {
x := make([]byte, len(orig))
copy(x, orig)
return x
}(raftSt)
// here can alos be:
// rf.persister.Save(raftSt, rf.persister.ReadSnapshot())
rf.persister.mu.Unlock()
}
这里其实有一点代码冗余hhh
注意这里因为lastIncludeIndex
的存在,我们任何之前对logs
的访问,都需要进行一定的改动。举个例子,原来的logs
长度有5,SnapShot()
后截断了三个LogEntry
,此时切片logs
的长度只剩2。显然我们访问index = 4
的LogEntry
时,不能直接logs[4]
,会造成越界。正确的访问方式为logs[4 - rf.lastIncludeIndex]
。因此,我们需要对已有代码里任何涉及到rf.logs[]
访问的地方对索引进行减去lastIncludeIndex
的操作。这一点是Lab2D最耗时的地方,稍微有一点遗漏就可能造成Bug。
当然,另一种思路是将所有的索引都换成“相对索引”。上面我们之所以要将涉及到的索引减去lastIncludeIndex
,是因为我们依旧将这些索引看为全局索引,即相对于整个系统启动时的第一条LogEntry
的索引。而相对索引则指相对于本地rf.logs[]
的索引,这样不用每次访问rf.logs[]
时都需要进行减去lastIncludeIndex
的操作。但需要修改LogEntry
结构体,加一个index
成员。我个人没有使用这种方法。
实际上,按照前一种方法修改起来并没有想象中那么繁琐。我甚至在修改过程中重构了一下代码,把原来动辄上百行的函数进行了拆分,保证每一个方法不超过三十行。重构完成后的代码结构对debug也有很大的帮助。
当Leader的AE reply中,某个Follower返回的需要更新的nextIndex
小于Leader的lastIncludeIndex
时,Leader就需要发送InstallSnapshot RPC
来让Follower“安装”对应的快照,以保持LogEntry
的一致性(显然此时Leader在nextIndex
前的Entry都已经落盘的,无法发送AE rpc至Follower)。
Follower收到InstallSnapshot RPC
后,首先检查参数中Term与自身currentTerm
的大小关系进行检查,之后对比参数的lastIncludeIndex
与自身的lastIncludeIndex
,如果后者更大,说明这个快照已经存储了,直接返回。否则根据参数的lastIncludeIndex
进行logs[]
的截断:如果参数的lastIncludeIndex
大于logs
最后一条LogEntry
的index(记得加上自身的lastIncludeIndex
),那么截断所有的logs
;否则,截断包括参数的lastIncludeIndex
在内的之前所有LogEntry
。
截断完成后,更新自身的lastIncludeIndex
与lastIncludeTerm
。如果更新后的lastIncludeIndex
大于commitIndex
或lastApplied
,那么这两者也需要更新为lastIncludeIndex
。
但其实这里还有一个小问题,这个问题藏得很“不起眼”,Lab2D的的测试甚至覆盖不到,直到做了Lab3才发现。首先看一下InstallSnapshot RPC
Handler中更新lastApplied
的代码:
func (rf *Raft) DiscordLogs(args *InstallSnapShotArgs) {
...
if rf.lastApplied < rf.lastIncludedIndex {
rf.lastApplied = rf.lastIncludedIndex
}
}
逻辑与SnapShot()
里的一致,似乎没什么问题。接下来我们考虑这样一个情况:有五条Append指令,封装为LogEntry
后index从1开始,均对key为0的键进行操作。如下所示:
append{
key:0, value:1}
append{
key:0, value:2}
append{
key:0, value:3}
append{
key:0, value:4}
append{
key:0, value:5}
若正常执行,那么key/value数据库最后key为0的value为12345
(key、value均为string)。假设Leader为S1,Follower为S2。Leader发送AE时的PrevLogIndex
初始化为0,并将所有五条LogEntry
发送给S2。S2成功接收并复制到自身logs
中,但返回reply由于网络原因没有发送至Leader,所以Leader没有更新S2的nextIndex
。但因为收到其他Follower的success reply,所以更新自身的commitIndex
,并发送新一轮的AE RPC。
S2接收到新AE RPC后,发现LeaderCommit与上次不同,更新自身RPC并提交五条命令。此时S2的状态机状态为key:0, value:12345
,lastApplied = 5
。但因为网络问题,S2的reply还是没有被Leader收到。同时,Leader apply到第三条,即index为3的LogEntry
后,State Machine发起了SnapShot()
,将此时的key/value状态机的状态key:0, value:123
,lastIncludeIndex = 3
存储至磁盘。接着。Leader发送新一轮AE RPC,发现S2的PrevLogIndex = 0
小于自己的lastIncludeIndex = 3
,所以发送InstallSnapshot RPC
给S2,将S2的key/value状态机的状态由key:0, value:12345
覆盖为了key:0, value:123
。
按照上文InstallSnapshot RPC
Handler的逻辑,此时InstallSnapshot RPC
参数的lastIncludeIndex = 3
,小于lastApplied = 5
,所以S2的lastApplied
没有改变,导致S2的key/value状态机的状态中永远缺失了append{key:0, value:4}
与append{key:0, value:5}
两条命令。
正确的做法应该覆盖S2的状态机状态时,将lastApplied
也一并改变,以便S2能够重新apply第4、5条指令。也就是说,InstallSnapshot RPC
允许lastApplied
的回退!
这一点可能与论文里强调的lastApplied
递增的特性不太符合。其实,lastApplied
在某些情况下是允许回退的。除了上面的情况外,最简单的情况就是重启后lastApplied
回退为了0,重新apply没有持久化到状态机的command,两者其实一个道理。从另一个角度想,这样做其实也是Raft“强领导人特性”的一个体现。
// 修改后代码:
func (rf *Raft) DiscordLogs(args *InstallSnapShotArgs) {
...
if rf.lastApplied != rf.lastIncludedIndex {
rf.lastApplied = rf.lastIncludedIndex
}
}
最后还有一个问题,为什么SnapShot()
函数里不需要这样实现?其实原因很简单:SnapShot()
是由上层State Machine发起的,由State Machine调用,不会覆盖State Machine的状态,由上及下。而InstallSnapshot
则是由Raft层发起的,由下至上,有可能覆盖State Machine的状态。
我自己在写的时候,Lab2D本身遇到的bug其实不多,基本都是因为一些赋值顺序或者lastIncludeIndex
的遗漏造成的Bug。Lab2D的bug更多地暴露在Lab3的测试中:一个就是上面提到的Bug。这个Bug因为涉及到State Machine的内容,所以Lab2D里面确实测试不出来。
除此以外,还有一个Lab2D测试中没有覆盖到的Bug:在不可靠网络下,Leader可能无法收到Follower的reply,所以没有更新PrevLogIndex
。如果Follower的状态机调用了SnapShot
,此时Follower的lastIncludeIndex
就会增大,使得Follower在对Leader发送的AE RPC进行一致性检查时出现了PrevLogIndex - lastIncludeIndex < 0
的情况,造成访问越界。解决办法就是在RPC Handler里面加一个if分支进行判断就好。
由于手头没有可用的服务器,在自己电脑上跑又太占用资源,所以目前自己是在树莓派上跑的千次测试。(没想到几年前买的小玩具居然派上了用场hhh)
目前已经跑完了6666次的测试,2B的测试出现几次失败,其余的Lab暂时没出现问题。看了下日志,发现失败的Lab2B测试中会有两三百毫秒左右的时间所有server都没有动静,也没有上锁,仿佛整个程序被暂停了一样。这也使得Follower超时发起新的选举,导致当时Term的command无法成功提交而测试失败(某些测试中command只会Start一次,并不会retry)。初步推测可能跟Go的垃圾回收有关系,还需进一步深入(说到底还是Raft的代码逻辑结构还需要优化)。
此外,在Lab3 client、server层的测试中,SpeedTest目前暂时过不了,问题主要在于Raft层的指令提交速度不够快,(SpeedTest要求33ms之内能够apply一条command)估计这个跟Lab2B的问题之间存在一定的关系。
后续打算再优化一下Raft层的逻辑,目前的方向是在Start()
里面接收到一条新的command时立刻发送一个AE RPC,看看能不能优化一下command apply的速度,然后再找找Lab2B的日志中存在的问题。
前前后后,总共花费了四天的时间完成Lab2C与Lab2D。总体感觉上,LabC、D写起来没有前两个Lab那么纠结。究其原因,可能是C、D两个部分本来在论文中就属于锦上添花的优化部分,并不需要像LabA、B一样从0开始写代码。当然了,Debug部分还是很头疼的。写完Lab2后,又用了四天的时间完成Lab3的第一版代码,并通过了除速度测试以外的所有测试一千次。Lab3的速度测试,还是需要从Raft层入手进行优化才行。短期内可能没有足够的时间进行了,先挖一个坑,将来如果能成功优化并通过Lab3的速度测试,会连同Lab3的文章一并发上来。
最后,这篇文章花了五小时,一万一千字,如果能帮到你,那就是有意义的~
文章浏览阅读3.8k次,点赞9次,收藏28次。直接上一个工作中碰到的问题,另外一个系统开启多线程调用我这边的接口,然后我这边会开启多线程批量查询第三方接口并且返回给调用方。使用的是两三年前别人遗留下来的方法,放到线上后发现确实是可以正常取到结果,但是一旦调用,CPU占用就直接100%(部署环境是win server服务器)。因此查看了下相关的老代码并使用JProfiler查看发现是在某个while循环的时候有问题。具体项目代码就不贴了,类似于下面这段代码。while(flag) {//your code;}这里的flag._main函数使用while(1)循环cpu占用99
文章浏览阅读347次。idea shift f6 快捷键无效_idea shift +f6快捷键不生效
文章浏览阅读135次。Ecmacript 中没有DOM 和 BOM核心模块Node为JavaScript提供了很多服务器级别,这些API绝大多数都被包装到了一个具名和核心模块中了,例如文件操作的 fs 核心模块 ,http服务构建的http 模块 path 路径操作模块 os 操作系统信息模块// 用来获取机器信息的var os = require('os')// 用来操作路径的var path = require('path')// 获取当前机器的 CPU 信息console.log(os.cpus._node模块中有很多核心模块,以下不属于核心模块,使用时需下载的是
文章浏览阅读10w+次,点赞435次,收藏3.4k次。SPSS 22 下载安装过程7.6 方差分析与回归分析的SPSS实现7.6.1 SPSS软件概述1 SPSS版本与安装2 SPSS界面3 SPSS特点4 SPSS数据7.6.2 SPSS与方差分析1 单因素方差分析2 双因素方差分析7.6.3 SPSS与回归分析SPSS回归分析过程牙膏价格问题的回归分析_化工数学模型数据回归软件
文章浏览阅读7.5k次。如何利用hutool工具包实现邮件发送功能呢?1、首先引入hutool依赖<dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.19</version></dependency>2、编写邮件发送工具类package com.pc.c..._hutool发送邮件
文章浏览阅读867次,点赞2次,收藏2次。docker安装elasticsearch,elasticsearch-head,kibana,ik分词器安装方式基本有两种,一种是pull的方式,一种是Dockerfile的方式,由于pull的方式pull下来后还需配置许多东西且不便于复用,个人比较喜欢使用Dockerfile的方式所有docker支持的镜像基本都在https://hub.docker.com/docker的官网上能找到合..._docker安装kibana连接elasticsearch并且elasticsearch有密码
文章浏览阅读1.3w次,点赞57次,收藏92次。整理 | 郑丽媛出品 | CSDN(ID:CSDNnews)近年来,随着机器学习的兴起,有一门编程语言逐渐变得火热——Python。得益于其针对机器学习提供了大量开源框架和第三方模块,内置..._beeware
文章浏览阅读7.9k次。//// ViewController.swift// Day_10_Timer//// Created by dongqiangfei on 2018/10/15.// Copyright 2018年 飞飞. All rights reserved.//import UIKitclass ViewController: UIViewController { ..._swift timer 暂停
文章浏览阅读986次,点赞2次,收藏2次。1.硬性等待让当前线程暂停执行,应用场景:代码执行速度太快了,但是UI元素没有立马加载出来,造成两者不同步,这时候就可以让代码等待一下,再去执行找元素的动作线程休眠,强制等待 Thread.sleep(long mills)package com.example.demo;import org.junit.jupiter.api.Test;import org.openqa.selenium.By;import org.openqa.selenium.firefox.Firefox.._元素三大等待
文章浏览阅读3k次,点赞4次,收藏14次。Java软件工程师职位分析_java岗位分析
文章浏览阅读2k次。Java:Unreachable code的解决方法_java unreachable code
文章浏览阅读1w次。1、html中设置标签data-*的值 标题 11111 222222、点击获取当前标签的data-url的值$('dd').on('click', function() { var urlVal = $(this).data('ur_如何根据data-*属性获取对应的标签对象