Hadoop: 单词计数(Word Count)的MapReduce实现
1.Map与Reduce过程
1.1 Map过程
首先,Hadoop会把输入数据划分成等长的输入分片(input split) 或分片发送到MapReduce。Hadoop为每个分片创建一个map任务,由它来运行用户自定义的map函数以分析每个分片中的记录。在我们的单词计数例子中,输入是多个文件,一般一个文件对应一个分片,如果文件太大则会划分为多个分片。map函数的输入以
形式做为输入,value
为文件的每一行,key
为该行在文件中的偏移量(一般我们会忽视)。这里map函数起到的作用为将每一行进行分词为多个word
,并在context
中写入
以代表该单词出现一次。
map过程的示意图如下:
data:image/s3,"s3://crabby-images/5f8ff/5f8ffd2af4c98b317178ecb85495da4e151ea668" alt=""
mapper代码编写如下:
public static class TokenizerMapper
extends Mapper
如果我们能够并行处理分片(不一定是完全并行),且分片是小块的数据,那么处理过程将会有一个好的负载平衡。但是如果分片太小,那么管理分片与map任务创建将会耗费太多时间。对于大多数作业,理想分片大小为一个HDFS块的大小,默认是64MB。
map任务的执行节点和输入数据的存储节点相同时,Hadoop的性能能达到最佳,这就是计算机系统中所谓的data locality optimization(数据局部性优化)。而最佳分片大小与块大小相同的原因就在于,它能够保证一个分片存储在单个节点上,再大就不能了。
1.2 Reduce过程
接下来我们看reducer的编写。reduce任务的多少并不是由输入大小来决定,而是需要人工单独指定的(默认为1个)。和上面map不同的是,reduce任务不再具有本地读取的优势————一个reduce任务的输入往往来自于所有mapper的输出,因此map和reduce之间的数据流被称为 shuffle(洗牌) 。Hadoop会先按照key-value对进行排序,然后将排序好的map的输出通过网络传输到reduce任务运行的节点,并在那里进行合并,然后传递到用户定义的reduce函数中。
reduce 函数示意图如下:
reducer代码编写如下:
public static class IntSumReducer
extends Reducer{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException{
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
2.完整代码
2.1 项目架构
关于VSCode+Java+Maven+Hadoop开发环境搭建,可以参见我的博客,此处不再赘述。这里展示我们的项目架构图:
Word-Count-Hadoop
├─ input
│ ├─ file1
│ ├─ file2
│ └─ file3
├─ output
├─ pom.xml
├─ src
│ └─ main
│ └─ java
│ └─ WordCount.java
└─ target
WordCount.java
代码如下:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount{
public static class TokenizerMapper
extends Mapper
pom.xml
中记得配置Hadoop的依赖环境:
...
UTF-8
17
17
3.3.1
junit
junit
4.11
test
org.apache.hadoop
hadoop-common
${hadoop.version}
org.apache.hadoop
hadoop-hdfs
${hadoop.version}
org.apache.hadoop
hadoop-mapreduce-client-core
${hadoop.version}
org.apache.hadoop
hadoop-client
${hadoop.version}
org.apache.hadoop
hadoop-yarn-api
${hadoop.version}
...
此外,因为我们的程序自带输入参数,我们还需要在VSCode的launch.json
中配置输入参数intput
(代表输入目录)和output
(代表输出目录):
...
"args": [
"input",
"output"
],
...
编译运行完毕后,可以查看output
文件夹下的part-r-00000
文件:
David 1
Goodbye 1
Hello 3
Tom 1
World 2
可见我们的程序正确地完成了单词计数的功能。
参考
- [1] Hadoop官方文档:MapReduce Tutorial
- [2] White T. Hadoop: The definitive guide[M]. " O'Reilly Media, Inc.", 2012.
- [3] Stack Overflow: What is the purpose of shuffling and sorting phase in the reducer in Map Reduce Programming?