Common Spark Operators



    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
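
If the project is built with sbt instead of Maven, the equivalent dependency (assuming the same Spark 3.0.0 on Scala 2.12) would be:

    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"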

1. map, filter, flatMap, sample, union, intersection, subtract, distinct

    import org.apache.spark.{SparkConf, SparkContext}

    object Demo3 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("demo1").setMaster("local")
        val sc = new SparkContext(conf)
        val arr = Array(1, 2, 3, 4, 5, 6, 7, 7, 8, 9, 10)
        val rdd01 = sc.makeRDD(arr)
        val rdd02 = rdd01.map(x => x * 2)
        println("rdd01 = " + rdd01.collect().toBuffer)
        println("rdd02 = " + rdd02.collect().toBuffer)

        val rdd03 = rdd01.filter(x => x % 2 == 0)
        println("rdd03 = " + rdd03.collect().toBuffer)
        /* -------------------------
         rdd01 = ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 7, 8, 9, 10)
         rdd02 = ArrayBuffer(2, 4, 6, 8, 10, 12, 14, 14, 16, 18, 20)
         rdd03 = ArrayBuffer(2, 4, 6, 8, 10)
         -------------------------*/
        val arr01 = Array("hello wr", "hello hu", "hello zh")
        val rdd04 = sc.makeRDD(arr01)
        // split breaks each line on spaces and returns an array; with map, each array becomes one element of the resulting RDD
        val rdd05 = rdd04.map(x => x.split(" "))
        println("rdd05 = " + rdd05.collect().toBuffer)
        rdd05.foreach(x => println(x.toBuffer))

        /* -------------------------
          rdd05 = ArrayBuffer([Ljava.lang.String;@72e789cb, [Ljava.lang.String;@7c1812b3, [Ljava.lang.String;@43034809)
          ArrayBuffer(hello, wr)
          ArrayBuffer(hello, hu)
          ArrayBuffer(hello, zh)
         -------------------------*/

        // split breaks each line on spaces and returns an array; with flatMap, every element of that array becomes an element of the resulting RDD
        val rdd06 = rdd04.flatMap(x => x.split(" "))
        println("rdd06 = " + rdd06.collect().toBuffer)
        rdd06.foreach(x => println(x.toBuffer)) // x is a String here, so toBuffer yields its characters
        /* ---------------------------
          rdd06 = ArrayBuffer(hello, wr, hello, hu, hello, zh)
          ArrayBuffer(h, e, l, l, o)
          ArrayBuffer(w, r)
          ArrayBuffer(h, e, l, l, o)
          ArrayBuffer(h, u)
          ArrayBuffer(h, e, l, l, o)
          ArrayBuffer(z, h)
         ----------------------------*/

        /* sample draws a random subset of the data at the ratio given by fraction; you can choose whether to
           sample with replacement, and seed sets the random number generator seed
         - withReplacement: Boolean, true samples with replacement (an element is put back after being drawn,
           so it may appear more than once in the sample), false samples without replacement
         - fraction: Double, a value between 0 and 1 giving the fraction of records to sample
         - seed: random seed */
        val rdd07 = rdd01.sample(true, 0.2, 4) // used relatively rarely
        println("rdd07 = " + rdd07.collect().toBuffer)
        //    rdd07 = ArrayBuffer(5, 9, 9)
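        // Illustrative addition (not in the original): sampling without replacement returns each element at most once
        val rddSampleNoReplace = rdd01.sample(withReplacement = false, fraction = 0.3)
        println("rddSampleNoReplace = " + rddSampleNoReplace.collect().toBuffer)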

        val lst = List(2, 3, 12, 13)
        val rdd08 = sc.makeRDD(lst)
        //val arr = Array(1,2,3,4,5,6,7,7,8,9,10)
        //val rdd01 = sc.makeRDD(arr)
        // union
        val rdd09 = rdd01.union(rdd08)
        println("rdd09 = " + rdd09.collect().toBuffer)
        //      rdd09 = ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 7, 8, 9, 10, 2, 3, 12, 13)
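        // Illustrative addition (not in the original): union keeps duplicates; chaining distinct() gives a set-style union
        val rddUnionDistinct = rdd09.distinct()
        println("rddUnionDistinct = " + rddUnionDistinct.collect().toBuffer)
        //      e.g. ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13) (order may vary)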
        // intersection
        val rdd10 = rdd01.intersection(rdd08)
        println("rdd10 = " + rdd10.collect().toBuffer)
        //    rdd10 = ArrayBuffer(3, 2)
        // difference (subtract)
        val rdd11 = rdd01.subtract(rdd08)
        println("rdd11 = " + rdd11.collect().toBuffer)
        //    rdd11 = ArrayBuffer(1, 4, 5, 6, 7, 7, 8, 9, 10)
        val rdd12 = rdd08.subtract(rdd01)
        println("rdd12 = " + rdd12.collect().toBuffer)
        //    rdd12 = ArrayBuffer(12, 13)
        val arr02 = Array(1, 2, 3, 4, 2, 3)
        val rdd13 = sc.makeRDD(arr02)
        // remove duplicates
        val rdd14 = rdd13.distinct()
        println("rdd14 = " + rdd14.collect().toBuffer)
        //    rdd14 = ArrayBuffer(4, 1, 3, 2)
      }
    }

2. groupByKey, reduceByKey, sortByKey, sortBy

    import org.apache.spark.{SparkConf, SparkContext}

    object Demo3 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("demo1").setMaster("local")
        val sc = new SparkContext(conf)
        val arr = Array(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("d", 5), ("b", 7))
        val rdd01 = sc.parallelize(arr)

        // group the values that share the same key into an Iterable
        val rdd02 = rdd01.groupByKey()
        println("rdd02 = " + rdd02.collect().toBuffer) //(d,CompactBuffer(5)), (a,CompactBuffer(1, 4))

        // combine the values of the same key with the given function
        val rdd03 = rdd01.reduceByKey(_ + _) // the aggregation step of a word-frequency count (see the sketch after this block)
        println("rdd03 = " + rdd03.collect().toBuffer) //(d,5), (a,5), (b,9), (c,3)

        // called on a (K,V) RDD; K must have an Ordering (implement Ordered); returns a (K,V) RDD sorted by key
        val rdd04 = rdd01.sortByKey()
        println("rdd04 = " + rdd04.collect().toBuffer) //(a,1), (a,4), (b,2), (b,7), (c,3), (d,5)
        // similar to sortByKey but more flexible: the first argument selects what to sort by,
        // the second chooses the order (false = descending),
        // and the third sets the number of partitions after sorting (defaults to the original RDD's)
        val rdd05 = rdd01.sortBy(_._1, false)
        println("rdd05 = " + rdd05.collect().toBuffer) //(d,5), (c,3), (b,2), (b,7), (a,1), (a,4)

        // name, Chinese score, math score, English score
        val arr02 = Array(("tom", 99, 98, 97), ("lieli", 99, 88, 77), ("jerry", 99, 77, 66), 
                          ("hanmeimei", 88, 99, 88), ("xiaoming", 98, 97, 96), ("xiaohong", 100, 100, 100))
        val rdd06 = sc.makeRDD(arr02)
        // sort by the Chinese score first; when Chinese scores tie, sort by the math score
        // in Spark this is written sortBy(math).sortBy(Chinese), like ORDER BY Chinese, math in MySQL
        // only two of sortBy's parameters are used here
        val rdd07 = rdd06.sortBy(_._3, false).sortBy(_._2, false)
        println("rdd07 = " + rdd07.collect().toBuffer) //(tom,99,98,97), (lieli,99,88,77), (jerry,99,77,66)
      }
    }
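
The reduceByKey above is exactly the aggregation step of a word-frequency count. Below is a minimal self-contained sketch reusing the sample lines from section 1; the object name WordCountDemo and the app name are illustrative, not part of the original code:

    import org.apache.spark.{SparkConf, SparkContext}

    object WordCountDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("wordcount").setMaster("local")
        val sc = new SparkContext(conf)
        val lines = sc.makeRDD(Array("hello wr", "hello hu", "hello zh"))
        // split each line into words, pair each word with 1, then sum the counts per key
        val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        println("counts = " + counts.collect().toBuffer)
        // counts = ArrayBuffer((hello,3), (wr,1), (hu,1), (zh,1)) (order may vary)
        sc.stop()
      }
    }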

3. join, cogroup, cartesian, subtractByKey

    package com.atguigu.bigdata.spark.test

    import org.apache.spark.{SparkConf, SparkContext}

    object Demo3 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("demo1").setMaster("local")
        val sc = new SparkContext(conf)
        val arr01 = Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 11))
        val arr02 = Array(("a", 6), ("b", 7), ("c", 8), ("e", 9), ("b", 12))
        val rdd01 = sc.makeRDD(arr01)
        val rdd02 = sc.makeRDD(arr02)

        val rdd03 = rdd01.join(rdd02) // inner join
        println("rdd03 = " + rdd03.collect().toBuffer)
        //rdd03 = ArrayBuffer((a,(1,6)), (a,(11,6)), (b,(2,7)), (b,(2,12)), (c,(3,8)))

        val rdd04 = rdd01.leftOuterJoin(rdd02)
        println("rdd04 = " + rdd04.collect().toBuffer)
        //rdd04 = ArrayBuffer((d,(5,None)), (a,(1,Some(6))), (a,(11,Some(6))), (b,(2,Some(7))), (b,(2,Some(12))), (c,(3,Some(8))))

        val rdd05 = rdd01.rightOuterJoin(rdd02)
        println("rdd05 = " + rdd05.collect().toBuffer)
        //rdd05 = ArrayBuffer((e,(None,9)), (a,(Some(1),6)), (a,(Some(11),6)), (b,(Some(2),7)), (b,(Some(2),12)), (c,(Some(3),8)))

        // called on RDDs of type (K,V) and (K,W); returns an RDD of type (K,(Iterable[V],Iterable[W]))
        //    compare with groupByKey
        val rdd06 = rdd01.cogroup(rdd02)
        println("rdd06 = " + rdd06.collect().toBuffer)
        /*   --------------------
           rdd06 = ArrayBuffer(
             (d, (CompactBuffer(5), CompactBuffer())),
             (e, (CompactBuffer(), CompactBuffer(9))),
             (a, (CompactBuffer(1, 11), CompactBuffer(6))),
             (b, (CompactBuffer(2), CompactBuffer(7, 12))),
             (c, (CompactBuffer(3), CompactBuffer(8))))
           --------------------*/

        val arr03 = Array("a", "b")
        val arr04 = Array(1, 2)
        val rdd07 = sc.makeRDD(arr03)
        val rdd08 = sc.makeRDD(arr04)
        // Cartesian product of the two RDDs
        val rdd09 = rdd07.cartesian(rdd08)
        println("rdd09 = " + rdd09.collect().toBuffer)
        //rdd09 = ArrayBuffer((a,1), (a,2), (b,1), (b,2))

        // remove the elements whose key also appears in the other RDD
        //    val arr01 = Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 11))
        //    val arr02 = Array(("a", 6), ("b", 7), ("c", 8), ("e", 9), ("b", 12))
        val rdd10 = rdd01.subtractByKey(rdd02)
        val rdd11 = rdd02.subtractByKey(rdd01)
        println("rdd10 = " + rdd10.collect().toBuffer)
        println("rdd11 = " + rdd11.collect().toBuffer)
        //rdd10 = ArrayBuffer((d,5))
        //rdd11 = ArrayBuffer((e,9))
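        // Illustrative addition (not in the original): fullOuterJoin keeps the keys of both RDDs
        // and wraps both values in Option
        val rddFull = rdd01.fullOuterJoin(rdd02)
        println("rddFull = " + rddFull.collect().toBuffer)
        //rddFull contains e.g. (d,(Some(5),None)), (e,(None,Some(9))), (a,(Some(1),Some(6))), ...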
      }
    }

4. mapPartitions, mapPartitionsWithIndex, coalesce, repartition, countByKey

    import org.apache.spark.{SparkConf, SparkContext}

    object Demo3 {
      def main(args: Array[String]): Unit = {
        //   Spark: RDD partition count and partitioner: https://www.cnblogs.com/xuejianbest/p/10285009.html
        val conf = new SparkConf().setAppName("demo1").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 8, 5)
        val rdd01 = sc.makeRDD(arr, 3)
        // get the number of partitions; rdd01.partitions.size = 3
        println("rdd01.partitions.size = " + rdd01.partitions.length)

        // only a key-value RDD can have a Partitioner; for a non key-value RDD the partitioner is None
        // (note: makeRDD does not assign a partitioner, so even the key-value rdd02 below prints None)
        println("partitioner = " + rdd01.partitioner)
        val arr01 = Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 11), ("a", 1), ("b", 2))
        val rdd02 = sc.makeRDD(arr01, 2)
        println("partitioner = " + rdd02.partitioner)
        /* -----------------
         rdd01.partitions.size = 3
         partitioner = None
         partitioner = None
         -----------------*/

        // similar to map, but runs independently on each partition of the RDD
        val rdd03 = rdd01.mapPartitions { x => Iterator(x.mkString("|")) }
        println("rdd03 = " + rdd03.collect().toBuffer)
        //rdd03 = ArrayBuffer(1|2|3, 4|5|6, 7|8|8|5)
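        // Illustrative addition (not in the original): mapPartitions can also aggregate each partition
        // locally, e.g. producing one sum per partition
        val rddPartSums = rdd01.mapPartitions(it => Iterator(it.sum))
        println("rddPartSums = " + rddPartSums.collect().toBuffer)
        //rddPartSums = ArrayBuffer(6, 15, 28)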

        // similar to mapPartitions, but the function also receives an integer argument holding the partition index
        val result = rdd01.mapPartitionsWithIndex((index, iterator) => Iterator(index + " ~~~ " + iterator.mkString("|")))
        println("result = " + result.collect().toBuffer)
        //result = ArrayBuffer(0 ~~~ 1|2|3, 1 ~~~ 4|5|6, 2 ~~~ 7|8|8|5)


        // repartitioning: the first argument is the target number of partitions, the second whether to shuffle (default false)
        // going from fewer to more partitions needs shuffle = true; going from more to fewer works with shuffle = false
        val rdd04 = rdd01.coalesce(2, shuffle = false)
        println("rdd04 = " + rdd04.partitions.length)
        val rdd05 = rdd01.coalesce(4, shuffle = false)
        println("rdd05 = " + rdd05.partitions.length)
        // repartition always shuffles
        /*  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
            coalesce(numPartitions, shuffle = true)
          }*/
        val rdd06 = rdd01.repartition(2)
        println("rdd06 = " + rdd06.partitions.length)
        val rdd07 = rdd01.repartition(4)
        println("rdd07 = " + rdd07.partitions.length)
        //rdd04 = 2
        //rdd05 = 3 (coalesce cannot increase the partition count without shuffle, so the request for 4 is ignored)
        //rdd06 = 2
        //rdd07 = 4

        //    (1, 2, 3, 4, 5, 6, 7, 8, 8, 5)
        // returns the number of elements in the RDD
        println(rdd01.count())
        // returns the first element of the RDD (similar to take(1))
        println(rdd01.first())
        println(rdd02.first())
        rdd01.take(3)
        // returns the first n elements of the RDD using natural ordering or a custom comparator
        val arr03 = rdd01.takeOrdered(2)
        println("arr03 = " + arr03.toBuffer)
        //arr03 = ArrayBuffer(1, 2)
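        // Illustrative addition (not in the original): top(n) returns the n largest elements
        val arrTop = rdd01.top(2)
        println("arrTop = " + arrTop.toBuffer)
        //arrTop = ArrayBuffer(8, 8)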

        // countByKey() works on a (K,V) RDD and returns a Map of key -> count, i.e. the number of elements for each key
        println("rdd02.countByKey() = " + rdd02.countByKey())
        println("rdd02.countByValue() = " + rdd02.countByValue())
        println("rdd01.countByValue() = " + rdd01.countByValue())
        /* ------------------
         //("a",1),("b",2),("c",3),("d",5),("a",11),("a",1),("b",2)
         key -> number of elements with that key
         rdd02.countByKey() = Map(d -> 1, b -> 2, a -> 3, c -> 1)
         rdd02.countByValue() = Map((c,3) -> 1, (b,2) -> 2, (d,5) -> 1, (a,11) -> 1, (a,1) -> 2)
         //1,2,3,4,5,6,7,8,8,5  element -> number of occurrences of that element
         rdd01.countByValue() = Map(5 -> 2, 1 -> 1, 6 -> 1, 2 -> 1, 7 -> 1, 3 -> 1, 8 -> 2, 4 -> 1)*/
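        // Illustrative addition (not in the original): a key-value RDD only gets a defined partitioner
        // after an operation such as partitionBy
        val rddPartitioned = rdd02.partitionBy(new org.apache.spark.HashPartitioner(2))
        println("partitioner = " + rddPartitioned.partitioner)
        //partitioner = Some(org.apache.spark.HashPartitioner@...)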
      }
    }

2021-11-18 15:14