DStream Transform Operation: Blacklist Filtering
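This example uses the DStream transform operation to filter blacklisted users out of a stream of access records. Each record arrives over a socket as an "ip user" line; a static blacklist RDD of (username, flag) pairs is left-outer-joined against every micro-batch, and records whose flag is true are dropped. A standalone sketch of the same join-and-filter logic follows the listing.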


package org.hnsw

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object SparkLearn {

  def main(args: Array[String]): Unit = {
    //1. Create the StreamingContext with a 5-second batch interval
    val conf = new SparkConf().setMaster("local[*]").setAppName("jxq")
    val ssc = new StreamingContext(conf, Seconds(5))
    //2. Read access records ("ip user") sent via nc over a socket and create the DStream

    val dstream_jxq = ssc.socketTextStream("192.168.3.66", 8888)
    // Key each record by username
    val dStream_users_jxq = dstream_jxq.map { x =>
      val re = x.split(" ")
      (re(1), x) // (username, access record)
    }

    //3. Read the blacklist file ("username flag" per line) into an RDD
    //of (username, flag) pairs
    val mingdan_jxq = ssc.sparkContext.textFile("file:///F:\\mingdan.txt").map { x =>
      val re = x.split(" ")
      (re(0), re(1).toBoolean)
    }
    //4. Join the DStream with the static RDD; transform exposes each micro-batch as an RDD
    //transform lets us:
    //1) join the batch against the blacklist
    val dStreamRddResult = dStream_users_jxq.transform { rdd =>
      // Result shape: (username, (record, Option[flag])); the Option is None for users not in the blacklist
      val joinRdd = rdd.leftOuterJoin(mingdan_jxq)
      //2) filter: a flag of true marks a blacklisted user, so drop that record
      val rddFilter = joinRdd.filter { y =>
        println(y._2._2) // debug: print each record's blacklist flag
        !y._2._2.getOrElse(false)
      }
      //3) recover the original access record ("ip user") and return it
      val validRdd = rddFilter.map(y => y._2._1)
      validRdd
    }

    //5. Downstream processing; here we simply print the result
    dStreamRddResult.print()

    //6. Start the streaming computation
    ssc.start()

    //7. Block until the computation terminates
    ssc.awaitTermination()

  }
}
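
The join-and-filter semantics can be exercised in isolation on plain RDDs, without the streaming machinery. The sketch below is illustrative only: the object name and sample data are made up, and it assumes a local Spark installation.

package org.hnsw

import org.apache.spark.{SparkConf, SparkContext}

object BlacklistFilterSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("sketch"))
    // One micro-batch worth of (username, original record) pairs
    val batch = sc.parallelize(Seq(("tom", "192.168.3.100 tom"), ("jerry", "192.168.3.101 jerry")))
    // Blacklist: (username, flag), where true means blocked
    val blacklist = sc.parallelize(Seq(("tom", true)))
    val valid = batch.leftOuterJoin(blacklist)                   // (user, (record, Option[flag]))
      .filter { case (_, (_, flag)) => !flag.getOrElse(false) }  // keep users not flagged true
      .map { case (_, (record, _)) => record }                   // recover "ip user"
    valid.collect().foreach(println) // prints only "192.168.3.101 jerry"
    sc.stop()
  }
}

To drive the streaming version itself, start a netcat listener on the configured host (for example, nc -lk 8888 on 192.168.3.66) and type "ip user" lines into it, with the blacklist file at F:\mingdan.txt containing one "username flag" pair per line, e.g. "tom true".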
