2021年大数据Spark(四十七):Structured Streaming Sink 输出

2021-10-11 10:22:05 浏览数 (1)


​​​​​​​

Sink 输出

在StructuredStreaming中定义好Result DataFrame/Dataset后,调用writeStream()返回DataStreamWriter对象,设置查询Query输出相关属性,启动流式应用运行,相关属性如下:

文档:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#starting-streaming-queries

​​​​​​​输出模式

"Output"是用来定义写入外部存储器的内容,输出可以被定义为不同模式:

  •  追加模式(Append mode),默认模式,其中只有自从上一次触发以来,添加到 Result Table 的新行将会是outputted to the sink。只有添加到Result Table的行将永远不会改变那些查询才支持这一点。这种模式保证每行只能输出一次(假设 fault-tolerant sink )。例如,只有select, where, map, flatMap, filter, join等查询支持 Append mode 。只输出那些将来永远不可能再更新的数据,然后数据从内存移除 。没有聚合的时候,append和update一致;有聚合的时候,一定要有水印,才能使用。
  •  完全模式(Complete mode),每次触发后,整个Result Table将被输出到sink,aggregation queries(聚合查询)支持。全部输出,必须有聚合。
  •  更新模式(Update mode),只有 Result Table rows 自上次触发后更新将被输出到 sink。与Complete模式不同,因为该模式只输出自上次触发器以来已经改变的行。如果查询不包含聚合,那么等同于Append模式。只输出更新数据(更新和新增)。

注意,不同查询Query,支持对应的输出模式,如下表所示:

​​​​​​​触发间隔-了解

触发器Trigger决定了多久执行一次查询并输出结果

当不设置时,默认只要有新数据,就立即执行查询Query,再进行输出。

目前来说,支持三种触发间隔设置:

其中Trigger.Processing表示每隔多少时间触发执行一次,此时流式处理依然属于微批处理;从Spark 2.3以后,支持Continue Processing流式处理,设置触发间隔为Trigger.Continuous但不成熟,使用默认的尽可能快的执行即可。

官网代码示例如下:

代码语言:javascript复制
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)

df.writeStream

  .format("console")

  .start()

// ProcessingTime trigger with two-seconds micro-batch interval

df.writeStream

  .format("console")

  .trigger(Trigger.ProcessingTime("2 seconds"))

  .start()

// One-time trigger

df.writeStream

  .format("console")

  .trigger(Trigger.Once())

  .start()

// Continuous trigger with one-second checkpointing interval

df.writeStream

  .format("console")

  .trigger(Trigger.Continuous("1 second"))

  .start()

​​​​​​​查询名称

    可以给每个查询Query设置名称Name,必须是唯一的,直接调用DataFrameWriter中queryName方法即可,实际生产开发建议设置名称,API说明如下:

​​​​​​​检查点位置

     在Structured Streaming中使用Checkpoint 检查点进行故障恢复。如果实时应用发生故障或关机,可以恢复之前的查询的进度和状态,并从停止的地方继续执行,使用Checkpoint和预写日志WAL完成。使用检查点位置配置查询,那么查询将所有进度信息(即每个触发器中处理的偏移范围)和运行聚合(例如词频统计wordcount)保存到检查点位置。此检查点位置必须是HDFS兼容文件系统中的路径,两种方式设置Checkpoint Location位置:

1.DataStreamWrite设置

streamDF.writeStream.option("checkpointLocation", "path")

2.SparkConf设置

sparkConf.set("spark.sql.streaming.checkpointLocation", "path")

修改上述词频统计案例程序,设置输出模式、查询名称、触发间隔及检查点位置,演示代码如下:

代码语言:javascript复制
package cn.itcast.structedstreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * Author itcast
 */
object SinkDemo {
  def main(args: Array[String]): Unit = {
    //1.准备环境
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    //2.source
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()
    val ds: Dataset[String] = df.as[String]

    //3.operation
    val result: Dataset[Row] = ds.flatMap(_.split(" "))
      .groupBy("value")
      .count()
      .orderBy($"count".desc)

    //4.output
    result.writeStream
      .outputMode(OutputMode.Complete())
      .trigger(Trigger.ProcessingTime(0))
      .format("memory")
      .queryName("t_words")
      .option("numRows", "10")
      .option("truncate", "false")
      .option("checkpointLocation", "./ckp" System.currentTimeMillis())
      .start() //开启查询
    //.awaitTermination()//等待程序结束,注意该行后面的代码没有机会执行,所以如果要在后面继续写代码,需要把改行注掉

    while (true) {
      Thread.sleep(2000)
      println(System.currentTimeMillis())
      spark.sql("select * from t_words").show()
    }
  }
}

运行流式应用,查看Checkpoint Location,包含以下几个目录:

各个子目录及文件含义说明:

 第一、偏移量目录【offsets】:记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据,在处理数据前将其写入此日志记录。此日志中的第 N 条记录表示当前正在已处理,第 N-1 个条目指示哪些偏移已处理完成。

第二、提交记录目录【commits】:记录已完成的批次,重启任务检查完成的批次与 offsets 批次记录比对,确定接下来运行的批次;

 第三、元数据文件【metadata】:metadata 与整个查询关联的元数据,目前仅保留当前job id

 第四、数据源目录【sources】:sources 目录为数据源(Source)时各个批次读取详情

 第五、数据接收端目录【sinks】:sinks 目录为数据接收端(Sink)时批次的写出详情

 第六、记录状态目录【state】:当有状态操作时,如累加聚合、去重、最大最小等场景,这个目录会被用来记录这些状态数据,根据配置周期性地生成.snapshot文件用于记录状态。

0 人点赞