看了这篇博客,你还敢说不会Structured Streaming?

2021-01-27 16:03:53 浏览数 (1)

写在前面: 博主是一名软件工程系大数据应用开发专业大二的学生,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/ 尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影。我希望在最美的年华,做最好的自己!

本篇博客,博主为大家带来的是关于Structured Streaming从入门到实战的一个攻略,希望感兴趣的朋友多多点赞支持!!


一、Structured Streaming曲折发展史

1.1 SparkStreaming

Spark Streaming针对实时数据流,提供了一套可扩展、高吞吐、可容错的流式计算模型。Spark Streaming接收实时数据源的数据,切分成很多小的batches,然后被Spark Engine执行,产出同样由很多小的batchs组成的结果流。本质上,这是一种micro-batch(微批处理)的方式处理。

不足在于处理延时较高(无法优化到秒以下的数量级), 无法支持基于event_time的时间窗口做聚合逻辑。

1.2.Structured Streaming

1.2.1 介绍
  • 官网

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

  • 简介

spark在2.0版本中发布了新的流计算的API,Structured Streaming/结构化流。

Structured Streaming是一个基于Spark SQL引擎的可扩展、容错的流处理引擎。统一了流、批的编程模型,可以使用静态数据批处理一样的方式来编写流式计算操作。并且支持基于event_time的时间窗口的处理逻辑。

随着数据不断地到达,Spark 引擎会以一种增量的方式来执行这些操作,并且持续更新结算结果。可以使用Scala、Java、Python或R中的DataSet/DataFrame API来表示流聚合、事件时间窗口、流到批连接等。此外,Structured Streaming会通过checkpoint和预写日志等机制来实现Exactly-Once语义。

简单来说,对于开发人员来说,根本不用去考虑是流式计算,还是批处理,只要使用同样的方式来编写计算操作即可,Structured Streaming提供了快速、可扩展、容错、端到端的一次性流处理,而用户无需考虑更多细节。

默认情况下,结构化流式查询使用微批处理引擎进行处理,该引擎将数据流作为一系列小批处理作业进行处理,从而实现端到端的延迟,最短可达100毫秒,并且完全可以保证一次容错。自Spark 2.3以来,引入了一种新的低延迟处理模式,称为连续处理,它可以在至少一次保证的情况下实现低至1毫秒的端到端延迟。也就是类似于 Flink 那样的实时流,而不是小批量处理。实际开发可以根据应用程序要求选择处理模式,但是连续处理在使用的时候仍然有很多限制,目前大部分情况还是应该采用小批量模式。

1.2.2 API

1.Spark Streaming 时代 -DStream-RDD

Spark Streaming 采用的数据抽象是DStream,而本质上就是时间上连续的RDD,对数据流的操作就是针对RDD的操作。

2.Structured Streaming 时代 - DataSet/DataFrame -RDD

Structured Streaming是Spark2.0新增的可扩展和高容错性的实时计算框架,它构建于Spark SQL引擎,把流式计算也统一到DataFrame/Dataset里去了。 Structured Streaming 相比于 Spark Streaming 的进步就类似于 Dataset 相比于 RDD 的进步。

1.2.3 主要优势

1.简洁的模型。Structured Streaming 的模型很简洁,易于理解。用户可以直接把一个流想象成是无限增长的表格。

这里解释一下为什么是无限增长的表格? 因为Structured Streaming相当于SparkSQL和SparkStreaming功能的一个结合,可以使用SQL的形式计算实时数据。SparkSQL底层提供的抽象为DataFrame和DataSet,其中DataFrame=RDD 结构,DataSet=RDD 结构 类型,因此我们将其看成是一个表格,而SparkStreaming所接受的数据是流式数据,会随着时间的增长越来越多,所以可以说是无限增长的…

2.一致的 API。由于和 Spark SQL 共用大部分 API,对 Spaprk SQL 熟悉的用户很容易上手,代码也十分简洁。同时批处理和流处理程序还可以共用代码,不需要开发两套不同的代码,显著提高了开发效率。

3.卓越的性能。Structured Streaming 在与 Spark SQL 共用 API 的同时,也直接使用了 Spark SQL 的 Catalyst 优化器和 Tungsten,数据处理性能十分出色。此外,Structured Streaming 还可以直接从未来 Spark SQL 的各种性能优化中受益。

4.多语言支持。Structured Streaming 直接支持目前 Spark SQL 支持的语言,包括Scala,Java,Python,R 和 SQL 。用户可以选择自己喜欢的语言进行开发。

1.2.4.编程模型
  • 编程模型概述

一个流的数据源从逻辑上来说就是一个不断增长的动态表格,随着时间的推移,新数据被持续不断地添加到表格的末尾

对动态数据源进行实时查询,就是对当前的表格内容执行一次 SQL 查询。

数据查询,用户通过触发器(Trigger)设定时间(毫秒级)。也可以设定执行周期。

一个流的输出有多种模式,既可以是基于整个输入执行查询后的完整结果,也可以选择只输出与上次查询相比的差异,或者就是简单地追加最新的结果。

  • 核心思想

Structured Streaming最核心的思想就是将实时到达的数据不断追加到unbound table无界表,到达流的每个数据项(RDD)就像是表中的一个新行被附加到无边界的表中.这样用户就可以用静态结构化数据的批处理查询方式进行流计算,如可以使用SQL对到来的每一行数据进行实时查询处理;(SparkSQL SparkStreaming=StructuredStreaming)

  • 应用场景

Structured Streaming将数据源映射为类似于关系数据库中的表,然后将经过计算得到的结果映射为另一张表,完全以结构化的方式去操作流式数据,这种编程模型非常有利于处理分析结构化的实时数据;

  • WordCount图解

如图所示,

第一行表示从socket不断接收数据, 第二行可以看成是之前提到的“unbound table", 第三行为最终的wordCounts是结果集。

当有新的数据到达时,Spark会执行“增量"查询,并更新结果集;

该示例设置为Complete Mode(输出所有数据),因此每次都将所有数据输出到控制台;

1.在第1秒时,此时到达的数据为"cat dog"和"dog dog",因此我们可以得到第1秒时的结果集cat=1 dog=3,并输出到控制台;

2.当第2秒时,到达的数据为"owl cat",此时"unbound table"增加了一行数据"owl cat",执行word count查询并更新结果集,可得第2秒时的结果集为cat=2 dog=3 owl=1,并输出到控制台;

3.当第3秒时,到达的数据为"dog"和"owl",此时"unbound table"增加两行数据"dog"和"owl",执行word count查询并更新结果集,可得第3秒时的结果集为cat=2 dog=4 owl=2;

这种模型跟其他很多流式计算引擎都不同。大多数流式计算引擎都需要开发人员自己来维护新数据与历史数据的整合并进行聚合操作。然后我们就需要自己去考虑和实现容错机制、数据一致性的语义等。然而在structured streaming的这种模式下,spark会负责将新到达的数据与历史数据进行整合,并完成正确的计算操作,同时更新result table,不需要我们去考虑这些事情。

二、 Structured Streaming实战

2.1 创建Source

spark 2.0中初步提供了一些内置的source支持。

Socket source (for testing): 从socket连接中读取文本内容。

File source: 以数据流的方式读取一个目录中的文件。支持text、csv、json、parquet等文件类型。

Kafka source: 从Kafka中拉取数据,与0.10或以上的版本兼容,后面单独整合Kafka。

2.1.1 读取Socket数据

  • 准备工作

集群node01开启监听该节点的9999端口 nc -lk 9999

  • 代码演示
代码语言:javascript复制
object demo01 {
  def main(args: Array[String]): Unit = {

    // 1. 创建SparkSession
    val spark: SparkSession = SparkSession.builder().appName("StructStreamingSocket").master("local[*]").getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    // 2. 接入/读取最新的数据
    val socketDatasRow: DataFrame = spark.readStream.format("socket")
      .option("host", "node01")
      .option("port", "9999")
      .load()

    // 3. 根据业务进行预处理和 计算
    import spark.implicits._
    val socketDatasString: Dataset[String] = socketDatasRow.as[String]
    // 对数据进行拆分   【第一个原始表】
    val word: Dataset[String] = socketDatasString.flatMap(x=>x.split(" "))
    // 计算单词的数量      DSL  类似于SQL   【第二个表经过计算返回】
    val wordCount: Dataset[Row] = word.groupBy("value").count().sort($"count".desc)

    // 4. 计算结果输出
    wordCount.writeStream.format("console")   // 数据输出到哪里
      .outputMode("complete") // 输出所有的数据
      .trigger(Trigger.ProcessingTime(0))   // 尽快计算
      .start()  // 开启任务
      .awaitTermination()  // 等待关闭

  }
}

启动程序,我们在刚开启的9999端口下的命令行中任意输入一串以空格间隔的字符,例如

hadoop spark sqoop hadoop spark hive hadoop

接着回到IDEA的控制台,就可以发现Structured Streaming已经成功读取了Socket中的信息,并做了一个WordCount计算。

当我再输入一个CSDN

发现很快结果又更新了

看到上面的效果说明我们的Structured Streaming程序读取Socket中的信息并做计算就成功了

2.1.2.读取目录下文本数据

spark应用可以监听某一个目录,而web服务在这个目录上实时产生日志文件,这样对于spark应用来说,日志文件就是实时数据。

Structured Streaming支持的文件类 型有text,csv,json,parquet

  • 准备工作

在people.json文件输入如下数据:

代码语言:javascript复制
{"name":"json","age":23,"hobby":"running"}
{"name":"charles","age":32,"hobby":"basketball"}
{"name":"tom","age":28,"hobby":"football"}
{"name":"lili","age":24,"hobby":"running"}
{"name":"bob","age":20,"hobby":"swimming"}

注意:文件必须是被移动到目录中的,且文件名不能有特殊字符

  • 需求

使用Structured Streaming统计年龄小于25岁的人群的爱好排行榜

  • 代码演示
代码语言:javascript复制
object demo02 {

  def main(args: Array[String]): Unit = {

    // 1. 创建SparkSession
    val spark: SparkSession = SparkSession.builder().appName("StructStreamingSocket02").master("local[*]").getOrCreate()

    // 日志级别
    spark.sparkContext.setLogLevel("WARN")

    // 2. 接入/读取最新的数据
    import spark.implicits._

    // 定义数据的结构类型
    val structType: StructType = new StructType()
      .add("name", "string")
      .add("age", "integer")
      .add("hobby", "string")
    
    // 读取数据  [路径只能精确到目录]
    val fileDatas: DataFrame = spark.readStream.schema(structType).json("E:BigData\05-Spark\tmp")

    // 查询JSON文件中的数据,并将过滤出年龄小于25岁的数据,并统计爱好的个数,并排序
    val resultDF: Dataset[Row] = fileDatas.filter($"age"<25).groupBy("hobby").count().sort($"count")

    // 数据输出
    resultDF.writeStream.format("console") // 将结果打印到控制台
      .outputMode("complete")   // 完全输出
      .start()   // 开启任务
      .awaitTermination() //等待关闭

  }
}

运行效果

2.2 计算操作

因为获得到Source之后的基本数据处理方式和之前学习的DataFrame、DataSet一致,所以这里就不再赘述。

2.3. 输出

计算结果可以选择输出到多种设备并进行如下设定

  1. output mode:以哪种方式将result table的数据写入sink
  2. format/output sink的一些细节:数据格式、位置等。
  3. query name:指定查询的标识。类似tempview的名字
  4. trigger interval:触发间隔,如果不指定,默认会尽可能快速地处理数据
  5. checkpoint地址:一般是hdfs上的目录。注意:Socket不支持数据恢复,如果设置了,第二次启动会报错 ,Kafka支持
2.3.1 output mode

每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。

这里有三种输出模型:

1.Append mode:输出新增的行,默认模式。每次更新结果集时,只将新添加到结果集的结果行输出到接收器。仅支持添加到结果表中的行永远不会更改的查询。因此,此模式保证每行仅输出一次。例如,仅查询select,where,map,flatMap,filter,join等会支持追加模式。不支持聚合 2.Complete mode: 所有内容都输出,每次触发后,整个结果表将输出到接收器。聚合查询支持此功能。仅适用于包含聚合操作的查询。

3.Update mode:输出更新的行,每次更新结果集时,仅将被更新的结果行输出到接收器(自Spark 2.1.1起可用),不支持排序

2.3.2 output sink
  • 使用说明

File sink 输出到路径 支持parquet文件,以及append模式

代码语言:javascript复制
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()

Kafka sink 输出到kafka内的一到多个topic

代码语言:javascript复制
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()

Foreach sink 对输出中的记录运行任意计算。

代码语言:javascript复制
writeStream
    .foreach(...)
    .start()

Console sink (for debugging) 当有触发器时,将输出打印到控制台。

代码语言:javascript复制
writeStream
    .format("console")
    .start()

Memory sink (for debugging) - 输出作为内存表存储在内存中.

代码语言:javascript复制
writeStream
    .format("memory")
    .queryName("tableName")
    .start()
  • 官网示例代码
代码语言:javascript复制
// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10") 
  
// Print new data to console
noAggDF.writeStream.format("console").start()

// Write new data to Parquet files
noAggDF.writeStream.format("parquet").option("checkpointLocation", "path/to/checkpoint/dir").option("path", "path/to/destination/dir").start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF.writeStream.outputMode("complete").format("console").start()

// Have all the aggregates in an in-memory table
aggDF.writeStream.queryName("aggregates").outputMode("complete").format("memory").start()
spark.sql("select * from aggregates").show()   // interactively query in-memory table

好了,本篇主要讲解的都是基于Structured Streaming的基础理论和简单的实战,下一篇博客博主将带来Structured Streaming整合Kafka和MySQL,敬请期待!!!受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波(^U^)ノ~YO

0 人点赞