Spark基础01


1.Spark

Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘

Spark只有在shuffle的时候将数据写入磁盘,而Hadoop中多个MR作业之间的数据交互都要依赖于磁盘交互。

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(conf)
    val wordCount: RDD[(String, Int)] = sc.textFile("./word.txt")
    .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    val array: Array[(String, Int)] = wordCount.collect()

    // foreach方法其实是Scala集合(单点)的方法
    array.foreach(println)
    sc.stop()
  }

1.1运行架构

image-20211116175057662

Driver表示master,负责管理整个集群中的作业任务调度。

图形中的Executor 则是 slave,负责实际执行任务。需要缓存的RDD 是直接缓存在Executor进程

Master是一个进程,主要负责资源的调度和分配

Worker运行在集群中的一台服务器上,由Master分配资源对数据进行并行的处理和计算

ApplicationMaster,用于向资源调度器申请执行任务的资源容器Container

Spark 为第三代计算引擎,特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及实时计算。

1.2提交流程

Client和Cluster。两种模式主要区别在于:Driver程序的运行节点位置。

Client:Driver模块在客户端执行

  1. Driver运行在本地
  2. Driver启动后会和ResourceManager通讯申请启动ApplicationMaster
  3. ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster
  4. ApplicationMaster向ResourceManager申请资源启动Executor进程
  5. Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数
  6. 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。

Cluster:Driver模块启动在Yarn集群资源中执行

2.核心编程

三大数据结构分别是:

? RDD : 弹性分布式数据集

? 累加器:分布式共享只写变量

? 广播变量:分布式共享只读变量

2.1RDD

2.1.1概念

RDD(Resilient Distributed Dataset):弹性分布数据集

  • 弹性
    1. 存储的弹性:内存与磁盘的自动切换;
    2. 容错的弹性:数据丢失可以自动恢复;
    3. 计算的弹性:计算出错重试机制;
    4. 分片的弹性:可根据需要重新分片。
  • 分布式:数据存储在大数据集群不同节点上
  • 数据集:RDD封装了计算逻辑,并不保存数据

核心属性 :RDD是可分区的;使用分区函数对每一个分区进行计算;RDD之间有依赖关系

执行原理:RDD在整个流程中主要用于将逻辑进行封装,并生成Task发送给Executor节点执行计算

  1. Spark框架在执行时,先申请资源
  2. 根据数据处理逻辑对任务拆分
  3. 将任务发到已经分配资源的计算节点上

2.1.2基础编程

能够并行计算的任务数量我们称之为并行度

mapPartitions:以分区为单位发送到计算节点进行处理

val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
    datas => {
        datas.filter(_==2)
    }
)

map和mapPartitions的区别?

  1. Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。

  2. Map性能低,mapPartitions性能高,但是可能导致内存不够用

repartition:该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。

无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。

val dataRDD = sparkContext.makeRDD(List(
    1,2,3,4,1,2
),2)

val dataRDD1 = dataRDD.repartition(4)

zip:将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素。

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.zip(dataRDD2)

partitionBy:将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner

val rdd2: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))

reduceByKey和groupByKey的区别?

  1. reduceByKey其实包含分组和聚合的功能。groupByKey只能分组,不能聚合

  2. reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合功能,这样会减少落盘的数据量,而groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey性能比较高。

序列化

算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误。

依赖关系

Lineage会记录元数据信息,当RDD部分分区数据丢失时,根据依赖信息重新计算。

窄依赖:一个父RDD的Partion,最多被子RDD的一个Partition使用

宽依赖:父RDD的Partition,被多个子RDD的Partition依赖

任务划分

一个Action算子就会生成一个DAG;

一个DAG就会对应一个Job;

一个Job包含一到多个Stage;

一个Stage对应一个TaskSet;

一个Stage中Task的数量取决于Stage中最后一个RDD分区的数量

持久化

Cache和Checkpoint

  • 缓存和检查点区别
  1. Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
  2. Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS,可靠性高。
  3. 建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

2.1.3累加器

累加器用来把Executor端变量信息聚合到Driver端。分布式的改变,然后聚合这些改变

    import org.apache.spark.{SparkConf, SparkContext}

    object Demo3 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("demo2").setMaster("local[*]")
        val sc = new SparkContext(conf)

        val lst = List(1, 2, 3, 4, 5, 6, 7, 8)
        var sum = 0
        lst.foreach(x => sum += x)
        println("sum = 36" + sum)
        println("---------------")

        var sum_lst = 0
        val rdd01 = sc.makeRDD(lst)
        rdd01.foreach(x => sum_lst += x)
        println("sum_lst = 0" + sum_lst) //值没有变化
        println("---------------")

        //定义一个累加器
        val accumulator = sc.longAccumulator
        //累加器操作
        rdd01.foreach(x => accumulator.add(x))
        //得到累加器的值
        println("accumulator.value = 36" + accumulator.value)
      }
    }

2.1.4广播变量

广播变量用来高效分发较大的对象

如果这个变量不是广播变量,那么executor中每个task就会分发一份

如果将这个变量声明为广播变量,那么只是每个executor拥有一份,这个executor启动的task会共享这个变量

    如何定义一个广播变量
    val a = 3
    val broadcast = sc.broadcast(a)
    如何使用一个广播变量
    val c = broadcast.value

变量一旦被定义为一个广播变量,那么这个变量只能读,不能修改

注意事项
1、能不能将一个RDD使用广播变量广播出去?
不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。
2、 广播变量只能在Driver端定义,不能在Executor端定义。
3、 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。
5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

import org.apache.spark.{SparkConf, SparkContext}

object Demo3 {
    def main(args: Array[String]): Unit = {
        val conf = {
            new SparkConf().setAppName("demo2").setMaster("local[*]")
        }
        val sc = new SparkContext(conf)
        val a = 3
        //定义一个广播变量
        val broadcast = sc.broadcast(a)
        //得到一个广播变量
        println(broadcast.value)
        println("---------------")
        
        //需求:统计一个文件中单词的个数,不统计黑名单中的单词
        //黑名单 "hello", "word", "abc", "!", ",", ".", "。"
        val lst = List("hello", "word", "abc", "!", ",", ".", "。")
        val broadcast_list = sc.broadcast(lst)
        //    hello.txt内容如下:
        //    hello,hive
        //    hello,hadoop
        val baseDataRDD = sc.textFile("file:///F:/wc/hello.txt").flatMap(_.split(","))
        baseDataRDD.foreach(println)
        println("---------------")
        baseDataRDD.filter(word => !broadcast_list.value.contains(word))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .foreach(println)
    }
}