Spark's built-in partitioning methods
Hash partitioning (HashPartitioner): assigns each record to a partition based on the hash of its key.
package org.hnsw

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object SparkLearn_obj {
  def main(args: Array[String]): Unit = {
    // Initialize the SparkContext
    val conf = new SparkConf().setAppName("分区Rdd").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // 1. Create a key-value RDD
    val rdd = sc.parallelize(List((1, "a"), (2, "b"), (2, "d"), (3, "c"), (4, "d"), (5, "f")))
    // 2. Repartition by key hash into 3 partitions
    val patitionrdd = rdd.partitionBy(new HashPartitioner(3))
    // 3. Save the partitioned result to disk (one part file per partition)
    patitionrdd.saveAsTextFile("out/hashpart")
    sc.stop()
  }
}
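To see how the hash partitioner actually spread the keys, you can tag every record with the index of the partition it lives in. A minimal sketch that continues from the code above (mapPartitionsWithIndex is a standard RDD method; the println loop is only for local inspection):

// Tag each record with its partition index. HashPartitioner places a key
// in partition key.hashCode % numPartitions (made non-negative).
val withIndex = patitionrdd.mapPartitionsWithIndex { (idx, iter) =>
  iter.map { case (k, v) => s"partition=$idx key=$k value=$v" }
}
withIndex.collect().foreach(println)

For the small integer keys here, hashCode equals the value itself, so key % 3 decides the partition: 3 lands in partition 0, 1 and 4 in partition 1, and 2 and 5 in partition 2.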
Range partitioning (RangePartitioner): maps keys that fall within the same range to the same partition, so each partition holds a contiguous slice of the sorted key space.
package org.hnsw

import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object SparkObjectpr {
  def main(args: Array[String]): Unit = {
    // Initialize the SparkContext
    val conf = new SparkConf().setAppName("分区Rdd").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // 1. Create a key-value RDD
    val rdd = sc.parallelize(List((1, "a"), (2, "b"), (2, "d"), (3, "c"), (4, "d"), (5, "f"), (5, "m"), (6, "k")))
    // 2. Repartition by key range into 3 partitions;
    //    RangePartitioner samples the RDD to choose the range boundaries
    val patitionrdd = rdd.partitionBy(new RangePartitioner(3, rdd))
    // 3. Save the partitioned result to disk
    patitionrdd.saveAsTextFile("out/rangepart1")
    sc.stop()
  }
}
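RangePartitioner(3, rdd) samples the parent RDD to pick boundary keys, then sends each record to the partition whose range its key falls into, so the three partitions end up roughly balanced and their keys are contiguous in sorted order. A quick way to confirm the balance, continuing from the code above (a sketch for local inspection only):

// Count the records that landed in each range partition.
val sizes = patitionrdd.mapPartitionsWithIndex { (idx, iter) =>
  Iterator((idx, iter.size))
}
sizes.collect().foreach { case (idx, n) => println(s"partition $idx holds $n records") }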
User-defined partitioning
package org.hnsw

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Myinobject {
  def main(args: Array[String]): Unit = {
    // Initialize the SparkContext
    val conf = new SparkConf().setAppName("分区Rdd").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // 1. Create a key-value RDD
    val rdd = sc.parallelize(List((1, "a"), (2, "b"), (2, "d"), (3, "c"), (4, "d"), (5, "f"), (5, "m"), (6, "k")))
    // 2. Repartition with the custom partitioner (2 partitions)
    val patitionrdd = rdd.partitionBy(new JxqPatition)
    // 3. Save the partitioned result to disk
    patitionrdd.saveAsTextFile("out/jxqpart")
    sc.stop()
  }

  class JxqPatition extends Partitioner {
    // Number of partitions
    override def numPartitions: Int = 2

    // Partitioning strategy: route keys by parity.
    // getPartition must return a value between 0 and numPartitions - 1.
    override def getPartition(key: Any): Int = {
      if (key.toString.toInt % 2 == 0) 0 else 1
    }
  }
}
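To confirm that the parity-based partitioner really separated even and odd keys, glom() can gather each partition into an array before collecting. A minimal sketch, meant to run inside the same main method as above:

// glom() turns each partition into an Array, so we can print the
// even-key partition (index 0) and the odd-key partition (index 1).
patitionrdd.glom().collect().zipWithIndex.foreach { case (arr, idx) =>
  println(s"partition $idx: ${arr.mkString(", ")}")
}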
User-defined partitioning: a worked example
package org.hnsw

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Myinobject {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("分区Rdd").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // 1. Load the student file (first command-line argument) and key each line by its first field
    val student = sc.textFile(args(0))
    val stuRdd_jxq = student.map((x) => {
      val re = x.split(",")
      (re(0), x)
    })
    println(stuRdd_jxq.collect().toList)
    // 2. Repartition with the custom year-based partitioner
    val patitionrdd_jxq = stuRdd_jxq.partitionBy(new JxqPatition)
    // 3. Drop the key and keep only the original line
    val mapRdd_values_jxq = patitionrdd_jxq.map((x) => {
      x._2
    })
    println(mapRdd_values_jxq.collect().toList)
    // 4. Save to disk (second command-line argument)
    mapRdd_values_jxq.saveAsTextFile(args(1))
    sc.stop()
  }

  class JxqPatition extends Partitioner {
    // Number of partitions
    override def numPartitions: Int = 4

    // Partitioning strategy: route each record by the year contained in its key;
    // records with no matching year go to the last partition.
    // The return value must lie between 0 and numPartitions - 1.
    override def getPartition(key: Any): Int = {
      if (key.toString.contains("2013")) 0
      else if (key.toString.contains("2014")) 1
      else if (key.toString.contains("2015")) 2
      else 3
    }
  }
}
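Because getPartition only checks whether the key string contains a year, the routing can be sanity-checked without running a full job. A small sketch, assuming the first comma-separated field of each student record carries a year such as 2013 (the sample keys below are made up for illustration and should run inside Myinobject, where JxqPatition is in scope):

// Local check of the year-based routing logic.
val p = new JxqPatition
println(p.getPartition("2013-1001")) // 0
println(p.getPartition("2015-2044")) // 2
println(p.getPartition("2016-3102")) // 3 (no matching year, last partition)

With numPartitions = 4, saveAsTextFile(args(1)) writes four part files (part-00000 through part-00003), one per year bucket.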