DataFrame Output


1. Text Output

package org.hnsw

import org.apache.spark.sql.{SaveMode, SparkSession}


object SparkLearn {

  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("jxq").master("local[*]").getOrCreate()
    val dataset1 = session.read.json("file:///C:\\Users\\hnswxy\\Desktop\\people.json")

    val dataset2 = dataset1.where("age > 20")
    // Specify the save mode, output format (text), and output path.
    // The text data source only supports a single string column, so
    // serialize each row to a JSON string before writing.
    dataset2.toJSON.write.mode(SaveMode.Overwrite).format("text").save("out/jxq")
    dataset2.show()
  }
}
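If a multi-column plain-text export is the goal, the built-in CSV data source handles multiple columns directly, so no per-row serialization is needed. A minimal sketch, assuming the same dataset2 as above (the output path out/jxq_csv is illustrative):

dataset2.write
  .mode(SaveMode.Overwrite)
  .format("csv")
  .option("header", "true") // write the column names as the first line
  .save("out/jxq_csv")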

2. Output to an External Database

package org.hnsw

import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

object DFMysqlOperate {

  def main(args: Array[String]): Unit = {
    val sparksql = SparkSession.builder().appName("sparksql").master("local").getOrCreate()
    // 1. Read the JSON data.
    val sqlUrl = "jdbc:mysql://192.168.3.66:3306/test?useSSL=false&serverTimezone=GMT&characterEncoding=utf-8"
    val peopleDS = sparksql.read.json("file:///C:\\Users\\hnswxy\\Desktop\\people.json")
    val peopleAddrDS = sparksql.read.json("file:///C:\\Users\\hnswxy\\Desktop\\peopleAddress.json")
    peopleDS.show()
    peopleAddrDS.show()

    // 2. Join the two tables and merge the columns: name, age, sex, city.
    val peopleJoin = peopleDS.join(peopleAddrDS, peopleDS("name")===peopleAddrDS("name"), "inner")
    peopleJoin.show()
    val resultDS = peopleJoin.select(peopleDS("name"),peopleDS("age"), peopleDS("sex"), peopleAddrDS("city"))
    resultDS.show()
    // 3. Write the result to the MySQL table jxq.
    resultDS.write.mode(SaveMode.Overwrite).format("jdbc")
      .option("url", sqlUrl)
      .option("dbtable", "jxq")
      .option("user", "root")
      .option("password", "root")
      // With MySQL Connector/J 8.x, use com.mysql.cj.jdbc.Driver instead.
      .option("driver", "com.mysql.jdbc.Driver")
      .save()


    // Close the session.
    sparksql.close()
  }
}
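The java.util.Properties import above hints at the alternative DataFrameWriter.jdbc overload, which bundles the connection settings into a Properties object instead of individual option(...) calls. A minimal sketch of the same write under that API, reusing the URL, table name, and credentials from the example:

val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "root")
props.setProperty("driver", "com.mysql.jdbc.Driver")
// Equivalent to the option(...)-based write: url, table name, connection properties.
resultDS.write.mode(SaveMode.Overwrite).jdbc(sqlUrl, "jxq", props)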

3. Output to Hive

package org.hnsw

import org.apache.spark.sql.{SaveMode, SparkSession}

object DFHiveOperate {
  def main(args: Array[String]): Unit = {
    val sparksql = SparkSession.builder().appName("sparksql").master("local").enableHiveSupport().getOrCreate()
    // 1. Read the JSON data.
    val peopleDS = sparksql.read.json("file:///C:\\Users\\hnswxy\\Desktop\\people.json")
    val peopleAddrDS = sparksql.read.json("file:///C:\\Users\\hnswxy\\Desktop\\peopleAddress.json")
    peopleDS.show()
    peopleAddrDS.show()

    // 2. Join the two tables and merge the columns: name, age, sex, city.
    val peopleJoin = peopleDS.join(peopleAddrDS, peopleDS("name")===peopleAddrDS("name"), "inner")
    peopleJoin.show()
    val resultDS = peopleJoin.select(peopleDS("name"),peopleDS("age"), peopleDS("sex"), peopleAddrDS("city"))
    resultDS.show()
    sparksql.sql("create database if not exists people")
//    val peopleRdd = sparksql.sparkContext(resultDS)
    resultDS.write.mode(SaveMode.Overwrite).saveAsTable("people.jxq_table")

    // Close the session.
    sparksql.close()

  }

}
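As a quick sanity check, the table can be read back through the same Hive-enabled session (before sparksql.close() is called). A minimal sketch:

// Query the freshly written Hive table and print its rows.
sparksql.sql("select name, age, sex, city from people.jxq_table").show()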
