Spark Data Partitioning


Built-in partitioners in Spark

Hash partitioning (HashPartitioner): records are assigned to partitions by the hash value of their key.

package org.hnsw

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object SparkLearn_obj {
  def main(args: Array[String]): Unit = {
    // Initialize SparkContext
    val conf = new SparkConf().setAppName("PartitionRDD").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //1. Create the RDD
    val rdd = sc.parallelize(List((1,"a"),(2,"b"),(2,"d"),(3,"c"),(4,"d"),(5,"f")))
    //2. Partition by hash into 3 partitions
    val patitionrdd = rdd.partitionBy(new HashPartitioner(3))
    //3. Save the partitioned result to disk
    patitionrdd.saveAsTextFile("out/hashpart")

  }
}
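
HashPartitioner places each record in the partition given by the key's hashCode modulo the number of partitions (made non-negative), so with 3 partitions key 1 lands in partition 1, key 2 in partition 2, key 3 in partition 0, and so on. A minimal sketch for inspecting the assignment, assuming the patitionrdd built above; mapPartitionsWithIndex is the standard RDD API used here:

// Print each record together with the index of the partition it ended up in
val withIndex = patitionrdd.mapPartitionsWithIndex { (idx, iter) =>
  iter.map(kv => s"partition $idx -> $kv")
}
withIndex.collect().foreach(println)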

Range partitioning (RangePartitioner): keys that fall within the same range are mapped to the same partition.

package org.hnsw

import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object SparkObjectpr {
  def main(args: Array[String]): Unit = {
    // Initialize SparkContext
    val conf = new SparkConf().setAppName("PartitionRDD").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //1. Create the RDD
    val rdd = sc.parallelize(List((1,"a"),(2,"b"),(2,"d"),(3,"c"),(4,"d"),(5,"f"),(5,"m"),(6,"k")))
    //2. Partition by key range into 3 partitions
    val patitionrdd = rdd.partitionBy(new RangePartitioner(3,rdd))
    //3. Save the partitioned result to disk
    patitionrdd.saveAsTextFile("out/rangepart1")
  }
}
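
RangePartitioner samples the keys of the RDD to estimate split points, so each partition receives a contiguous, roughly equal-sized range of keys (the key type must have an ordering, which Int keys do). A small sketch for checking the resulting ranges, assuming the patitionrdd built above; glom() gathers each partition into an array so its contents can be printed together:

// Print the contents of each partition; keys in partition 0 are all <= keys in partition 1,
// and so on, because every partition covers one contiguous key range
patitionrdd.glom().collect().zipWithIndex.foreach { case (part, idx) =>
  println(s"partition $idx: ${part.mkString(", ")}")
}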

User-defined (custom) partitioning

package org.hnsw
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Myinobject {
  def main(args: Array[String]): Unit = {
    // Initialize SparkContext
    val conf = new SparkConf().setAppName("PartitionRDD").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //1. Create the RDD
    val rdd = sc.parallelize(List((1,"a"),(2,"b"),(2,"d"),(3,"c"),(4,"d"),(5,"f"),(5,"m"),(6,"k")))
    //2. Partition with the custom partitioner (2 partitions)
    val patitionrdd = rdd.partitionBy(new JxqPatition)
    //3. Save the partitioned result to disk
    patitionrdd.saveAsTextFile("out/jxqpart")

  }


  class JxqPatition extends Partitioner {
    // Number of partitions
    override def numPartitions: Int = 2

    // Partitioning strategy: return a partition index in the range 0 to numPartitions - 1
    override def getPartition(key: Any): Int = {
      // Even keys go to partition 0, odd keys to partition 1
      if (key.toString.toInt % 2 == 0) {
        0
      } else {
        1
      }
    }
  }

} 
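
One design note that is not in the original: if a custom Partitioner also overrides equals and hashCode, Spark can recognise that two RDDs were partitioned in the same way and avoid an extra shuffle when they are joined. A minimal sketch of that refinement, reusing the parity logic of JxqPatition above (the class name ParityPartitioner is just for illustration):

import org.apache.spark.Partitioner

class ParityPartitioner extends Partitioner {
  override def numPartitions: Int = 2

  // Same parity rule as JxqPatition: even keys to partition 0, odd keys to partition 1
  override def getPartition(key: Any): Int =
    if (key.toString.toInt % 2 == 0) 0 else 1

  // Any two ParityPartitioner instances behave identically, so they compare equal,
  // letting Spark treat RDDs partitioned by either instance as co-partitioned
  override def equals(other: Any): Boolean = other.isInstanceOf[ParityPartitioner]
  override def hashCode(): Int = numPartitions
}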

Custom partitioning: a worked example

package org.hnsw
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Myinobject {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PartitionRDD").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // 1. Load the input file to create the RDD (args(0) is the input path)
    val student = sc.textFile(args(0))

    // Key each record by its first comma-separated field
    val stuRdd_jxq = student.map((x) => {
      val re = x.split(",")
      (re(0), x)
    })


    println(stuRdd_jxq.collect().toList)

    // Repartition with the custom partitioner, then drop the keys again
    val patitionrdd_jxq = stuRdd_jxq.partitionBy(new JxqPatition)
    val mapRdd_values_jxq = patitionrdd_jxq.map((x) => {
      x._2
    })
    println(mapRdd_values_jxq.collect().toList)
    // Save the result to disk (args(1) is the output path)
    mapRdd_values_jxq.saveAsTextFile(args(1))

  }



  class JxqPatition extends Partitioner {
    // Number of partitions
    override def numPartitions: Int = 4

    // Partitioning strategy: route each record to a partition by the year found in its key,
    // returning an index in the range 0 to numPartitions - 1
    override def getPartition(key: Any): Int = {
      if (key.toString.contains("2013")) {
        0
      } else if (key.toString.contains("2014")) {
        1
      } else if (key.toString.contains("2015")) {
        2
      } else {
        3
      }
    }
  }


}
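
The input file format is not shown in the original, so the following is only an illustration under the assumption that each line begins with a year field, e.g. "2014,Zhang San,85". Under that assumption the map above keys the record by "2014", and JxqPatition routes it to partition 1:

// Illustration only; the "year,name,..." input format is an assumption
val sampleLine = "2014,Zhang San,85"
val key = sampleLine.split(",")(0)                           // "2014"
val target = new Myinobject.JxqPatition().getPartition(key)  // 1, because the key contains "2014"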
