【Spark学习笔记】广播变量和累加器
一、广播变量(调优操作)
使用广播变量是个调优操作,不使用广播变量可能会造成Executor端内存溢出。
1.普通变量定义
val rdd1: RDD[String] = sc.parallelize(Array[String]("Java", "C", "Python", "Hadoop", "Spark"),2)
// driver每发送一个task到executor,就将此变量发送给executor一次,占用executor内存
val blackList = List("C", "Python")
val rdd2: RDD[String] = rdd1.filter(!blackList.contains(_))
rdd2.foreach(println)
RDD算子部分是在Executor端执行,其他的在Driver端执行。
blackList端创建的,但是因为需要在Executor端使用,所以Driver会把blackList以task的形式发送到Executor端。如果有很多个task,Executor端就会有很多个blackList,可能会造成Executor内存溢出。
不使用广播变量:
使用广播变量:
2.广播变量定义
val sc = new SparkContext(conf)
val rdd1: RDD[String] = sc.parallelize(Array[String]("Java", "C", "Python", "Hadoop", "Spark"),2)
// driver每发送一个task到executor,就将此变量发送给executor一次,占用executor内存
val blackList = List("C", "Python")
// 使用广播变量,driver将只发送一次到executor,在executor上管理
val broadCast = sc.broadcast(blackList)
val rdd2: RDD[String] = rdd1.filter(!broadCast.value.contains(_))
rdd2.foreach(println)
注意:广播变量在Driver端定义赋初值,在Driver端可以修改广播变量的值,在Executor端无法修改。
二、累加器(正确操作)
累加器不是调优操作,而是正确操作,如果不这么做,就是错的。
1.普通累加器
val rdd1: RDD[String] = sc.parallelize(Array[String]("Java", "C", "Python", "Hadoop", "Spark"),2)
// 结果为0
var sum = 0
// 必须得使用累加器
val accumulator = sc.longAccumulator
rdd1.foreach(_ => {
  sum += 1
  println(s"Executor -- i = $sum")
  accumulator.add(1)
  println(s"Executor -- accumulator = $accumulator")
})
println(s"sum:$sum")
println(s"accumulator:${accumulator.value}")
注意:累加器在Driver端定义赋初值,在Executor端更新。新版的Spark在Executor端也可以读取累加器的值
2.自定义累加器
自定义实体类,封装多个属性,实现累加
case class PersonInfo(var personCount: Int, var ageCount: Int)
自定义累加器需要继承AccumulatorV2
class MyAcc extends AccumulatorV2[PersonInfo, PersonInfo] {
  private var resultInfo = new PersonInfo(0, 0)
  /** 判断reset 是否是初始值,一定和reset保持一致 */
  override def isZero: Boolean = {
    resultInfo.personCount == 0 && resultInfo.ageCount == 0
  }
  override def copy(): AccumulatorV2[PersonInfo, PersonInfo] = {
    val myAcc = new MyAcc
    myAcc.resultInfo = this.resultInfo
    myAcc
  }
  override def reset(): Unit = {
    resultInfo = new PersonInfo(0, 0)
  }
  /** 作用在Executor端 */
  override def add(v: PersonInfo): Unit = {
    resultInfo.personCount += v.personCount
    resultInfo.ageCount += v.ageCount
  }
  /** Driver端的result与Executor中的每个result进行聚合 */
  override def merge(other: AccumulatorV2[PersonInfo, PersonInfo]): Unit = {
    resultInfo.personCount += other.asInstanceOf[MyAcc].resultInfo.personCount
    resultInfo.ageCount += other.asInstanceOf[MyAcc].resultInfo.ageCount
  }
  override def value: PersonInfo = resultInfo
}
使用自定义累加器
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.setMaster("local")
  conf.setAppName("自定义累加器")
  val sc = new SparkContext(conf)
  sc.setLogLevel("error")
  // 自定义累加器
  val myAcc = new MyAcc
  sc.register(myAcc)
  val rdd1: RDD[String] = sc.parallelize(Array[String](
    "张三 18", "李四 20", "王五 21", "马六 16"
  ), 3)
  val value: RDD[Unit] = rdd1.map(elem => {
    val age = elem.split(" ")(1).toInt
    myAcc.add(new PersonInfo(1, age))
  })
  // action算子触发
  value.count()
  println("accumlator value = " + myAcc.value)
}