目标
本次Lab的目标是在Lab 2实现的Raft协议的基础上构建一个简易分布式Key-Value存储系统,包括服务器和客户端,以及实现Raft协议中的快照的处理。
设计与实现
Lab 3A
客户端
客户端(Clerk
)为用户提供简洁的Get
、Put
以及Append
接口,通过RPC与Leader服务器通信。
每个客户端在开始运行时随机生成一个64位整数作为唯一标识ID
),本次实验中假设两个客户端的标识不会冲突。每个客户端还保存一个序列号nextSeq
,与ID
一起使用来唯一标识每个请求。
每当用户调用上述接口时,客户端首先将序列号nextSeq
增加一,并将用户参数转换成一个RPC请求,请求内包含ID
和nextSeq
作为标识以及用户操作的参数。然后,客户端按照顺序依次尝试向各个服务器发送RPC请求并等待响应,直到某一个服务器返回成功响应。其中,尝试时首先从上一次响应成功的服务器开始。当服务器成功响应,说明操作成功且服务器很可能为Leader(可能由于网络原因导致第一次成功响应没有到达客户端,而此响应被保存在了每一个服务器上。当客户端重试时,任意一个服务器,包括非Leader的服务器,都可以返回正确的成功响应。因此,不能确保返回成功响应的服务器一定为Leader)。此时,客户端处理响应,并将结果返回给用户。同时,客户端会记录这次成功的服务器的序号,以便下次直接从该服务器进行尝试。若下次用户调用接口前Leader没有发生变化,则可以大概率直接命中Leader,提高效率。
此外,客户端的三个接口受一个大锁保护,同一时刻只能同时执行和等待一个操作,方便服务器去重。
服务器
服务器(KVServer
)负责接收客户端的请求,将客户端的操作加入Raft协议的日志(log)中,并等待Raft协议将该操作提交(commit)。
具体而言,服务器分为如下两部分:客户端请求处理(doOp
)以及Raft协议消息处理(apply
)。
客户端请求处理
服务器收到客户端的请求后,其RPC库会启动一个新的goroutine,服务器在该goroutine中首先检查该请求是否过时(序列号小于服务器记录的值),若过时,则直接返回错误信息。否则,服务器接着检查同样的请求是否在之前收到过:
- 若未收到过同样的请求,则说明该请求是新的,服务器调用Raft协议的
Start
接口来追加日志(log)。若追加失败,则说明该服务器不是Leader,服务器向客户端返回错误信息。若追加成功,服务器创建一个条件变量用于等待该请求对应的日志被提交。同时,服务器将该请求的标识和参数以及该条件变量存储起来(存储至clients[].Ops
),便于后续去重,以及当该日志被提交时用该条件变量进行通知。 - 若收到过同样的请求并且服务器存储了对应的响应,则说明这一请求已经被Raft协议提交,但响应没有成功到达客户端,客户端重新发起了请求。此时,服务器直接返回存储的响应,无论当前服务器是否为Leader。
- 若收到过同样的请求但还未存储对应的响应,则说明该请求还未被提交(也未被取消),仍需等待,服务器应当存储了首次收到同样请求时创建的条件变量。此时,服务器开始等待这一条件变量。请注意,可能有若干goroutine在等待同一个条件变量,例如,客户端由于某种原因重复多次发送了同一个请求。
Raft协议消息处理
每当服务器从applyCh
收到Raft协议的消息,服务器会根据消息类型分别进行处理:
- 若该消息为term变更消息(
ObservesNewTerm
),则说明Leader可能发生了变化,当前服务器若之前是Leader,这时可能不再是Leader。此时,服务器会将所有未完成(未被提交)的请求的状态标记为取消状态,并用它们各自对应的条件变量通知相应的goroutine,然后(从clients[].Ops
中)删除这些请求的记录,允许这些请求被客户端重试。这些goroutine随即会向客户端返回错误信息。 - 否则,该消息应当为提交消息,说明某个请求对应的日志被Raft协议成功提交了。此时,服务器首先检查该请求是否过时或重复(序列号小于服务器记录的值,或者该请求的状态为已完成),若过时或重复,则直接忽略这个消息。否则,服务器会按该请求规定的语义执行,如
Get
、Put
以及Append
,并产生返回值。最后,服务器会(从clients[].Ops
中)查找相应的请求。若找到了相应请求的记录,服务器会用其对应的条件变量通知相应的goroutine。这些goroutine随即会向客户端返回成功响应,一次请求就处理完成了。若没有找到相应请求的记录,则说明该服务器不是接收该请求的服务器,这个请求是通过Raft协议同步到当前服务器的。此时,服务器会创建一个该请求的完成状态的记录,便于后续去重。最后,该请求被提交说明客户端收到了该请求之前的所有请求的响应,于是服务器会(从clients[].Ops
中)清除所有序列号小于该请求的请求的记录。
Lab 3B
对于这个任务,需要修改Raft协议(Lab 2完成的Raft协议)以及上述的服务器来支持快照。客户端则不需要修改。
对Raft协议的修改
需要对Raft协议进行以下修改来支持快照:
- Raft状态大小检查
- 创建快照接口
TakeSnapshot
- 支持截断的日志
InstallSnapshot
RPC请求处理方法InstallSnapshot
RPC响应处理方法- 修改
AppendEntries
超时重试定时器 - 快照的读取
Raft状态大小检查
本节对每次持久化操作persist()
加入了检查,一旦某一次持久化操作后Raft状态大小超过阈值(目前取为大小限值的一半),则通过applyCh
向上层应用(服务器)发送需要快照消息(NeedSnapshot
),通知上层应用创建快照。
创建快照接口TakeSnapshot
当上层应用(服务器)通过TakeSnapshot
创建快照时,其提供一个字节序列作为该上层应用的原始快照信息,以及该快照存储的最后一条日志的序号lastApplied
。此时,Raft协议会记录这个字节序列,并删除lastApplied
及之前的日志,截取lastApplied
之后的日志,一个快照就创建好了。随后,Raft协议将此快照以及当前Raft状态持久化存储。
支持截断的日志
创建快照后,被快照覆盖的日志就不再需要实际存储在Raft状态中了。Raft状态中实际存储的日志为创建该快照之后新添加的日志,Raft状态中存储的首个日志的前一条日志的序号及term由LastIncludedIndex
及LastIncludedTerm
存储,方便之后使用。本节对Raft协议实现中每个使用到日志的地方进行了修改,使其能够支持截断的日志。
InstallSnapshot
RPC请求处理方法
当接收者收到一个InstallSnapshot
请求时,其按照与AppendEntries
RPC请求处理方法相同的流程检查term、设置状态。检查通过后,继续后续处理。
该请求中含有一个存储原始快照信息的字节序列,以及该快照覆盖的最后一条日志的序号LastIncludedIndex
及term LastIncludedTerm
。接收者首先用本地相同序号的日志的term与其进行比对,若本地没有相应序号的日志或者term不符,则接收者清除本地所有的日志。否则,说明日志匹配,接收者仅需清除LastIncludedIndex
及之前的日志。
最后,接收者将该请求中的快照信息存储到本地,并持久化,然后通过applyCh
向上层应用(服务器)发送原始快照信息,供上层应用利用该快照恢复状态。
InstallSnapshot
RPC响应处理方法
当接收者收到一个InstallSnapshot
响应时,其按照与AppendEntries
RPC响应处理方法相同的流程检查term、设置状态。检查通过后,继续后续处理。
收到有效的响应说明快照安装成功,响应者此时至少同步了LastIncludedIndex
及之前日志的信息。于是,接收者(此时应为Leader)会利用这一点来更新nextIndex
值以及matchIndex
值。
修改AppendEntries
超时重试定时器
当该定时器触发且当前节点为Leader时,若其他某个节点需要的日志(nextIndex
及之后的所有日志)没有存储在当前节点中(即已被快照覆盖),那么当前节点会向这一节点发送InstallSnapshot
RPC请求来为其安装快照。
快照的读取
每次Raft协议启动,其会从持久化介质中读取快照。读取后,Raft协议会通过applyCh
向上层应用(服务器)发送原始快照信息,供上层应用恢复状态。
对服务器的修改
服务器需要进行修改来处理从applyCh
收到的与快照有关的Raft协议消息。同样,服务器会根据消息类型分别处理:
- 若该消息为需要快照消息(
NeedSnapshot
),说明Raft状态大小已达到阈值,需要创建快照了。此时,服务器会将当前的状态(包括Key-Value存储的内容state[]
,以及当前所有客户端的请求及其状态clients[].Ops
)序列化为字节序列,然后调用Raft协议的TakeSnapshot
接口通过Raft协议创建快照。 - 若该消息为安装快照消息(
InstallSnapshot
),说明Raft协议通知服务器安装一个新的快照。该消息中包含该快照存储的最后一条日志的序号lastApplied
以及一个存储原始快照信息的字节序列。服务器将该字节序列反序列化为状态信息(state[]
及clients[]
)后,用其完全替换当前的状态,并记录lastApplied
,完成快照的安装。其中,服务器在安装快照前会将所有未完成的请求的状态标记为取消状态,并用它们各自对应的条件变量进行通知,然后(从clients[].Ops
中)删除。
代码根据规定不予直接公开,可以与我联系,进行讨论。
发表评论