第五章_Spark核心编程_Rdd并行度 与切片
1. 什么是Spark的并行度 、什么是Rdd的分区?
1. 什么是Spark的并行度 ? Driver 将任务进行切分成不同的Task, 再发送给 Executor 节点并行计算,并行计算的任务数量 我们称之为 并行度 2. 什么是Rdd的分区 ? 1. 将要操作的数据分成 若干份,以便 分布式计算
2. 看示例
package pak_partition { import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD //1. ParallelCollectionRDD 分区策略 object MemoryBypartion extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("MemoryBypartion") // 设置 分区个数 sparkconf.set("spark.default.parallelism", "2") val sc: SparkContext = new SparkContext(sparkconf) /* * 定义 : * def parallelize[T](seq: Seq[T], numSlices: Int)(evidence$1: ClassTag[T]): RDD[T] * 参数 : * numSlices : 指定切片数量 * 不指定时, 使用默认设置值 defaultParallelism * override def defaultParallelism(): Int = * scheduler.conf.getInt("spark.default.parallelism", totalCores) * */ val list_rdd: RDD[Int] = sc.parallelize( List(1, 2, 3, 4) ) // 查看分区数 println(s"当前分区数: ${list_rdd.getNumPartitions}") list_rdd.saveAsTextFile("Spark_319/src/output/01") //关闭资源 sc.stop() } //2. File Rdd object FileBypartion extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("initRddByLocalFile") //sparkconf.set("spark.default.parallelism", "13") val sc: SparkContext = new SparkContext(sparkconf) // private val str: String = sparkconf.get("spark.default.parallelism") // println(s"默认分区数:${str}") /* * 定义 : * def textFile(path: String, minPartitions: Int): RDD[String] * 参数 : * minPartitions : 不指定时,使用默认参数(使用默认参数时,切片个数最大为2) * def defaultMinPartitions: Int = math.min(defaultParallelism, 2) * */ private val text_rdd: RDD[String] = sc.textFile("Spark_319/src/data", 5) // 查看分区数 println(s"当前分区数: ${text_rdd.getNumPartitions}") text_rdd.saveAsTextFile("Spark_319/src/output/02") //关闭资源 sc.stop() } }
3. 看源码
1. ParallelCollectionRDD
创建 ParallelCollectionRDD -- 创建rdd def parallelize[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { assertNotStopped() new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) } -- 读取 分区个数的默认值 -- 通过 sparkconf.set("spark.default.parallelism","2") 来设置 override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores) -- 切片规则 -- 根据 集合索引 和 切片个数 来进行切片 def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } }
2. file rdd
-- 创建rdd -- 不指定 minPartitions时,使用默认值 def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) } -- 获取 切片个数默认值 override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores) -- 使用默认参数时,切片个数 最大为2 def defaultMinPartitions: Int = math.min(defaultParallelism, 2) -- 切片规则 见下文 org.apache.hadoop.mapred.FileInputFormat public InputSplit[] getSplits(JobConf job, int numSplits)
4. 记结论
1. ParallelCollectionRDD 1. 切片个数 Rdd 创建时 可以指定切片个数,不指定时,将使用 默认值 note : 默认切片个数 不会超过2 2. 切片规则 根据 集合索引 和 切片个数,将数据切片 示例 : List(1,2,3,4) 分区数=2 切分成 1:(1,2) 2:(3,4) 2. File Rdd 1. 切片个数 Rdd 创建时 可以指定切片个数,不指定时,将使用 默认值 note : 默认切片个数 不会超过2 这里指定的是 最小切片数(minPartitions) 也就是说 创建的Rdd真实切片个数必定 >= minPartitions 2. 切片规则 根据 文件大小 和 切片大小,将数据切片
5. org.apache.hadoop.mapred.FileInputFormat 之 getSplits
// 将job输入生成一个文件列表 // 并将文件切分成 InputSplit对象,并将 切片对象 存储到list中 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { StopWatch sw = new StopWatch().start(); //1. 通过job 读取input目录,将路径中 文件作为元素 FileStatus[] files = listStatus(job); // Save the number of input files for metrics/loadgen job.setLong(NUM_INPUT_FILES, files.length); // 计算 所有文件的长度 long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); } // 计算 目标切片大小 = 所有文件总和 / 指定切片数 long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); // 计算 最小切片大小 // SPLIT_MINSIZE = mapreduce.input.fileinputformat.split.minsize // private long minSplitSize = 1; // 不设置 SPLIT_MINSIZE时,默认为1 long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); // 创建 List<InputSplit>,用来存储 切片结果 ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); NetworkTopology clusterMap = new NetworkTopology(); // 遍历文件列表,对每个文件进行切片 for (FileStatus file: files) { // 1. 获取文件路径 Path path = file.getPath(); // 2. 获取文件长度 long length = file.getLen(); // 处理非空文件 if (length != 0) { FileSystem fs = path.getFileSystem(job); // 3. 获取文件 block信息 BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { blkLocations = fs.getFileBlockLocations(file, 0, length); } // 4. 判断文件 是否可以 切片 if (isSplitable(fs, path)) { // 文件支持 切片 // 5. 获取文件块的大小,本地运行环境 32M long blockSize = file.getBlockSize(); // 6. 获取切片大小 long splitSize = computeSplitSize(goalSize, minSize, blockSize); //计算规则 protected long computeSplitSize(long goalSize, long minSize,long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); } // 7. 设置临时变量 存储 剩余文件长度 long bytesRemaining = length; // 8. 对文件 循环切片 // SPLIT_SLOP = 1.1; // 10% slop 允许切片大小有 10%的溢出 // 这样做的好处是 : 能保证 一个文件的某个切片 为切片大小的1.1倍,避免了 过小切片的出现 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { // String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); //10. 初始化切片对象,添加到 List<InputSplit> // path : 文件路径 // length-bytesRemaining : 切片在文件中 起始位置 // splitSize : 切片大小 // splitHosts : 切片在集群上的位置信息 splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); // 剩余文件长度 减去一个 切片长度 bytesRemaining -= splitSize; } //11. 当文件剩余长度 小于切片长度*1.1 时,将文件剩余部分 作为一个切片 if (bytesRemaining != 0) { // 获取切片对象属性,并往 List<InputSplit> 添加一个切片 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); } } else { // 当文件不可切片时,将整个文件 作为一个切片进行存储 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { //存储空文件 splits.add(makeSplit(path, 0, length, new String[0])); } } sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } // 6. 返回 存储切片对象的 集合 return splits.toArray(new FileSplit[splits.size()]); }
总结说明 :
总结说明 : org.apache.hadoop.mapred.FileInputFormat public InputSplit[] getSplits(JobConf job, int numSplits) 1. numSplits = 指定切片个数 也就是 创建Rdd时,指定的最小切片数(minPartitions) 创建的Rdd真实切片个数必定 >= minPartitions 2. 计算 目标切片大小 long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits) goalSize = sc.textFile() 读取的文件总和 / minPartitions 目标切片大小 = 所有文件总和 / 指定切片数 3. 计算 切片的最小值 long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); SPLIT_MINSIZE = mapreduce.input.fileinputformat.split.minsize 不设置时,默认为1 4. 计算 切片的真实大小 long splitSize = Math.max(minSize, Math.min(goalSize, blockSize)); // 能保证切片大小 不大于 blockSize(数据本地化) // 设置 minSize, 可以将 切片大小调整 大于blockSize 5. 遍历文件列表,对列表内文件 循环切片(每个文件单独切片) for (FileStatus file: files) note : while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) SPLIT_SLOP = 1.1 // 允许实际切片大小 为 切片的1.1倍 // 这样做的好处是 避免了 过小切片的出现 // 示例 : 1.txt = 93M 切片大小为 30M // split1 = 30M // split2 = 30M // split3 = 33M (33M/30M=1.1) 6. 返回 切片对象的数组 splits.toArray(new FileSplit[splits.size()]) // 成功将 输入的文件,切分成了大小相等的切片