1. 从集合(内存)中创建rdd
//1. 从集合(内存)中创建rdd
object initRddByList extends App {
//1. 该对象用于 : Spark应用参数的配置 将Spark的各种参数设置为key,value
// note : 1. 一旦一个SparkConf对象被传递给Spark,他就被克隆了,不能再被修改了(不支持运行时修改配置)
// 2. 这里的配置覆盖了 默认配置和系统属性
val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("initRddByList")
//2. Spark功能的主要入口
// SparkContext表示 与Spark的链接 可以在该集群上创建RDD、accumulators(累加器)、 broadcast variables(广播变量)
// note : 1. 每个JVM只能激活一个SparkContext,必须 stop()已存在的,才能创建一个新的
val sc: SparkContext = new SparkContext(sparkconf)
//3. 分发一个本地scala集合来形成 RDD
// 语法 : def parallelize[T](seq: Seq[T], numSlices: Int)(evidence$1: ClassTag[T]): RDD[T]
// @param seq Scala collection to distribute
// @param numSlices用于划分集合的分区数
// @return RDD表示分布式收集
val list_rdd: RDD[Int] = sc.parallelize(
List(1, 2, 3, 4)
)
// makeRDD 和 parallelize
val array_rdd: RDD[Int] = sc.makeRDD(
Array(4, 5, 6, 7)
)
//4. 语法 : def collect(): Array[T]
// 返回 一个包含RDD中所有元素的数组
// note : 只能在数据集较小的时候使用,此方法 是将所有分布式节点的RDD元素 收集到 Driver的内存中
private val array: Array[Int] = list_rdd.collect()
println(array.mkString(","))
//关闭资源
sc.stop()
}
2.1 从外部存储(文件)中 创建rdd (textFile按行读取)
//2.1 从外部存储(文件)中 创建rdd - textFile按行读取
object initRddByLocalFileByLine extends App {
val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("initRddByLocalFile")
val sc: SparkContext = new SparkContext(sparkconf)
//1. 读取 文件系统中的文本文件, 并将其作为String的RDD返回
// @param path支持的文件系统中文本文件的路径
// @param minPartitions建议RDD的最小分区数
// @return RDD的文本文件行
private val text_rdd: RDD[String] = sc.textFile("Spark_319/src/data/word.txt")
//private val hdfs_rdd: RDD[String] = sc.textFile("hdfs://gaocun:8020/user/hive/warehouse/person")
println(text_rdd.collect().mkString(","))
//println(hdfs_rdd.collect().mkString(","))
//关闭资源
sc.stop()
}
2.2 从外部存储(文件)中 创建rdd (wholeTextFiles 按文件读取)
//2.2 从外部存储(文件)中 创建rdd - wholeTextFiles 按文件读取
object initRddByLocalFileByFile extends App {
val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("initRddByLocalFile")
val sc: SparkContext = new SparkContext(sparkconf)
//1. 读取 文件系统中的文本文件, 并将其作为(文件路径,文件内容)的RDD返回
// 语法 : def wholeTextFiles(path: String, minPartitions: Int): RDD[(String, String)]
// @param 输入文件的目录,多文件路径时 用,分割
// @param minPartitions建议RDD的最小分区数
// @return RDD表示 文件路径和相应文件内容的元组 即 ("File Path","File xxx")
private val file_rdd: RDD[(String, String)] = sc.wholeTextFiles("Spark_319/src/data")
println(file_rdd.collect().mkString(","))
//关闭资源
sc.stop()
}