大数据入门到精通8-spark RDD 复合key 和复合value 的map reduce操作


一.做基础数据准备

这次使用fights得数据。

scala> val flights= sc.textFile("/user/hdfs/data/Flights/flights.csv")
flights: org.apache.spark.rdd.RDD[String] = /user/hdfs/data/Flights/flights.csv MapPartitionsRDD[3] at textFile at :24

scala> val sampleFlights= sc.parallelize(flights.take(1000))
sampleFlights: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at :26

scala> val header= sampleFlights.first
header: String = YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY

scala> val filteredFlights= flights.filter( line=>{ line!= header } )
filteredFlights: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at :30

二.计算复合key 和 value

计算礼拜几,根据起飞时间计算是上午,下午,晚上,还是夜间飞机,把这两个作为复合key,根据这个来统计平均延误时间。

val timingMap = filteredFlights.map(flight =>{
val flightList=flight.split(",")
val dayOfWeek = flightList(3)
val time=if (flightList(10).length>0) {flightList(10).toInt}else 0
val delay=if (flightList(22).length>0) {flightList(22).toInt}else 0

var periodOfDay =0

if(time>=600 && time<1200){
periodOfDay=0
}else if (time>=1200 && time<1800){
periodOfDay=1
}else if (time>=1800 && time<2400){
periodOfDay=2
}else if (time>=0 && time<600){
periodOfDay=3
}
((dayOfWeek,periodOfDay),(delay,1))
})

timingMap.take(30).foreach(println)

//这里有一个重点,periodOfDay 不能定义为val,否则会有重复赋值得错误,如果有重复赋值得必要,使用var来定义。

//根据起飞时间分成1.2,3,4

//计算reduce 根据复合key ,计算延迟,如果在30分钟以内延迟到达,不计入延迟

val reduceMap=timingMap.reduceByKey((sum,current)=>{
var output =(0,0)
if (current._1>30){
output=((sum._1+current._1),(sum._2+current._2))
}else {output=(sum._1,sum._2)}
if (sum._1<0){
output=(0,0)
}
output
})

reduceMap.take(30).foreach(println)

//这里实际操作中把current._2写成1,因为实际上这个数据其实就是1,但是发现如果写成1,每次的结果都不一样,这里还是必须要使用current._2

 三、排序并求平均延迟

val sortedDelays= reduceMap.sortByKey()

val delayByTime = sortedDelays.map(rec=>{
val dayOfWeek =rec._1._1
val time= rec._1._2
val chance =(rec._2._1+0.0)/rec._2._2
var periodOfDay=""
if (time==0){
periodOfDay="Morning"
}else if (time==1){
periodOfDay="Afternoon"
}else if (time==2){
periodOfDay="Evening"
}else if (time==3){
periodOfDay="Night"
}

dayOfWeek+", "+periodOfDay+", "+chance

})

delayByTime.take(30).foreach(println)

相关