过滤+查询
一、查询字段 articleid 和 playcount,设置别名 playcount_jxq 显示前10条
package org.hnsw
import org.apache.spark.sql.SparkSession
//样本类 三个字段 userid(用户id)、artistid(艺术家ID)、playcount(播放次数)
case class UserArtist(userid:String, atiscleid:String, playcount:Int)
object SparkGloabalLearn {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().appName("jxq").master("local[*]").getOrCreate()
//1、以rdd的方式读取文本
val user_artist_dataRddjxq = session.sparkContext.textFile("file:///F:\\user_artist_data.txt")
//2.映射字段名 和 字段值
val artRdd = user_artist_dataRddjxq.map((x)=>{
val re = x.split(" ")
UserArtist(re(0), re(1), re(2).trim.toInt)
})
//使用隐寺函数,toDs
import session.implicits._
val ds = artRdd.toDS()
//4、查询字段 articleid 和 playcount 设置别名 playcount_jxq 显示前10条 方法:过滤列 + 限制
ds.select("atiscleid","playcount").limit(10).show()
ds.selectExpr("atiscleid","playcount as playcount_jxq").limit(10).show()
ds.where("playcount > 100").show()
//5、使用sql查询语句
//6、释放sesion和视图表
session.close()
}
}
二、people表,根据年龄 进行降序 order by desc
package org.hnsw
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._ //引入内置函数
object SparkLearn {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().appName("jxq").master("local[*]").getOrCreate()
val dataSet1 = session.read.json("file:///F:\\jxq10\\新建文件夹\\people.json")
val dataSet2 = session.read.json("file:///F:\\jxq10\\新建文件夹\\peopleAddress.json")
dataSet1.show()
dataSet2.show()
//people表,根据年龄 进行降序 order by desc
dataSet1.orderBy(desc("age")).show() //法一
dataSet1.sort(desc("age)).show() //法二
dataSet1.orderBy(-dataSet1("age")).show() //法三
dataSet1.sort(-dataSet1("age")).show() //法四
//隐式函数
import session.implicits._
dataSet1.orderBy($"age".desc).show() //法五
dataSet1.sort($"age".desc).show() //法六
}
}
三、people表,agg对多个聚合结果展示 groupby as方法起别名
package org.hnsw
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object sparkagg {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().appName("jxq").master("local[*]").getOrCreate()
val dataSet1 = session.read.json("file:///F:\\jxq10\\新建文件夹\\people.json")
val dataSet2 = session.read.json("file:///F:\\jxq10\\新建文件夹\\peopleAddress.json")
dataSet1.show()
dataSet2.show()
//people表,agg对多个聚合结果展示 groupby as方法起别名
dataSet1.groupBy("sex").agg(avg("age").as("avg_age"),min("age"), count("age")).show()
}
}
四、过滤字段 指定数据表名 (字段)
package org.hnsw
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object SparkInner {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().appName("jxq").master("local[*]").getOrCreate()
val dataSet1 = session.read.json("file:///F:\\jxq10\\新建文件夹\\people.json")
val dataSet2 = session.read.json("file:///F:\\jxq10\\新建文件夹\\peopleAddress.json")
dataSet1.show()
dataSet2.show()
val result = dataSet1.join(dataSet2, dataSet1("name")===dataSet2("name"), "inner")
//过滤字段 指定数据表名 (字段)
result.select(dataSet1("age"), dataSet1("education"),dataSet1("name"), dataSet2("address"), dataSet2("area")).show()
}
}
五、DataFrme&DataSet组合查询(分别用sql和函数实现)
package org.hnsw
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object DFMysql2 {
def main(args: Array[String]): Unit = {
val sparksql = SparkSession.builder().appName("sparksql").master("local").getOrCreate()
// 指定数据库连接地址
val sqlUrl = "jdbc:mysql://192.168.3.66:3306/test?useSSL=false&serverTimezone=GMT&characterEncoding=utf-8"
// 指定访问表
val de_table = "departments"
val em_table = "employees"
// 设置配置参数,数据库访问的用户名和密码
val properties = new Properties()
properties.setProperty("user","root")
properties.setProperty("password","root")
// 使用sparksession连接jdbc
val dfDepartments = sparksql.read.jdbc(sqlUrl,de_table,properties)
val dfEmployees = sparksql.read.jdbc(sqlUrl,em_table,properties)
dfDepartments.show()
dfEmployees.show()
//1、查询employees表中地址在中山的员工信息 where like
dfEmployees.where("address like '%中山%'").show()
//2、查询employees的员工信息按照出生日从大到小排序 oderby desc
dfEmployees.orderBy(desc("birthday")).show()
//3、统计employees表 每个部门对应的员工人数 group by count
dfEmployees.groupBy("departno").agg(count("name").as("count")).show()
//4、统计employees表 每个部门中员工最短工作年限别名为minyear、最大工作年限别名为maxyear、平均工作年限别名为avgyear
//groupby agg(min,max,avg)
dfEmployees.groupBy("departno").agg(min("workyear").as("minyear"),max("workyear").as("maxyear"),avg("workyear").as("avgyear")).show()
//5、jion连接departments和emplyees,显示字段员工no、名字、phone和部门名称
//1)链接字段 dfEmployees departno == dfDepartments no
//2)过滤字段 员工no、名字、phone和部门名称
val result = dfEmployees.join(dfDepartments, dfEmployees("departno")===dfDepartments("no"), "inner")
result.select(dfEmployees("no").as("员工no"),dfEmployees("name").as("名字"),dfEmployees("phone"), dfDepartments("name").as("部门名称")).show()
// 关闭session
sparksql.close()
}
}