Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)
1 导引
我们在博客中学习了如何用Hadoop-MapReduce实现单词计数,现在我们来看如何用Spark来实现同样的功能。
2. Spark的MapReudce原理
Spark框架也是MapReduce-like模型,采用“分治-聚合”策略来对数据分布进行分布并行处理。不过该框架相比Hadoop-MapReduce,具有以下两个特点:
-
对大数据处理框架的输入/输出,中间数据进行建模,将这些数据抽象为统一的数据结构命名为弹性分布式数据集(Resilient Distributed Dataset),并在此数据结构上构建了一系列通用的数据操作,使得用户可以简单地实现复杂的数据处理流程。
-
采用了基于内存的数据聚合、数据缓存等机制来加速应用执行尤其适用于迭代和交互式应用。
Spark社区推荐用户使用Dataset、DataFrame等面向结构化数据的高层API(Structured API)来替代底层的RDD API,因为这些高层API含有更多的数据类型信息(Schema),支持SQL操作,并且可以利用经过高度优化的Spark SQL引擎来执行。不过,由于RDD API更基础,更适合用来展示基本概念和原理,后面我们的代码都使用RDD API。
Spark的RDD/dataset分为多个分区。RDD/Dataset的每一个分区都映射一个或多个数据文件, Spark通过该映射读取数据输入到RDD/dataset中。
Spark的分区数和以下参数都有关系:
-
spark.default.parallelism
(默认为CPU的核数) -
spark.sql.files.maxPartitionBytes
(默认为128 MB)读取文件时打包到单个分区中的最大字节数) -
spark.sql.files.openCostInBytes
(默认为4 MB) 该参数默认4M,表示小于4M的小文件会合并到一个分区中,用于减小小文件,防止太多单个小文件占一个分区情况。这个参数就是合并小文件的阈值,小于这个阈值的文件将会合并。
我们下面的流程描述中,假设每个文件对应一个分区(实际上因为文件很小,导致三个文件都在同一个分区中,大家可以通过调用RDD
对象的getNumPartitions()
查看)。
Spark的Map示意图如下:
Spark的Reduce示意图如下:
3. Word Count的Java实现
项目架构如下图:
Word-Count-Spark
├─ input
│ ├─ file1.txt
│ ├─ file2.txt
│ └─ file3.txt
├─ output
│ └─ result.txt
├─ pom.xml
├─ src
│ ├─ main
│ │ └─ java
│ │ └─ WordCount.java
│ └─ test
└─ target
WordCount.java
文件如下:
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.io.*;
import java.nio.file.*;
public class WordCount {
private static Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: WordCount
pom.xml
文件配置如下:
<?xml version="1.0" encoding="UTF-8"?>
4.0.0
com.WordCount
WordCount
1.0-SNAPSHOT
WordCount
http://www.example.com
2.12.10
2.12
UTF-8
UTF-8
UTC
11
1.4.0
3.7.1
3.1.2
2.0.0
4.4.0
3.8.0
3.2.0
3.2.1
2.8.2
1.6.8
3.2.0
1.6
2.22.2
UTF-8
11
11
3.2.1
junit
junit
4.11
test
org.scala-lang
scala-library
${scala.version}
provided
org.apache.spark
spark-core_2.12
${spark.version}
org.apache.spark
spark-sql_2.12
${spark.version}
provided
maven-clean-plugin
3.1.0
maven-resources-plugin
3.0.2
maven-compiler-plugin
3.8.0
maven-surefire-plugin
2.22.1
maven-jar-plugin
3.0.2
maven-install-plugin
2.5.2
maven-deploy-plugin
2.8.2
maven-site-plugin
3.7.1
maven-project-info-reports-plugin
3.0.0
maven-compiler-plugin
3.8.0
11
true
/Library/Java/JavaVirtualMachines/jdk-11.0.15.jdk/Contents/Home/bin/javac
记得配置输入参数input
和output
代表输入目录和输出目录(在VSCode中在launch.json
文件中配置)。编译运行后可在output
目录下查看result.txt
:
Tom: 1
Hello: 3
Goodbye: 1
World: 2
David: 1
可见成功完成了单词计数功能。
4. Word Count的Python实现
先使用pip按照pyspark==3.8.2
:
pip install pyspark==3.8.2
注意PySpark只支持Java 8/11,请勿使用更高级的版本。这里我使用的是Java 11。运行java -version
可查看本机Java版本。
(base) orion-orion@MacBook-Pro ~ % java -version
java version "11.0.15" 2022-04-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.15+8-LTS-149)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.15+8-LTS-149, mixed mode)
项目架构如下:
Word-Count-Spark
├─ input
│ ├─ file1.txt
│ ├─ file2.txt
│ └─ file3.txt
├─ output
│ └─ result.txt
├─ src
│ └─ word_count.py
word_count.py
编写如下:
from pyspark.sql import SparkSession
import sys
import os
from operator import add
if len(sys.argv) != 3:
print("Usage: WordCount
使用python word_count.py input output
运行后,可在output
中查看对应的输出文件result.txt
:
Hello: 3
World: 2
Goodbye: 1
David: 1
Tom: 1
可见成功完成了单词计数功能。
参考
- [1] Spark官方文档: Quick Start
- [2] 许利杰,方亚芬. 大数据处理框架Apache Spark设计与实现[M]. 电子工业出版社, 2021.
- [3] GiHub: Spark官方Java样例
- [4]