MIT 6.824 Lab4 Sharded KeyValue Service
实验背景与目的
? 在Lab2
和Lab3
,实现了基于单RAFT
(单一集群)的多节点间数据一致性、支持增删查改、数据同步和快照保存的KV
数据库。但忽视了集群负载问题,随着数据增长到一定程度时,所有的数据请求都集中在leader
上,增加集群压力,延长请求响应时。
? Lab4
的内容就是将数据按照某种方式分开存储到不同的RAFT
集群上,保证相应数据请求引流到对应的集群,降低单一集群的压力,提供更为高效、更为健壮的服务。
? 整体架构如下图:
-
具体的
lab4
要实现一个支持multi-raft
分片 、分片数据动态迁移的线性一致性分布式KV
存储服务。 -
shard
表示互不相交并且组成完整数据库的每一个数据库子集。group
表示server
的集合,包含一个或多个server
。一个shard
只可属于一个group
,一个group
可包含(管理)多个shard
。 -
lab4A
实现ShardCtrler
服务,作用:提供高可用的集群配置管理服务,记录了每组(Group
)ShardKVServer
的集群信息和每个分片(shard
)服务于哪组(Group
)ShardKVServer
。具体实现通过Raft
维护 一个Configs
数组,单个config
具体内容如下:Num
:config number
Shards
:shard -> gid
,分片位置信息,Shards[3]=2
,说明分片序号为3
的分片负贵的集群是Group2
(gid=2
)Groups
:gid -> servers[]
,集群成员信息,Group[3]=['server1','server2']
,说明gid = 3
的集群Group3
包含两台名称为server1 & server2
的机器
-
lab4B
实现ShardKVServer
服务
Lab 4A
? lab4A
实现ShardCtrler
服务,作用:提供高可用的集群配置管理服务,记录了每组(Group
)ShardKVServer
的集群信息和每个分片(shard
)服务于哪组(Group
)ShardKVServer
。具体实现通过Raft
维护 一个Configs
数组,具体内容如下:
Num
:config number
Shards
:shard -> gid
,分片位置信息,Shards[3]=2
,说明分片序号为3
的分片负贵的集群是Group2
(gid=2
)Groups
:gid -> servers[]
,集群成员信息,Group[3]=['server1','server2']
,说明gid = 3
的集群Group3
包含两台名称为server1 & server2
的机器
? 代码实现基本与Lab3
类似,可以直接照抄复制,且不需要实现快照服务,具体根据实现 Join, Leave, Move, Query
服务。
Query
: 查询最新的Config
信息。Move
将数据库子集Shard
分配给GID
的Group
。Join
: 新加入的Group
信息,要求在每一个group
平衡分布shard
,即任意两个group
之间的shard
数目相差不能为1
,具体实现每一次找出含有shard
数目最多的和最少的,最多的给最少的一个,循环直到满足条件为止。坑为:GID = 0
是无效配置,一开始所有分片分配给GID=0
,需要优先分配;map
的迭代时无序的,不确定顺序的话,同一个命令在不同节点上计算出来的新配置不一致,按sort
排序之后遍历即可。且map
是引用对象,需要用深拷贝做复制。Leave
: 移除Group
,同样别忘记实现均衡,将移除的Group
的shard
每一次分配给数目最小的Group
就行,如果全部删除,别忘记将shard
置为无效的0。
其他代码与完全一致,关键代码实现:
func (mcf *MemoryConfig) Query(num int) (Config, Err) {
// 如果该数字为 -1 或大于已知的最大配置数字,则 shardctrler 应回复最新配置。
if num < 0 || num >= len(mcf.Configs) {
return mcf.Configs[len(mcf.Configs)-1], OK
}
return mcf.Configs[num], OK
}
func (mcf *MemoryConfig) Move(shard int, gid int) Err {
lastConfig := mcf.Configs[len(mcf.Configs)-1]
newConfig := Config{len(mcf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}
newConfig.Shards[shard] = gid
mcf.Configs = append(mcf.Configs, newConfig)
return OK
}
func (mcf *MemoryConfig) Join(groups map[int][]string) Err {
lastConfig := mcf.Configs[len(mcf.Configs)-1]
newConfig := Config{len(mcf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}
for gid, servers := range groups {
if _, ok := newConfig.Groups[gid]; !ok {
newServers := make([]string, len(servers))
copy(newServers, servers)
newConfig.Groups[gid] = newServers
}
}
// balance
g2s := groupToShards(newConfig)
for {
s, t := getMaxNumShardByGid(g2s), getMinNumShardByGid(g2s)
if s != 0 && len(g2s[s])-len(g2s[t]) <= 1 {
break
}
g2s[t] = append(g2s[t], g2s[s][0])
g2s[s] = g2s[s][1:]
}
var newShards [NShards]int
for gid, shards := range g2s {
for _, shardId := range shards {
newShards[shardId] = gid
}
}
newConfig.Shards = newShards
mcf.Configs = append(mcf.Configs, newConfig)
return OK
}
func (mcf *MemoryConfig) Leave(gids []int) Err {
lastConfig := mcf.Configs[len(mcf.Configs)-1]
newConfig := Config{len(mcf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}
g2s := groupToShards(newConfig)
noUsedShards := make([]int, 0)
for _, gid := range gids {
if _, ok := newConfig.Groups[gid]; ok {
delete(newConfig.Groups, gid)
}
if shards, ok := g2s[gid]; ok {
noUsedShards = append(noUsedShards, shards...)
delete(g2s, gid)
}
}
var newShards [NShards]int
if len(newConfig.Groups) > 0 {
for _, shardId := range noUsedShards {
t := getMinNumShardByGid(g2s)
g2s[t] = append(g2s[t], shardId)
}
for gid, shards := range g2s {
for _, shardId := range shards {
newShards[shardId] = gid
}
}
}
newConfig.Shards = newShards
mcf.Configs = append(mcf.Configs, newConfig)
return OK
}
func getMinNumShardByGid(g2s map[int][]int) int {
// 不固定顺序的话,可能会导致两次的config不同
gids := make([]int, 0)
for key := range g2s {
gids = append(gids, key)
}
sort.Ints(gids)
min, index := NShards+1, -1
for _, gid := range gids {
if gid != 0 && len(g2s[gid]) < min {
min = len(g2s[gid])
index = gid
}
}
return index
}
func getMaxNumShardByGid(g2s map[int][]int) int {
// GID = 0 是无效配置,一开始所有分片分配给GID=0
if shards, ok := g2s[0]; ok && len(shards) > 0 {
return 0
}
gids := make([]int, 0)
for key := range g2s {
gids = append(gids, key)
}
sort.Ints(gids)
max, index := -1, -1
for _, gid := range gids {
if len(g2s[gid]) > max {
max = len(g2s[gid])
index = gid
}
}
return index
}
func groupToShards(config Config) map[int][]int {
g2s := make(map[int][]int)
for gid := range config.Groups {
g2s[gid] = make([]int, 0)
}
for shardId, gid := range config.Shards {
g2s[gid] = append(g2s[gid], shardId)
}
return g2s
}
func deepCopy(groups map[int][]string) map[int][]string {
newGroups := make(map[int][]string)
for gid, servers := range groups {
newServers := make([]string, len(servers))
copy(newServers, servers)
newGroups[gid] = newServers
}
return newGroups
}
Lab 4B
待补充