TinyKV lab2c完成总结
lab2c
lab2c是增加Raft状态机对快照的支持,因为日志不可能无限增长,所以在增长到一定程度时,我们需要通过压缩成快照的方式来把这个Index之前的所有日志压缩成一个最后的变更状态。
Raft状态机修改
引入日志之后,RaftLog的firstIndex不再是l.entries的首元素的Index了,如果有快照,我们需要返回\(Snapshot.Index+1\)。然后在Raft的消息中增加对MsgSnapshot的支持,就是把applied, committed, stable全部设置成Snapshot.Index。然后如果某个节点的Next在firstIndex之前(就是已经压缩在快照里面了)我们在sendAppend里面直接发送快照。
RawNode修改
加入对RaftLog.PendingSnapshot的判断。实现maybeCompact,即把已经压缩到快照的l.entries的日志截断。在Advance里面要把PendingSnapshot置空。
SaveReadyState修改
实现追加快照函数,先修改RaftState和applyState然后落盘,然后调用RegionSched给集群调度器发送保存快照的指令。
// Apply the peer with given snapshot
func (ps *PeerStorage) ApplySnapshot(snapshot *eraftpb.Snapshot, kvWB *engine_util.WriteBatch, raftWB *engine_util.WriteBatch) (*ApplySnapResult, error) {
log.Infof("%v begin to apply snapshot", ps.Tag)
snapData := new(rspb.RaftSnapshotData)
if err := snapData.Unmarshal(snapshot.Data); err != nil {
return nil, err
}
// Hint: things need to do here including: update peer storage state like raftState and applyState, etc,
// and send RegionTaskApply task to region worker through ps.regionSched, also remember call ps.clearMeta
// and ps.clearExtraData to delete stale data
// Your Code Here (2C).
var snapRes *ApplySnapResult
if ps.isInitialized() {
ClearMeta(ps.Engines, kvWB, raftWB, ps.region.Id, ps.raftState.LastIndex)
ps.clearExtraData(snapData.Region)
}
ps.raftState.LastIndex = snapshot.Metadata.Index
ps.raftState.LastTerm = snapshot.Metadata.Term
ps.applyState.TruncatedState.Index = snapshot.Metadata.Index
ps.applyState.TruncatedState.Term = snapshot.Metadata.Term
ps.applyState.AppliedIndex = snapshot.Metadata.Index
ps.snapState.StateType = snap.SnapState_Applying
kvWB.SetMeta(meta.ApplyStateKey(snapData.Region.Id), ps.applyState)
ch := make(chan bool, 1)
ps.regionSched <- &runner.RegionTaskApply{
RegionId: snapData.Region.Id,
Notifier: ch,
SnapMeta: snapshot.Metadata,
StartKey: snapData.Region.StartKey,
EndKey: snapData.Region.EndKey,
}
snapRes = &ApplySnapResult{
PrevRegion: ps.region,
Region: snapData.Region,
}
meta.WriteRegionState(kvWB, snapData.Region, rspb.PeerState_Normal)
return snapRes, nil
}
HandleRaftReady修改
我们在SaveRaftReady函数返回了一个ApplySnapResult的结构体,我们在这里就要判断两个Region是不是一样了,如果不是,我们就要更换Region了。
if res != nil && !reflect.DeepEqual(res.PrevRegion, res.Region) {
d.SetRegion(res.Region)
metaStore := d.ctx.storeMeta
metaStore.Lock()
metaStore.regions[res.Region.Id] = res.Region
metaStore.regionRanges.Delete(®ionItem{res.PrevRegion})
metaStore.regionRanges.ReplaceOrInsert(®ionItem{res.Region})
metaStore.Unlock()
}
process函数修改
压缩快照的指令属于AdminRequest,我们增加对其中CompactLog指令的支持。
func (d *peerMsgHandler) processAdminEntry(ent pb.Entry, msg *raft_cmdpb.RaftCmdRequest, wb *engine_util.WriteBatch) *engine_util.WriteBatch {
req := msg.AdminRequest
switch req.CmdType {
case raft_cmdpb.AdminCmdType_CompactLog:
compact := req.CompactLog
applyState := d.peerStorage.applyState
if compact.CompactIndex >= applyState.TruncatedState.Index {
applyState.TruncatedState.Index = compact.CompactIndex
applyState.TruncatedState.Term = compact.CompactTerm
wb.SetMeta(meta.ApplyStateKey(d.regionId), applyState)
wb.WriteToDB(d.ctx.engine.Kv)
d.ScheduleCompactLog(compact.CompactIndex)
wb = &engine_util.WriteBatch{}
}
}
return wb
}
完成以上部分,再注意细节实现,我们就可以通过lab2c的测试了。