1 、套接字方式:
socketTextStream( 主机名或 ip ,端口 )
package org.hnsw
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkJXQ {
def main(args: Array[String]): Unit = {
//1.创建streamingContext对象 保证工作节点处理的线程数>2
val conf = new SparkConf().setMaster("local[2]").setAppName("JXQ")
val sc = new StreamingContext(conf,Seconds(5))
//采集数据间隔时间5秒
//2、创建inputDstream,采集数据创建Dstream,端口和ip要和数据生产方保持一致
val dString = sc.socketTextStream("192.168.3.66",8888)
//3、Dstream操作,业务逻辑 约定分割字符是空格
val wordCount = dString.flatMap((x)=>{
x.split(" ") //a a
}).map((x)=>{ //(x,1)
(a,1) (a,1)
}).reduceByKey((v1, v2)=>{
v1+v2 //(a,1+1)
})
wordCount.print()
//调试阶段采集使用 打印数据处理后的结果
//4、启动SparkStreaming
sc.start()
//5、等待处理结果,挂起程序保证一直执行
sc.awaitTermination()
}
}
2 、文件流方式:
textFileStream( 文件路径 )
package org.hnsw
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkTxt {
def main(args: Array[String]): Unit = {
//1.创建streamingContext对象 保证工作节点处理的线程数>2
val conf = new SparkConf().setMaster("local[2]").setAppName("JXQ")
val sc = new StreamingContext(conf,Seconds(5))
//采集数据间隔时间5秒
//2、创建inputDstream,采集数据创建Dstream,端口和ip要和数据生产方保持一致
val dString = sc.textFileStream("F:\\JXQ")
//3、Dstream操作,业务逻辑 约定分割字符是空格
val wordCount = dString.flatMap((x)=>{
x.split(" ") //a a
}).map((x)=>{
(x,1) //(a,1) (a,1)
}).reduceByKey((v1, v2)=>{
v1+v2 //(a,1+1)
})
wordCount.print()
//调试阶段采集使用 打印数据处理后的结果
//4、启动SparkStreaming
sc.start()
sc.awaitTermination()
}
}