MapReduce实现


本文是mit6.824的lab1实现记录,目前已通过所有test,但终端会有EOF报错,暂时还未排查到错误。

基本架构

MapReduce模型角色有两种:master和worker。master是map和reduce任务的调度者,起码任务调度和请求处理是异步的,所以要考虑多线程。worker是请求并处理单个任务,是单线程。

Master

主要工作如下:

  • 注册worker
  • 初始化任务数组,map阶段初始化map任务数组,reduce阶段可以理解为重置为reduce数组。
  • 分派任务
  • 任务调度,定时进行,需要检查所有任务的完成状况,以确定是否进入下一阶段。

Task

任务有五种状态:Ready,Queue,Running,Completed和Err,通过判断状态我们进行不同的处理。Ready是在初始化完成后默认的,需要加入信道并更新状态为Queue;当请求来临时,我们派发任务(注意扩充任必要信息)并更新状态为Running;根据worker返回的任务报告或超时判断更新状态为Completed或者Err;对于Err任务更改状态为Queue,并加入信道。
注:需要任务数组的原因:根据下标跟踪任务状态

Schedule方法

这里进行我们任务调度的大部分工作,调度策略是:遍历任务数组并更新状态,按照Task中的思路。同时判断是否进行任务重置。

Worker

worker一经启动就不停的去请求并执行任务,所以需要对map和reduce任务分别处理,这里我们的master返回的结构体是一样的,按需取用即可。
调用Worker方法,首先我们发送Worker注册请求,序列号递增(这里不用加锁)。然后就是不断循环去请求并执行任务。另外,我们需要保存一个isAlive变量,判断worker是否可以退出。
我们的CallReqTask方法会得到三种响应,Map,Reduce和Over,在Master完成ReducePahse阶段后,会将之后的请求任务类型全部置为Over,以便Worker能够及时退出。
注:map和reduce任务我们需要执行后报告master,但over伪任务只需要更新isAlive状态即可,我们稍后即可退出。

doMapTask

为了让每个map任务能够生成nReduce个中间文件,我们make一个二维数组,然后取模存放:

reduces := wmake([][]KeyValue, task.NReduce)
for _, kv := range kvs {
		idx := ihash(kv.Key) % task.NReduce
		reduces[idx] = append(reduces[idx], kv)
	}

doReduceTask

首先,我们的输入文件有NMap个,按照mr-X-Y格式,这里的Y我们规定为任务序列号,从而能够读取正确的文件。我们定义一个map数组来存放键值对应的序列队,并使用reducef函数输出到最终文件。