DStream 创建


1 、套接字方式:

socketTextStream( 主机名或 ip ,端口 )

package org.hnsw

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkJXQ {
  def main(args: Array[String]): Unit = {

    //1.创建streamingContext对象  保证工作节点处理的线程数>2    
    val conf = new SparkConf().setMaster("local[2]").setAppName("JXQ")
    val sc = new StreamingContext(conf,Seconds(5))
    //采集数据间隔时间5秒
    //2、创建inputDstream,采集数据创建Dstream,端口和ip要和数据生产方保持一致  
    val dString = sc.socketTextStream("192.168.3.66",8888)

    //3、Dstream操作,业务逻辑  约定分割字符是空格    
    val wordCount = dString.flatMap((x)=>{
    x.split(" ")       //a a    
    }).map((x)=>{     //(x,1)
    (a,1) (a,1)    
    }).reduceByKey((v1, v2)=>{
    v1+v2             //(a,1+1)    
    })
    wordCount.print()
    //调试阶段采集使用 打印数据处理后的结果
    //4、启动SparkStreaming    
    sc.start()

    //5、等待处理结果,挂起程序保证一直执行    
    sc.awaitTermination()
  }

}

2 、文件流方式:

textFileStream( 文件路径 )

package org.hnsw
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkTxt {
  def main(args: Array[String]): Unit = {

    //1.创建streamingContext对象  保证工作节点处理的线程数>2
    val conf = new SparkConf().setMaster("local[2]").setAppName("JXQ")
    val sc = new StreamingContext(conf,Seconds(5))
    //采集数据间隔时间5秒
    //2、创建inputDstream,采集数据创建Dstream,端口和ip要和数据生产方保持一致
    val dString = sc.textFileStream("F:\\JXQ")

    //3、Dstream操作,业务逻辑  约定分割字符是空格
    val wordCount = dString.flatMap((x)=>{
        x.split(" ")         //a a
    }).map((x)=>{     
      (x,1)                 //(a,1) (a,1)      
    }).reduceByKey((v1, v2)=>{
        v1+v2               //(a,1+1)      
    })
    wordCount.print()
    //调试阶段采集使用 打印数据处理后的结果
    //4、启动SparkStreaming      
    sc.start()
    sc.awaitTermination()
  }


}

相关