大数据Spark实时处理--结构化流1(Structured Streaming)


Spark Streaming的不足

  • 1)基于ProcessingTime
  • 在数据处理过程中,是有几个时间的:
  • ProcessingTime  vs   EventTime
  • 12:00:00 数据的真正产生时间   :EventTime
  • 12:01:10 进入Spark的时间        :ProcessingTime
  • 2)Complex API
  • DStream      RDD
  • 一个业务交给不同的开发人员去实现,可能最终的性能千差万别
  • 3)批流代码不统一
  • 4)end to end 端到端

Structured Streaming的概述

  • 1)Structured Streaming Programming Guide - Spark 3.2.0 Documentation (apache.org)
  • 2)结构化流:构建在Spark SQL引擎之上的可扩展、可容错的流处理引擎
  • 3)Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. 
  • 4)In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
  • 5)processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. 

第一个Structured Streaming应用程序

  • 1)左击项目----->new----->Module----->Maven----->next
  • Artifactld:log-sss
  • Module name:log-sss
  • 2)导spark-sql、spark-core、spark-streaming的依赖
  • 3)log-sss------src------main-------new directory:scala
  • 4)右击scala-------Mark Directory as-------Sources Root
  • 5)file-----project structure-----project settings------modules-----log-sss------add-----scala------ok
  • 6)右击scala------new package------com.imooc.spark.sss
  • 7)右击com.imooc.spark.sss-------new------scala class------wc-----object
  • 8)运行程序
  • 9)开启端口,[hadoop@spark000 ~]$ nc -lk 9999,并输入数据
  • 10)在控制台接收结果
  • 这个是展示所有输入值
package com.imooc.spark.sss

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

/*
 基于spark sql
 */
object WordCount {
  //第1步:main
  def main(args: Array[String]): Unit = {
    //第2步:拿到SparkSession---builder---设置master
    // ----设置app的名字---.getOrCreate()----var-----拿到session,改名为spark
    //这里是入口
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName(this.getClass.getName)
      //.appName("StructuredNetworkWordCount")
      .getOrCreate()

    import spark.implicits._

    //第4步: 对接socket数据
    //4.1 拿到spark,并读流数据----使用socket的方式-----这里是数据源
    //spark.readStream.format("socket")
    //4.2 数据是来源的机器
    //4.3 数据load进来-----.load().var---改为lines
    val lines = spark.readStream
      .format("socket")
      .option("host", "spark000")
      .option("port", 9999)
      .load()

    //第6步:
    val words = lines.as[String].flatMap(_.split(" "))
      .groupBy("value")
      .count()

    //第5步: 将上述lines展示出来
    //5.1 将lines写出去----写到控制台---开始----终止
    val query = words.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()


//    //第3步:关闭----框架ok----因为是云平台,不能停止
//    spark.stop()
  }
}

Structured Streaming编程模型

  • 核心思想:结构化流的关键思想是将实时数据流视为一个不断追加的表。它与批处理模型非常相似。无界表上的增量输入。
  • Complete mode
  • Append mode
  • Update mode

处理EventTime和延迟数据

  • EventTime是嵌入数据本身的时间。对于许多应用程序,希望是基于EventTime进行操作。

  • 例如,如果希望获得每分钟由物联网设备生成的Event数量,那么可能希望使用生成数据的时间EventTime(即数据中的EventTime),而不是Spark接收数据的时间。这个EventTime在这个模型中非常自然地表示出来——来自设备的每个事件都是表中的一行,而EventTime是行中的一列值。这允许基于窗口的聚合(例如,每分钟的事件数)只是事件时间列上的一种特殊类型的分组和聚合——每个时间窗口是一个组,每一行可以属于多个窗口/组。因此,可以在静态数据集(例如,从收集的设备事件日志)以及数据流上一致地定义这种基于事件时间窗口的聚合查询,从而使用户的生活更加轻松

  •  此外,结构化流模型自然会根据EventTime处理比预期晚到的数据。由于Spark可以更新结果表,因此它可以完全控制在有延迟数据时更新旧聚合,以及清理旧聚合以限制中间状态数据的大小。从Spark 2.1开始,我们就支持水印,它允许用户指定延迟数据的阈值,并允许引擎相应地清除旧状态。稍后将在“窗口操作”一节中详细解释这些操作。

使用SQL完成统计分析

  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\WCApp.scala
  • 1)运行程序
  • 2)开启端口,[hadoop@spark000 ~]$ nc -lk 9999,并输入数据
  • 3)在控制台接收结果
  • 这个是可以累加,统计结果的
package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object WCApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    import spark.implicits._
    val lines = spark.readStream.format("socket")
      .option("host", "spark000")
      .option("port", 9999)
      .load()

    val wordCount = lines.as[String]
      .flatMap(_.split(","))
      .createOrReplaceTempView("wc")

    spark.sql(
      """
        |select
        |value, count(1) as cnt
        |from
        |wc
        |group by value
      """.stripMargin)
      .writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()
      .awaitTermination()

  }

}

对接csv数据源数据

  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\data\csv\emp.csv
  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
  • 结果通过city字段,累加值,输出在控制台上。
1,pk,bj
2,zhangsan,sh
3,lisi,sz
4,lier,bj
package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.functions.window

object SourceApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()
//调用方法1---readCsvPartition
    readCsv(spark)

  }

  //方法1---readCsvPartition
  def readCsv(spark:SparkSession): Unit = {

    val userSchema = new StructType()
      .add("id", IntegerType)
      .add("name",StringType)
      .add("city", StringType)

    spark.readStream
      .format("csv")
      .schema(userSchema)
      .load("log-sss/data/csv")
      .groupBy("city")
      .count()
      .writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()
      .awaitTermination()
  }
}

 对接分区数据源数据

  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\data\partition
  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
  • 运行,控制台输出以年份为区分的数据展示,无累加。
package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.functions.window

object SourceApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()
//调用方法2---readCsvPartition
    readCsvPartition(spark)
  }//方法2---readCsvPartition
  def readCsvPartition(spark:SparkSession): Unit = {

    val userSchema = new StructType()
      .add("id", IntegerType)
      .add("name",StringType)
      .add("city", StringType)

    spark.readStream
      .format("csv")
      .schema(userSchema)
      .load("log-sss/data/partition")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }

对接Kafka数据源数据

  • 启动dfs、yarn、zookeeper、多broker的kafka
  • 创建topic
[hadoop@spark000 bin]$ pwd
/home/hadoop/app/kafka_2.12-2.5.0/bin
[hadoop@spark000 bin]$ ./kafka-topics.sh --create --zookeeper spark000:2181 --replication-factor 1 --partitions 1 --topic ssskafkatopic
Created topic ssskafkatopic.
  • 启动producer
  • 在IDEA运行后,在此处输入数据,在控制台展示结果
[hadoop@spark000 bin]$ pwd
/home/hadoop/app/kafka_2.12-2.5.0/bin
[hadoop@spark000 bin]$ ./kafka-console-producer.sh --broker-list spark000:9092 --topic ssskafkatopic
  • IDEA添加依赖
  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\pom.xml

     org.apache.spark
     spark-sql-kafka-0-10_2.12
  • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-sss\src\main\scala\com\imooc\spark\sss\SourceApp.scala
  • 在控制台输出统计结果
package com.imooc.spark.sss

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.functions.window

object SourceApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()//调用方法3---kafkaSource
    kafkaSource(spark)

  }//方法3---kafkaSource
  def kafkaSource(spark:SparkSession): Unit = {
    import spark.implicits._
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "spark000:9092")
      .option("subscribe", "ssskafkatopic")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String].flatMap(_.split(","))
      .groupBy("value").count()
      .writeStream
      .format("console")
      .outputMode(OutputMode.Update())
      .start()
      .awaitTermination()
  }
}