package org.hnsw
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
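// Spark Streaming blacklist example: access records in the form "ip user" arrive over a
// socket, each batch is joined against a static blacklist RDD, and records for users
// whose blacklist flag is true are filtered out before printing.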
object SparkLearn {
  def main(args: Array[String]): Unit = {
    // 1. Create the StreamingContext with a 5-second batch interval
    val conf = new SparkConf().setMaster("local[*]").setAppName("jxq")
    val ssc = new StreamingContext(conf, Seconds(5))
    // 2. Read access records ("ip user") from a network socket and create a DStream
    val dstream_jxq = ssc.socketTextStream("192.168.3.66", 8888)
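    // To feed test data (a sketch, assuming netcat on the source host):
    //   nc -lk 8888   then type lines such as "192.168.3.100 alice" (sample values assumed)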
    // Key each record by username so it can be joined with the blacklist
    val dStream_users_jxq = dstream_jxq.map(x => {
      val re = x.split(" ")
      (re(1), x) // (username, access record)
    })
    // 3. Load the blacklist file ("username flag", space-separated) into an RDD of (username, flag)
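    // Example mingdan.txt line (values assumed): "alice true" - true means alice is blacklisted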
    val mingdan_jxq = ssc.sparkContext.textFile("file:///F:\\mingdan.txt").map(x => {
      val re = x.split(" ")
      (re(0), re(1).toBoolean)
    })
    // 4. Combine the DStream with the blacklist RDD. transform exposes each batch as an RDD,
    // so plain RDD operations can be applied:
    // 1) join against the static blacklist
    val dStreamRddResult = dStream_users_jxq.transform(rdd => {
      val joinRdd = rdd.leftOuterJoin(mingdan_jxq)
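      // leftOuterJoin keeps every access record; users absent from the blacklist get None as their flag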
      // 2) filter: a flag of true marks a blacklisted user, so drop those records
      val rddFilter = joinRdd.filter(y => {
        println(y._2._2) // debug output: the blacklist flag for this record
        !y._2._2.getOrElse(false)
      })
      // 3) restore the original access record ("ip user")
      val validRdd = rddFilter.map(y => y._2._1)
      validRdd
    })
    // 5. Downstream handling: simply print the filtered results
    dStreamRddResult.print()
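    // print() shows the first ten elements of each batch on the driver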
    // 6. Start the streaming computation
    ssc.start()
    // 7. Block the main thread until the streaming job terminates
    ssc.awaitTermination()
  }
}