MapReduce 原理与实践


MapReduce 简介

MapReduce 核心思想

Hadoop MapReduce 是一个编程框架,它可以轻松地编写应用程序,以可靠的、容错的方式处理大量的数据(数千个节点)。

正如其名,MapReduce 的工作模式主要分为 Map 阶段和 Reduce 阶段

一个 MapReduce 任务(Job)通常将输入的数据集分割成独立的块,这些块被 map 任务以完全并行的方式处理。框架对映射(map)的输出进行排序,然后将其输入到 reduce 任务中。通常,作业的输入和输出都存储在文件系统中框架负责调度任务、监视任务并重新执行失败的任务

在 Hadoop 集群中,计算节点一般和存储节点相同,即 MapReduce 框架和 Hadoop 分布式文件系统均运行在同一组节点上。这种配置允许框架有效地调度已经存在数据的节点上的作业,使得跨集群的带宽具有较高的聚合度,能够有效利用资源。

MapReduce 框架由单个主节点(Master)的 ResourceManager、每个从节点(Slave) NodeManager 和每个应用程序的 MRAppMaster 组成

在处于最低程度时,应用程序通过实现适当的接口以及抽象类来指定输入输出的位置、提供 map 和 reduce 函数。在实现过程中还会涉及到作业配置等参数的设置。

在编程框架完善并打包之后,Hadoop 的作业客户端(job client)可以将作业(一般是 jar 包或者可执行文件)和配置项提交给 ResourceManager 。后者负责将作业代码和配置项分发给从节点(Slave),之后负责作业的调度和监视,同时也向作业客户端提供状态和诊断信息。

  • Client Service: 应用提交、终止、输出信息(应用、队列、集群等的状态信息)
  • Adaminstration Service: 队列、节点、Client 权限管理
  • ApplicationMasterService: 注册、终止 ApplicationMaster, 获取 ApplicationMaster 的资源申请或取消的请求,并将其异步地传给 Scheduler, 单线程处理
  • ApplicationMaster Liveliness Monitor: 接收 ApplicationMaster 的心跳消息,如果某个 ApplicationMaster 在一定时间内没有发送心跳,则被任务失效,其资源将会被回收,然后 ResourceManager 会重新分配一个 ApplicationMaster 运行该应用(默认尝试 2 次)
  • Resource Tracker Service: 注册节点, 接收各注册节点的心跳消息
  • NodeManagers Liveliness Monitor: 监控每个节点的心跳消息,如果长时间没有收到心跳消息,则认为该节点无效, 同时所有在该节点上的 Container 都标记成无效,也不会调度任务到该节点运行
  • ApplicationManager: 管理应用程序,记录和管理已完成的应用
  • ApplicationMaster Launcher: 一个应用提交后,负责与 NodeManager 交互,分配 Container 并加载 ApplicationMaster,也负责终止或销毁
  • YarnScheduler: 资源调度分配, 有 FIFO(with Priority),Fair,Capacity 方式
  • ContainerAllocationExpirer: 管理已分配但没有启用的 Container,超过一定时间则将其回收

MapReduce 编程框架

你可能想知道 MapReduce 究竟处理什么形式的数据。事实上, MapReduce 框架只对    形式的键值对进行处理。

换言之,该框架会将任务的输入当成一组    键值对,最后也会生成一组    键值对作为结果。其中的 key 和 value 可以根据具体问题将其理解为不同的类型。

key 和 value 的类必须由框架来完成序列化,因此我们需要做的就是实现其中的可写接口(Writable)。此外,对于其中的一些关键类还必须实现 WritableComparable 接口,以便于框架对其进行排序。在讲到具体的实现时我们会提到上述知识点。

综上所述,一个 MapReduce 作业从输入到输出的过程中,经历了以下过程。

首先是一些说明:

  • :输入的原始数据,即使不是 K/V 形式的,你也可以在代码中对其进行格式上的调整。
  • :输出的计算结果。

因此,这个过程主要经历了以下几步:

 -> Map ->  -> Combine ->  -> Reduce -> 

下面我们将对这个过程中涉及到的关键概念进行讲解。同时,也请阅读 Google 发表的关于 MapReduce 原理的论文。这篇文章对 MapReduce 的工作原理进行了深入的讲解,可以帮助理解各个流程之间的关系。

扩展链接:

  • MapReduce: Simplified Data Processing on Large Clusters

Mapper

Map 阶段是一个独立的任务,由 Mapper 负责将输入的键值对(以下简称为“输入对”)映射到一组中间键值对。转换后的中间键值对(以下简称为“中间对”)不需要与原始的输入有相同的类型。给定的输入键值对可以映射到零或多个输出键值对(以下简称为“输出对”)。

例如,  这个键值对可以在 Map 阶段分别映射为    和    两个键值对。

总体而言,Mapper 的实现过程主要是向  Job.setmapperclass(Class)  方法传递具体的作业代码。输入的数据被划分为等长的小数据块,称为输入分片(InputSplit)。然后框架为该作业中 InputSplit 所包含的每个键值对调用  map(WritableComparable,Writable, Context)  方法。

当然,我们在应用程序的开发中可以重写  cleanup(Context)  方法来执行一些清理工作。

刚刚提到输出对不需要与输入对具有相同的类型,给定的输入对可以映射到零或多个输出对。输出对将会在  context.write(WritableComparable, Writable)  方法调用时被收集在一起。

此外,应用程序可以使用 Counter (计数器)来报告一些统计数据。

所有与给定输出键有关的中间值都将被框架分组,并传递给 Reducer 以产生最终的输出结果。我们可以通过  Job.setGroupingComparatorClass(Class)  方法来指定 Comparator (比较器)以控制分组过程。这稍微有些复杂,在初次学习时可以仅作了解。

随后,Mapper 的输出将在 Reducer 中进行排序和分区。分区的总数量与作业的 Reducer 任务数相同。用户可以通过实现自定义 Partitioner (分区器)来控制哪些键(及其相关的键值对)由哪些 Reducer 处理。

用户可以通过  Job.setCombinerClass(Class)  方法来指定 Combiner (组合器)。这个 Combiner 用于在本地执行中间输出的聚合操作,这将有利于减少从 Mapper 传输到 Reducer 的数据量。本地执行是指在单个节点范围内进行处理,减少数据量是指跨节点的数据传输量变少。

在上述过程中产生的中间输出总是以  (key-len, key, value-len, value)  的格式存储,并且这些中间输出是已经排过序的。我们可以在应用程序中通过设置 Configuration 对象的参数来利用 CompressionCodec 工具控制是否要压缩中间输出,以及如何压缩中间输出。


面试时容易问到一个问题是如何计算 Map 的数量:一个简单的回答是其数量通常由输入数据的总大小决定,即输入文件块的总数。

对于 Map 来说,正常情况下的并行计算量是每个节点大约 10-100 个 Map 。对于某些计算量较小的 Map 任务,这个数量可以设置为 300 个。任务的设置过程也是需要耗费时间的,所以在海量数据背景下,最好让 Map 的处理时间不低于一分钟,以最大化地利用集群资源。

因此,如果预计输入数据有 10TB ,而块大小设置为 128MB ,那默认情况下将会产生 82000 个 map 。如果我们通过  Configuration.set(MRJobConfig。NUM_MAPS, int)  设置了更高的值,则这个数量将会根据设置再次改变。

Reducer

Map 过程(包括了 Combine)之后就是 Reduce 过程。Reduce 过程利用 Reducer 共享键来减少中间值集合的规模

在作业中,reducer 的数量可以通过  job.setNumReduceTasks(int)  方法设置。

具体而言,Reducer 在实现时需要我们通过  Job.setReducerClass(Class)  方法来传递具体的作业代码,并可以通过重写覆盖该方法来完成初始化。框架将对分组后的输入中的每个    调用  reduce(WritableComparable, Iterable, Context)  方法。同样的,我们可以在应用程序中重写  cleanup(Context)  方法来执行清理工作。

Reducer 有三个主要阶段,分别是:shuffle 、 sort 和 reduce 。

  • Shuffle:也称之为混洗,mapper 的排序输出是 Reducer 的输入。在这个阶段,MapReduce 框架通过 HTTP 协议获取所有 Mapper 输出的相关分区。
  • Sort排序。由于不同的 Mapper 可能输出相同的键,因此在此阶段,框架将对 Reducer 的输入通过键进行分组。需要特别说明的是,Shuffle(混洗)和 Sort (排序)阶段可能会同时发生。当获取到 map 的输出时,这两个阶段就会被合并进行。
  • Reduce规约。在这个阶段将会对每一个    记录都调用  reduce(WritableComparable, Iterable, Context)  方法进行处理。作为最后一个阶段,Reduce 任务的输出通常都通过  Context.write(WritableComparable, Writable)  方法写入到文件系统(无论是 HDFS 还是本地文件系统)。在这个过程中,Reducer 的输出是无序的,我们也可以使用 Counter 来统计一些数据。

Partitioner

Partitioner 用于对键的空间进行分区。

具体来说,它控制了中间 Map 输出的键的分区,其目的是为了优化并减少计算量。键(或键的子集)用于派生分区,通常由散列函数派生。分区的总数与作业的 Reduce 任务数相同HashPartitioner 是 MapReduce 使用的默认分区程序

Counter

Counter (计数器)是 MapReduce 应用程序报告统计数据的一种工具。在 Mapper 和 Reducer 的具体实现中,可以利用 Counter 来报告统计信息

除了需要我们自己实现的部分,MapReduce 也自带了非常实用的 Mapper 、 Reducer 和 Partitioner 库,你可以在官方文档的  Package Summary  章节了解他们的具体用法。

WordCount 实验

在了解了 MapReduce 的工作原理之后,我们将再次使用 WordCount 来帮助理解它的工作过程。只是,这一次将由我们自己编写 WordCount 的代码来实现相关的逻辑。

MapReduce 作业的编译与执行

准备

准备输入文件

在集群启动完成后,我们上传一些数据作为稍后词频统计的“原料”。

请在终端中输入以下命令来完成上传,这一次上传的文件是 Linux 系统 /etc 目录下的  protocols  协议文件。

hdfs dfs -put /etc/protocols /user/hadoop

创建文件

数据准备好之后,在终端中输入以下命令,新建一个名为  WordCount.java  的文件。

# 请在 hadoop 用户的主目录新建该文件
cd /home/hadoop
touch WordCount.java

使用 Vim 编辑器打开并编辑该文件。你也可以使用其他的编辑器。

vim /home/hadoop/WordCount.java

在  WordCount.java  文件内先输入以下代码,完成整个 WordCount 程序的大体框架。

//  以下是本程序会用到的相关的包,先引入
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// 注意类名要与文件名的大小写一致
public class WordCount {

    // 自定义的 TokenizerMapper 类将继承自 Mapper 类,以实现相关的接口和方法
    // 在 Map 阶段将会执行其中的作业逻辑
    public static class TokenizerMapper
       extends Mapper{

  }

    // 自定义的 IntSumReducer 类将继承自 Reducer 类,以实现相关的接口和方法
    // 在 Reduce 阶段将会执行其中的作业逻辑
    public static class IntSumReducer
       extends Reducer {

  }

    // main 方法是整个程序的入口,在这里涉及到作业(Job)的各项设置
    public static void main(String[] args) throws Exception {

  }
}



编写 Mapper

map 方法是 Mapper 的具体实现。在下面这段代码中,Mappter 通过 map 方法一次只处理读入的一行数据。数据是由指定的 TextInputFormat 提供的。然后,它通过 StringTokenizer 将每一行的内容分割为由空格分隔的单词,形成    形式的键值对。即每个单词作为键,对应一个计数的值。

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
    }
}

举个例子,如果我们在文件中读入了一行数据:

AAA BBB uiuing AAA

那么在 map 方法的处理后,将形成如下所示的键值对:





理解了 map 的工作机制之后,现在我们来完善 Mapper 类的代码。

请在  /home/hadoop/WordCount.java  文件中将 TokenizerMapper 的代码按照如下的样例补充完整。相关的讲解将以注释给出。

  public static class TokenizerMapper extends Mapper{

    // 在 MapReduce 框架中,基本数据类型都封装成了 Writable 类型
    // 因此 int 类型对应于 IntWritable 类型,在初始化时将其声明为静态常量是为了方便地使用 1 的值
    private final static IntWritable one = new IntWritable(1);

    // 声明一个 Text 类型的私有成员变量 word
    private Text word = new Text();

    // map 方法的写法是标准格式,可以参考官方文档理解各个参数的含义
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      // 从 value 中读入数据并按照空格分隔
      StringTokenizer itr = new StringTokenizer(value.toString());

      // 将每个分隔形成的单词组装成键值对
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }



编写 Reduce

reduce 方法是 Reducer 的具体实现。在 WordCount 的例子中,通过如下所示的 reduce 方法可以实现汇总键值对中的值,从而达到统计每个键出现次数的目的。这也是词频统计的核心所在。

public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }

上一步 map 阶段我们得到了如下所示的键值对:





那么在 reduce 阶段,这些键值对中键相同的值将会被累加,聚合成如下所示的键值对:




理解了以上逻辑之后,请在  /home/hadoop/WordCount.java  文件中将 IntSumReducer 类的代码补充完整:

public static class IntSumReducer extends Reducer {
    // 声明一个 IntWritable 类型值用于存放累加结果
    private IntWritable result = new IntWritable();

    // reduce 方法的写法也是参考官方文档进行的,相关的参数可以查阅官方文档进行理解
    public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
      // 进行值的累加操作
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      // 将 int 基本类型通过 set 方法赋予到结果中
      result.set(sum);
      // 写入上下文中进行保存
      context.write(key, result);
    }
  }



编写任务驱动

我们知道任何编程框架都会有一个主入口,就像 Java 程序以 main 方法作为主入口一样。程序执行的第一步就是执行 main 方法中的逻辑。

因此,我们将 main 方法理解为任务驱动,即设置和发起任务的角色。

此外,在 WordCount 中,我们还可以复用 Reducer 的实现逻辑,并且指定一个 Combiner (组合器)。因此,在对键进行排序之后,每个 map 的输出都通过本地的 Combiner (根据作业配置与还原器相同)传递给本地的聚合过程,这样做可以减少后续 reduce 过程跨节点的数据传输量。

请在  /home/hadoop/WordCount.java  文件中将 main 方法的逻辑代码补充完整。相关的介绍将以注释形式给出。

// throws Exception 可以合理地抛出 MapReduce 任务中产生的异常,便于我们进行调试
public static void main(String[] args) throws Exception {

    // 程序的第一步是声明并初始化 Configuration 对象用于设置作业的相关运行参数
    Configuration conf = new Configuration();

    // 设置作业的配置参数和名称
    Job job = Job.getInstance(conf, "word count");
    // 将 WordCount 类作为运行的入口
    job.setJarByClass(WordCount.class);

    // 通过 setMapperClass 方法告诉集群应当在 map 阶段执行哪些逻辑
    job.setMapperClass(TokenizerMapper.class);
    // 通过 setCombinerClass 方法告诉集群应当在 combine 阶段执行哪些逻辑,此处复用了 Reducer 的逻辑,用于在本地进行部分结果的累加
    // 这个步骤不是必须的
    job.setCombinerClass(IntSumReducer.class);
    // 通过 setReducerClass 方法告诉集群应当在 combine 阶段执行哪些逻辑
    job.setReducerClass(IntSumReducer.class);
    // 设置输出结果中键的数据类型
    job.setOutputKeyClass(Text.class);
    // 设置输出结果中值的数据类型
    job.setOutputValueClass(IntWritable.class);
    // 利用 main 函数的第 1 个输入参数获取输入数据的路径
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // 利用 main 函数的第 2 个输入参数获取输出数据的路径
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // job.waitForCompletion(true) 相当于开启执行任务的开关,执行到此处时一个 MapReduce 应用才会真正地开始计算
    // 使用 System.exit 方法来告知程序运行的状态
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }



MapReduce 作业的编译和执行

编译 Jar 包

完成所有代码的编写之后,我们需要将  WordCount.java  文件编译为 Jar 包。

在终端中输入以下命令来完成编译:

cd ~
# 导入生效
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
# 编译
hadoop com.sun.tools.javac.Main /home/hadoop/WordCount.java
# 打包
jar cf wc.jar WordCount*.class

执行作业

Jar 包编译完成之后,就可以开始执行作业了。使用  hadoop jar  命令,并附带上 jar 包的路径、程序主类名称和输入输出文件的路径。

请在终端中输入以下命令来提交作业执行。

hadoop jar /home/hadoop/wc.jar WordCount /user/hadoop/protocols /user/hadoop/wordcount_output

观察作业进度

在 MapReduce 的工作过程中,我们可以在终端中的输出看到作业的进度:

如果看到  map 100% reduce 100%  的提示之后,则表明计算已经完成,在输出的提示中还包含了 Counter 、 MapReduce 框架以及 Shuffle 的一些统计信息,这些信息可以帮助我们分析作业的执行质量,也可以定位问题。

查看结果

最后,我们来查看计算的结果。

在终端中输入以下命令来查看计算结果:

hdfs dfs -cat /user/hadoop/wordcount_output/part-r-*

可以看到当前文件中出现最多的单词(字符)是  #

作为改进,你可以在框架实现过程中通过正则表达式来过滤掉非单词的字符。



学习 MapReduce 方法

尽管前文较为详细地介绍了 MapReduce 的工作原理,并且以 WordCount 为例实现了一个 MapReduce 应用程序,但对初学者而言,如何将这样的编程思想应用在解决实际问题之中呢?

先不要忙着上手写自己的代码,学习一个编程框架最好的途径就是熟读文档和样例代码。

目前 Hadoop 在不同版本的官方文档(Documentation)中均提供了 MapReduce 框架的介绍,

cd ~
wget https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1-src.tar.gz
tar -zxvf hadoop-2.6.1-src.tar.gz
tree hadoop-2.6.1-src/hadoop-mapreduce-project/hadoop-mapreduce-examples/

这其中既有我们学习过的 WordCount 案例,也还有一些其他的示例程序。下面选择其中一些进行介绍:

  • AggregateWordCount:基于聚合的 MapReduce 程序,用于对输入文件中的单词进行计数。
  • AggregateWordHistogram:基于聚合的计算输入文件中单词的直方图的 MapReduce 程序。
  • BaileyBorweinPlouffe:使用 Bailey-Borwein-Plouffe 算法来计算精确的圆周率数字的 MapReduce 程序。
  • Grep:在输入中用于统计符合正则表达式匹配数量的 MapReduce 程序。
  • Join:对已排序并处于相同分区的数据集进行连接的 MapReduce 程序。
  • MultiFileWordCount:从多个文件中计算单词词频的 MapReduce 程序。
  • Pentomino:模拟瓷砖铺设的 MapReduce 程序,可用来寻找解决方法。
  • Pi:使用拟蒙特卡罗方法估计圆周率的 MapReduce 程序。
  • RandomTextWriter:该程序可向每个节点写入 10GB 的随机文本数据。
  • RandomWriter:该程序可向每个节点写入 10GB 的随机数据。
  • SecondarySort:一个二级排序的示例。
  • Sort:对数据进行排序的示例。
  • Sudoku:求解数独问题的程序。
  • TeraGen:为 Terasort 生成数据。
  • TeraSort:运行 Terasort 测试。这是一个非常出名的测试,用于验证分布式计算框架对 1TB 数据排序的速度,你可以访问  Sort Benchmark  了解更多信息。
  • TeraValidate:查看 Terasort 的结果。
  • WordMean:用于计算输入文件中单词的平均长度。
  • WordMedian:计算输入文件中单词的中位数长度的 MapReduce 程序。
  • WordStandardDeviation:一个 MapReduce 程序,它计算输入文件中单词长度的标准偏差。

在以上示例代码文件中,每一个代码文件都详尽地注释了其工作原理。通过阅读计算框架或示例程序的源代码,将能够在短时间内迅速加深理解。你可以选择以上任意感兴趣的源代码进行阅读,此处不再详细展开。