MIT 6.824 Lab4 Sharded KeyValue Service


实验背景与目的

? 在Lab2Lab3,实现了基于单RAFT(单一集群)的多节点间数据一致性、支持增删查改、数据同步和快照保存的KV数据库。但忽视了集群负载问题,随着数据增长到一定程度时,所有的数据请求都集中在leader上,增加集群压力,延长请求响应时。

? Lab4的内容就是将数据按照某种方式分开存储到不同的RAFT集群上,保证相应数据请求引流到对应的集群,降低单一集群的压力,提供更为高效、更为健壮的服务。

? 整体架构如下图:

  1. 具体的lab4要实现一个支持 multi-raft分片 、分片数据动态迁移的线性一致性分布式 KV 存储服务。

  2. shard表示互不相交并且组成完整数据库的每一个数据库子集。group表示server的集合,包含一个或多个server。一个shard只可属于一个group,一个group可包含(管理)多个shard

  3. lab4A实现ShardCtrler服务,作用:提供高可用的集群配置管理服务,记录了每组(GroupShardKVServer的集群信息和每个分片(shard)服务于哪组(GroupShardKVServer。具体实现通过Raft维护 一个Configs数组,单个config具体内容如下:

    • Numconfig number
    • Shardsshard -> gid,分片位置信息,Shards[3]=2,说明分片序号为3的分片负贵的集群是Group2gid=2
    • Groupsgid -> servers[],集群成员信息,Group[3]=['server1','server2'],说明gid = 3的集群Group3包含两台名称为server1 & server2的机器
  4. lab4B实现ShardKVServer服务

Lab 4A

? lab4A实现ShardCtrler服务,作用:提供高可用的集群配置管理服务,记录了每组(GroupShardKVServer的集群信息和每个分片(shard)服务于哪组(GroupShardKVServer。具体实现通过Raft维护 一个Configs数组,具体内容如下:

  • Numconfig number
  • Shardsshard -> gid,分片位置信息,Shards[3]=2,说明分片序号为3的分片负贵的集群是Group2gid=2
  • Groupsgid -> servers[],集群成员信息,Group[3]=['server1','server2'],说明gid = 3的集群Group3包含两台名称为server1 & server2的机器

? 代码实现基本与Lab3 类似,可以直接照抄复制,且不需要实现快照服务,具体根据实现 Join, Leave, Move, Query 服务。

  1. Query: 查询最新的Config信息。
  2. Move 将数据库子集Shard分配给GIDGroup
  3. Join: 新加入的Group信息,要求在每一个group平衡分布shard,即任意两个group之间的shard数目相差不能为1,具体实现每一次找出含有shard数目最多的和最少的,最多的给最少的一个,循环直到满足条件为止。坑为:GID = 0 是无效配置,一开始所有分片分配给GID=0,需要优先分配;map的迭代时无序的,不确定顺序的话,同一个命令在不同节点上计算出来的新配置不一致,按sort排序之后遍历即可。且 map 是引用对象,需要用深拷贝做复制。
  4. Leave: 移除Group,同样别忘记实现均衡,将移除的Groupshard每一次分配给数目最小的Group就行,如果全部删除,别忘记将shard置为无效的0。

其他代码与完全一致,关键代码实现:

func (mcf *MemoryConfig) Query(num int) (Config, Err) {
	// 如果该数字为 -1 或大于已知的最大配置数字,则 shardctrler 应回复最新配置。
	if num < 0 || num >= len(mcf.Configs) {
		return mcf.Configs[len(mcf.Configs)-1], OK
	}
	return mcf.Configs[num], OK
}

func (mcf *MemoryConfig) Move(shard int, gid int) Err {
	lastConfig := mcf.Configs[len(mcf.Configs)-1]
	newConfig := Config{len(mcf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}
	newConfig.Shards[shard] = gid
	mcf.Configs = append(mcf.Configs, newConfig)
	return OK
}

func (mcf *MemoryConfig) Join(groups map[int][]string) Err {
	lastConfig := mcf.Configs[len(mcf.Configs)-1]
	newConfig := Config{len(mcf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}

	for gid, servers := range groups {
		if _, ok := newConfig.Groups[gid]; !ok {
			newServers := make([]string, len(servers))
			copy(newServers, servers)
			newConfig.Groups[gid] = newServers
		}
	}

	// balance
	g2s := groupToShards(newConfig)

	for {
		s, t := getMaxNumShardByGid(g2s), getMinNumShardByGid(g2s)
		if s != 0 && len(g2s[s])-len(g2s[t]) <= 1 {
			break
		}
		g2s[t] = append(g2s[t], g2s[s][0])
		g2s[s] = g2s[s][1:]
	}

	var newShards [NShards]int
	for gid, shards := range g2s {
		for _, shardId := range shards {
			newShards[shardId] = gid
		}
	}
	newConfig.Shards = newShards
	mcf.Configs = append(mcf.Configs, newConfig)
	return OK
}

func (mcf *MemoryConfig) Leave(gids []int) Err {
	lastConfig := mcf.Configs[len(mcf.Configs)-1]
	newConfig := Config{len(mcf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}

	g2s := groupToShards(newConfig)

	noUsedShards := make([]int, 0)
	for _, gid := range gids {
		if _, ok := newConfig.Groups[gid]; ok {
			delete(newConfig.Groups, gid)
		}
		if shards, ok := g2s[gid]; ok {
			noUsedShards = append(noUsedShards, shards...)
			delete(g2s, gid)
		}
	}

	var newShards [NShards]int
	if len(newConfig.Groups) > 0 {
		for _, shardId := range noUsedShards {
			t := getMinNumShardByGid(g2s)
			g2s[t] = append(g2s[t], shardId)
		}

		for gid, shards := range g2s {
			for _, shardId := range shards {
				newShards[shardId] = gid
			}
		}
	}
	newConfig.Shards = newShards
	mcf.Configs = append(mcf.Configs, newConfig)
	return OK
}

func getMinNumShardByGid(g2s map[int][]int) int {
	// 不固定顺序的话,可能会导致两次的config不同
	gids := make([]int, 0)
	for key := range g2s {
		gids = append(gids, key)
	}

	sort.Ints(gids)

	min, index := NShards+1, -1
	for _, gid := range gids {
		if gid != 0 && len(g2s[gid]) < min {
			min = len(g2s[gid])
			index = gid
		}
	}
	return index
}

func getMaxNumShardByGid(g2s map[int][]int) int {
	// GID = 0 是无效配置,一开始所有分片分配给GID=0
	if shards, ok := g2s[0]; ok && len(shards) > 0 {
		return 0
	}

	gids := make([]int, 0)
	for key := range g2s {
		gids = append(gids, key)
	}

	sort.Ints(gids)

	max, index := -1, -1
	for _, gid := range gids {
		if len(g2s[gid]) > max {
			max = len(g2s[gid])
			index = gid
		}
	}
	return index
}
func groupToShards(config Config) map[int][]int {
	g2s := make(map[int][]int)
	for gid := range config.Groups {
		g2s[gid] = make([]int, 0)
	}
	for shardId, gid := range config.Shards {
		g2s[gid] = append(g2s[gid], shardId)
	}
	return g2s
}

func deepCopy(groups map[int][]string) map[int][]string {
	newGroups := make(map[int][]string)
	for gid, servers := range groups {
		newServers := make([]string, len(servers))
		copy(newServers, servers)
		newGroups[gid] = newServers
	}
	return newGroups
}

Lab 4B

待补充