一、文本输出
package org.hnsw
import org.apache.spark.sql.{SaveMode, SparkSession}
object SparkLearn {
  /**
   * Reads people.json, filters rows with age > 20, and writes the result
   * as a plain-text file under out/jxq.
   */
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("jxq").master("local[*]").getOrCreate()
    val dataset1 = session.read.json("file:///C:\\Users\\hnswxy\\Desktop\\people.json")
    val dataset2 = dataset1.where("age > 20")
    dataset2.show()
    // The "text" data source only supports a single string column; writing the
    // multi-column DataFrame directly would fail with an AnalysisException.
    // Serialize each row to a JSON string first so the text write is valid.
    // (save mode = Overwrite, format = text, output path = out/jxq)
    dataset2.toJSON.write.mode(SaveMode.Overwrite).format("text").save("out/jxq")
    // Release the SparkSession (the original leaked it).
    session.close()
  }
}
二、输出到外部数据库（MySQL）
package org.hnsw
import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}
object DFMysqlOperate {
  /**
   * Joins people.json with peopleAddress.json on `name` and writes the
   * combined (name, age, sex, city) rows to the MySQL table `jxq`.
   */
  def main(args: Array[String]): Unit = {
    val sparksql = SparkSession.builder().appName("sparksql").master("local").getOrCreate()

    // 1. Read the two JSON sources.
    val sqlUrl = "jdbc:mysql://192.168.3.66:3306/test?useSSL=false&serverTimezone=GMT&characterEncoding=utf-8"
    val peopleDS = sparksql.read.json("file:///C:\\Users\\hnswxy\\Desktop\\people.json")
    val peopleAddrDS = sparksql.read.json("file:///C:\\Users\\hnswxy\\Desktop\\peopleAddress.json")
    peopleDS.show()
    peopleAddrDS.show()

    // 2. Inner-join on name, then project name, age, sex, city.
    //    Columns are qualified by source DataFrame because `name` exists in both.
    val peopleJoin = peopleDS.join(peopleAddrDS, peopleDS("name") === peopleAddrDS("name"), "inner")
    peopleJoin.show()
    val resultDS = peopleJoin.select(peopleDS("name"), peopleDS("age"), peopleDS("sex"), peopleAddrDS("city"))
    resultDS.show()

    // 3. Write the result into the MySQL table `jxq` (Overwrite replaces it).
    //    The URL uses the Connector/J 8 option `serverTimezone`, so use the
    //    matching 8.x driver class; `com.mysql.jdbc.Driver` is the deprecated
    //    5.x name and only works via a compatibility shim.
    resultDS.write.mode(SaveMode.Overwrite)
      .format("jdbc")
      .option("url", sqlUrl)
      .option("dbtable", "jxq")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .save()

    // Release the SparkSession.
    sparksql.close()
  }
}
三、输出 Hive 数据库
package org.hnsw
import org.apache.hadoop.hdfs.server.namenode.SafeMode
import org.apache.spark.sql.{SaveMode, SparkSession}
object DFHiveOperate {
  /**
   * Joins people.json with peopleAddress.json on `name` and saves the
   * combined (name, age, sex, city) rows as the Hive table `people.jxq_table`.
   */
  def main(args: Array[String]): Unit = {
    // Hive support must be enabled so saveAsTable goes through the Hive metastore.
    val sparksql = SparkSession.builder().appName("sparksql").master("local").enableHiveSupport().getOrCreate()

    // 1. Read the two JSON sources.
    //    (The unused MySQL JDBC URL from the original was removed — this
    //    example writes to Hive, not MySQL.)
    val peopleDS = sparksql.read.json("file:///C:\\Users\\hnswxy\\Desktop\\people.json")
    val peopleAddrDS = sparksql.read.json("file:///C:\\Users\\hnswxy\\Desktop\\peopleAddress.json")
    peopleDS.show()
    peopleAddrDS.show()

    // 2. Inner-join on name, then project name, age, sex, city.
    //    Columns are qualified by source DataFrame because `name` exists in both.
    val peopleJoin = peopleDS.join(peopleAddrDS, peopleDS("name") === peopleAddrDS("name"), "inner")
    peopleJoin.show()
    val resultDS = peopleJoin.select(peopleDS("name"), peopleDS("age"), peopleDS("sex"), peopleAddrDS("city"))
    resultDS.show()

    // 3. Ensure the target database exists, then overwrite the managed table.
    sparksql.sql("create database if not exists people")
    resultDS.write.mode(SaveMode.Overwrite).saveAsTable("people.jxq_table")

    // Release the SparkSession.
    sparksql.close()
  }
}