Spark Real-Time Processing for Big Data -- Structured Streaming, Part 1
Shortcomings of Spark Streaming
- 1) Based on ProcessingTime
- There are several notions of time in data processing:
- ProcessingTime vs EventTime
- 12:00:00 -- the time the data was actually generated: EventTime
- 12:01:10 -- the time the data entered Spark: ProcessingTime
- 2) Complex API
- DStream / RDD
- The same business logic implemented by different developers can end up with wildly different performance
- 3) Batch and streaming code are not unified (see the DStream sketch after this list)
- 4) End to end: no end-to-end (exactly-once) guarantee
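For contrast, here is a minimal DStream-style word count sketch (the host spark000 and port 9999 are borrowed from the examples later in these notes). The RDD operators used here cannot be shared with a batch Spark SQL job, which is exactly the API-complexity and batch/stream-split problem listed above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamWC {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamWC")
    val ssc = new StreamingContext(conf, Seconds(10))   // fixed micro-batch interval, ProcessingTime only

    ssc.socketTextStream("spark000", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)                               // RDD-style operators, not reusable as batch SQL
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}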
Overview of Structured Streaming
- 1)Structured Streaming Programming Guide - Spark 3.2.0 Documentation (apache.org)
- 2) Structured Streaming: a scalable, fault-tolerant stream processing engine built on top of the Spark SQL engine
- 3)Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive.
- 4)In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
- 5) Internally, by default, Structured Streaming queries are processed with a micro-batch engine, which processes data streams as a series of small batch jobs, thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. However, since Spark 2.3, a new low-latency processing mode called Continuous Processing is available, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees (see the sketch after this list).
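A minimal, hedged sketch of enabling Continuous Processing: it uses the built-in rate source (one of the sources supported in continuous mode), and the "1 second" in the trigger is the checkpoint interval, not the processing latency:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName("ContinuousDemo").getOrCreate()

    // The rate source emits (timestamp, value) rows at a fixed rate; only map-like
    // operations are allowed in continuous mode, so the rows are passed through as-is
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()
      .awaitTermination()
  }
}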
The First Structured Streaming Application
- 1) Left-click the project -----> New -----> Module -----> Maven -----> Next
- ArtifactId: log-sss
- Module name: log-sss
- 2) Add the spark-sql, spark-core, and spark-streaming dependencies
- 3) log-sss ------ src ------ main ------- new directory: scala
- 4) Right-click scala ------- Mark Directory as ------- Sources Root
- 5) File ----- Project Structure ----- Project Settings ------ Modules ----- log-sss ------ Add ----- Scala ------ OK
- 6) Right-click scala ------ New Package ------ com.imooc.spark.sss
- 7) Right-click com.imooc.spark.sss ------- New ------ Scala Class ------ wc ----- Object
- 8) Run the program
- 9) Open the port, [hadoop@spark000 ~]$ nc -lk 9999, and type in some data
- 10) Receive the results on the console
- This one displays all the input values
package com.imooc.spark.sss

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

/* Based on Spark SQL */
object WordCount {

  // Step 1: main
  def main(args: Array[String]): Unit = {

    // Step 2: get a SparkSession --- builder --- set the master
    // --- set the app name --- .getOrCreate() --- assign it to a val named spark
    // This is the entry point
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName(this.getClass.getName)
      //.appName("StructuredNetworkWordCount")
      .getOrCreate()

    import spark.implicits._

    // Step 4: connect to the socket data
    // 4.1 Use spark to read a stream --- via the socket format --- this is the data source
    //spark.readStream.format("socket")
    // 4.2 The host the data comes from
    // 4.3 load() the data --- assign it to a val named lines
    val lines = spark.readStream
      .format("socket")
      .option("host", "spark000")
      .option("port", 9999)
      .load()

    // Step 6: split the lines into words and count them by value
    val words = lines.as[String].flatMap(_.split(" "))
      .groupBy("value")
      .count()

    // Step 5: display the result
    // 5.1 Write it out --- to the console --- start --- then wait for termination
    val query = words.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()

    // // Step 3: shut down --- left commented out, because the job runs on the cloud platform and must not be stopped
    // spark.stop()
  }
}
The Structured Streaming Programming Model
- Core idea: the key idea of Structured Streaming is to treat a live data stream as a table that is continuously appended to. This is very similar to the batch processing model: incremental input over an unbounded table.
- Complete mode: the entire updated result table is written to the sink after every trigger
- Append mode: only the rows appended to the result table since the last trigger are written
- Update mode: only the rows updated in the result table since the last trigger are written (a minimal sketch of the modes follows this list)
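A minimal, hedged sketch of how the output mode is chosen on the writer, reusing the socket `lines` DataFrame from the WordCount example above (spark.implicits._ assumed in scope):

// 1) No aggregation -> Append mode is valid: only newly arrived rows are emitted each trigger
// lines.writeStream.outputMode("append").format("console").start()

// 2) With an aggregation (and no watermark) Append is rejected; Complete re-emits the full
//    result table every trigger, Update emits only the rows that changed since the last trigger
lines.as[String].flatMap(_.split(" "))
  .groupBy("value").count()
  .writeStream
  .outputMode("update")          // or "complete", as in the examples in these notes
  .format("console")
  .start()
  .awaitTermination()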
Handling EventTime and Late Data
- EventTime is the time embedded in the data itself. For many applications, you want to operate on EventTime.
- For example, if you want the number of events generated by IoT devices per minute, you probably want to use the time the data was generated (the EventTime embedded in the data), not the time Spark received it. This EventTime is expressed very naturally in the model: every event from a device is a row in the table, and EventTime is a column value in that row. Window-based aggregations (e.g., the number of events per minute) then become just a special kind of grouping and aggregation on the event-time column: each time window is a group, and each row can belong to multiple windows/groups. As a result, such event-time-window aggregation queries can be defined consistently on a static dataset (e.g., collected device event logs) as well as on a data stream, which makes the user's life much easier.
- Furthermore, this model naturally handles data that arrives later than expected based on its EventTime. Because Spark is the one updating the result table, it has full control over updating old aggregates when late data arrives and over cleaning up old aggregates to limit the size of the intermediate state. Since Spark 2.1, watermarking has been supported, which lets the user specify a lateness threshold and lets the engine clean up old state accordingly. This is explained in more detail in the Window Operations section; a minimal sketch of the pattern follows.
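A minimal, hedged sketch of the pattern described above (counting events per 1-minute event-time window with a watermark). It uses the built-in rate source, whose timestamp column stands in for an EventTime embedded in the data:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window, col}

object EventTimeWindowDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName("EventTimeWindowDemo").getOrCreate()

    // The rate source emits (timestamp, value) rows; timestamp plays the role of EventTime here
    val events = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    events
      .withWatermark("timestamp", "10 minutes")           // tolerate up to 10 minutes of lateness, then drop old state
      .groupBy(window(col("timestamp"), "1 minute"))      // one group per 1-minute event-time window
      .count()
      .writeStream
      .outputMode("update")
      .format("console")
      .start()
      .awaitTermination()
  }
}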
Using SQL for the Aggregation
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\WCApp.scala
- 1) Run the program
- 2) Open the port, [hadoop@spark000 ~]$ nc -lk 9999, and type in some data
- 3) Receive the results on the console
- This one accumulates and shows the running counts
package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object WCApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    import spark.implicits._

    val lines = spark.readStream.format("socket")
      .option("host", "spark000")
      .option("port", 9999)
      .load()

    val wordCount = lines.as[String]
      .flatMap(_.split(","))
      .createOrReplaceTempView("wc")

    spark.sql(
      """
        |select
        |value, count(1) as cnt
        |from
        |wc
        |group by value
      """.stripMargin)
      .writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()
      .awaitTermination()
  }
}
Reading from a CSV Data Source
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\data\csv\emp.csv
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
- The result: values accumulated by the city field, output to the console.
1,pk,bj
2,zhangsan,sh
3,lisi,sz
4,lier,bj
package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.functions.window

object SourceApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    // Call method 1 --- readCsv
    readCsv(spark)
  }

  // Method 1 --- readCsv: count rows per city from the CSV directory
  def readCsv(spark: SparkSession): Unit = {
    val userSchema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)
      .add("city", StringType)

    spark.readStream
      .format("csv")
      .schema(userSchema)
      .load("log-sss/data/csv")
      .groupBy("city")
      .count()
      .writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()
      .awaitTermination()
  }
}
Reading from a Partitioned Data Source
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\data\partition
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
- Run it; the console output shows the data separated by year partition, with no accumulation.
package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.functions.window

object SourceApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    // Call method 2 --- readCsvPartition
    readCsvPartition(spark)
  }

  // Method 2 --- readCsvPartition: read the year-partitioned CSV directory and print it as-is
  def readCsvPartition(spark: SparkSession): Unit = {
    val userSchema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)
      .add("city", StringType)

    spark.readStream
      .format("csv")
      .schema(userSchema)
      .load("log-sss/data/partition")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
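For reference, the partition directory is assumed here to follow a Hive-style layout keyed by year, which is why the console output is separated by year and Spark adds a year column automatically; the file names below are hypothetical:

log-sss/data/partition/year=2020/2020.csv
log-sss/data/partition/year=2021/2021.csv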
Reading from a Kafka Data Source
- Start HDFS, YARN, ZooKeeper, and the multi-broker Kafka cluster
- Create the topic
[hadoop@spark000 bin]$ pwd
/home/hadoop/app/kafka_2.12-2.5.0/bin
[hadoop@spark000 bin]$ ./kafka-topics.sh --create --zookeeper spark000:2181 --replication-factor 1 --partitions 1 --topic ssskafkatopic
Created topic ssskafkatopic.
- Start the producer
- After running the job in IDEA, type data here; the results are shown on the console
[hadoop@spark000 bin]$ pwd
/home/hadoop/app/kafka_2.12-2.5.0/bin
[hadoop@spark000 bin]$ ./kafka-console-producer.sh --broker-list spark000:9092 --topic ssskafkatopic
- Add the dependency in IDEA
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\pom.xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
</dependency>
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
- The counts are output on the console
package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.functions.window

object SourceApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    // Call method 3 --- kafkaSource
    kafkaSource(spark)
  }

  // Method 3 --- kafkaSource: word count over messages from the Kafka topic
  def kafkaSource(spark: SparkSession): Unit = {
    import spark.implicits._

    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "spark000:9092")
      .option("subscribe", "ssskafkatopic")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String].flatMap(_.split(","))
      .groupBy("value").count()
      .writeStream
      .format("console")
      .outputMode(OutputMode.Update())
      .start()
      .awaitTermination()
  }
}
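For the fault-tolerance guarantees mentioned in the overview to survive a restart, a production query would also persist a checkpoint. A minimal, hedged sketch that reuses the aggregation from kafkaSource above (the checkpoint path is hypothetical, and spark.implicits._ is assumed in scope as in that method):

// Same word-count aggregation as in kafkaSource, but with a checkpoint directory so that
// Kafka offsets and aggregation state survive a restart of the query
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "spark000:9092")
  .option("subscribe", "ssskafkatopic")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String].flatMap(_.split(","))
  .groupBy("value").count()
  .writeStream
  .format("console")
  .outputMode(OutputMode.Update())
  .option("checkpointLocation", "log-sss/checkpoint/kafka-wc")   // hypothetical path
  .start()
  .awaitTermination()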