DataFrame Filtering and Querying


Filtering + Querying

1. Select the artistid and playcount fields, alias playcount as playcount_jxq, and show the first 10 rows

package org.hnsw

import org.apache.spark.sql.SparkSession
// Case class with three fields: userid (user ID), artistid (artist ID), playcount (play count)
case class UserArtist(userid: String, artistid: String, playcount: Int)
object SparkGlobalLearn {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("jxq").master("local[*]").getOrCreate()
    //1. Read the text file as an RDD
    val user_artist_dataRddjxq = session.sparkContext.textFile("file:///F:\\user_artist_data.txt")
    //2. Map each line to field names and values
    val artRdd = user_artist_dataRddjxq.map((x)=>{
      val re = x.split(" ")
      UserArtist(re(0), re(1), re(2).trim.toInt)
    })
    //3. Import implicit conversions so the RDD can be converted with toDS
    import session.implicits._
    val ds = artRdd.toDS()
    //4. Select artistid and playcount, alias playcount as playcount_jxq, show the first 10 rows: project columns + limit
    ds.select("artistid", "playcount").limit(10).show()
    ds.selectExpr("artistid", "playcount as playcount_jxq").limit(10).show()
    ds.where("playcount > 100").show()
    //5. Run the same query with a SQL statement
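    // A minimal sketch of the SQL route, assuming a temp view named "user_artist"
    // (the view name is not in the original):
    ds.createOrReplaceTempView("user_artist")
    session.sql("SELECT artistid, playcount AS playcount_jxq FROM user_artist LIMIT 10").show()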

    //6. Drop the temp view and release the session
    session.catalog.dropTempView("user_artist")
    session.close()
  }

}
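
Incidentally, where on a Dataset is an alias for filter, so the string predicate above also has a typed, Column-based form; a minimal sketch using the same ds as above:

import session.implicits._
ds.filter($"playcount" > 100).show()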

2. people table: sort by age in descending order (order by ... desc)

package org.hnsw

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._  // Import Spark's built-in functions

object SparkLearn {

  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("jxq").master("local[*]").getOrCreate()
    val dataSet1 = session.read.json("file:///F:\\jxq10\\新建文件夹\\people.json")
    val dataSet2 = session.read.json("file:///F:\\jxq10\\新建文件夹\\peopleAddress.json")
    dataSet1.show()
    dataSet2.show()

    //people table: sort by age in descending order (order by ... desc)
    dataSet1.orderBy(desc("age")).show() //method 1
    dataSet1.sort(desc("age")).show() //method 2
    dataSet1.orderBy(-dataSet1("age")).show() //method 3
    dataSet1.sort(-dataSet1("age")).show() //method 4
    //Implicit conversions enable the $"column" syntax
    import session.implicits._
    dataSet1.orderBy($"age".desc).show() //method 5
    dataSet1.sort($"age".desc).show() //method 6
    session.close()
  }
}
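
orderBy is an alias for sort, which is why methods 1/2, 3/4, and 5/6 pair up. The same descending sort can also be written in SQL through a temp view; a minimal sketch, assuming the view name "people" (the name is not in the original):

dataSet1.createOrReplaceTempView("people")
session.sql("SELECT * FROM people ORDER BY age DESC").show()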

3. people table: display multiple aggregate results with groupBy + agg; alias with the as method

package org.hnsw

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SparkAgg {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("jxq").master("local[*]").getOrCreate()
    val dataSet1 = session.read.json("file:///F:\\jxq10\\新建文件夹\\people.json")
    val dataSet2 = session.read.json("file:///F:\\jxq10\\新建文件夹\\peopleAddress.json")
    dataSet1.show()
    dataSet2.show()

    //people table: display several aggregate results at once with groupBy + agg; alias with as
    dataSet1.groupBy("sex").agg(avg("age").as("avg_age"), min("age"), count("age")).show()
    session.close()
  }
}
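
The same aggregation can be phrased in SQL; a minimal sketch, assuming a temp view named "people" (the name is not in the original):

dataSet1.createOrReplaceTempView("people")
session.sql("SELECT sex, AVG(age) AS avg_age, MIN(age), COUNT(age) FROM people GROUP BY sex").show()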

4. Selecting columns after a join, qualified by their source DataFrame: df("column")

package org.hnsw

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object SparkInner {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("jxq").master("local[*]").getOrCreate()
    val dataSet1 = session.read.json("file:///F:\\jxq10\\新建文件夹\\people.json")
    val dataSet2 = session.read.json("file:///F:\\jxq10\\新建文件夹\\peopleAddress.json")
    dataSet1.show()
    dataSet2.show()

    val result = dataSet1.join(dataSet2, dataSet1("name")===dataSet2("name"), "inner")
    //Select columns, qualifying each by its source DataFrame: df("column")
    result.select(dataSet1("age"), dataSet1("education"), dataSet1("name"), dataSet2("address"), dataSet2("area")).show()
    session.close()
  }

}
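
Qualifying columns by their parent DataFrame, as above, is one way to disambiguate the duplicated name column; another is to alias each side of the join and qualify by alias. A minimal sketch (the aliases p1 and p2 are illustrative, not from the original):

val joined = dataSet1.as("p1").join(dataSet2.as("p2"), col("p1.name") === col("p2.name"), "inner")
joined.select(col("p1.age"), col("p1.education"), col("p1.name"), col("p2.address"), col("p2.area")).show()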

5. Combined DataFrame & Dataset queries (implemented with SQL and with functions)

package org.hnsw

import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DFMysql2 {

  def main(args: Array[String]): Unit = {
    val sparksql = SparkSession.builder().appName("sparksql").master("local").getOrCreate()
    // Database connection URL
    val sqlUrl = "jdbc:mysql://192.168.3.66:3306/test?useSSL=false&serverTimezone=GMT&characterEncoding=utf-8"
    // Tables to access
    val de_table = "departments"
    val em_table = "employees"
    // Connection properties: database username and password
    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","root")
    // Read the tables over JDBC through the SparkSession
    val dfDepartments = sparksql.read.jdbc(sqlUrl,de_table,properties)
    val dfEmployees = sparksql.read.jdbc(sqlUrl,em_table,properties)
    dfDepartments.show()
    dfEmployees.show()
    //1. Query employees whose address contains 中山: where + like
    dfEmployees.where("address like '%中山%'").show()
    //2. Sort employees by birthday in descending order: orderBy + desc
    dfEmployees.orderBy(desc("birthday")).show()
    //3. Count the number of employees in each department: groupBy + count
    dfEmployees.groupBy("departno").agg(count("name").as("count")).show()
    //4. For each department, compute the minimum working years as minyear, the maximum as maxyear,
    //and the average as avgyear: groupBy + agg(min, max, avg)
    dfEmployees.groupBy("departno").agg(min("workyear").as("minyear"),max("workyear").as("maxyear"),avg("workyear").as("avgyear")).show()
    //5. Join departments and employees; show the employee no, name, phone, and department name
    //1) join key: dfEmployees("departno") == dfDepartments("no")
    //2) selected columns: employee no, name, phone, and department name
    val result = dfEmployees.join(dfDepartments, dfEmployees("departno")===dfDepartments("no"), "inner")
    result.select(dfEmployees("no").as("员工no"),dfEmployees("name").as("名字"),dfEmployees("phone"), dfDepartments("name").as("部门名称")).show()
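
    // The section heading also calls for a SQL implementation; a minimal sketch of query 5
    // using temp views (the view names "employees" and "departments" are assumed here):
    dfEmployees.createOrReplaceTempView("employees")
    dfDepartments.createOrReplaceTempView("departments")
    sparksql.sql(
      """SELECT e.no, e.name, e.phone, d.name AS dept_name
        |FROM employees e JOIN departments d ON e.departno = d.no""".stripMargin).show()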


    // Close the session
    sparksql.close()
  }
}
