Sink 输出
在StructuredStreaming中定义好Result DataFrame/Dataset后,调用writeStream()返回DataStreamWriter对象,设置查询Query输出相关属性,启动流式应用运行,相关属性如下:
文档:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#starting-streaming-queries
输出模式
"Output"是用来定义写入外部存储器的内容,输出可以被定义为不同模式:
- 追加模式(Append mode),默认模式,其中只有自从上一次触发以来,添加到 Result Table 的新行将会是outputted to the sink。只有添加到Result Table的行将永远不会改变那些查询才支持这一点。这种模式保证每行只能输出一次(假设 fault-tolerant sink )。例如,只有select, where, map, flatMap, filter, join等查询支持 Append mode 。只输出那些将来永远不可能再更新的数据,然后数据从内存移除 。没有聚合的时候,append和update一致;有聚合的时候,一定要有水印,才能使用。
- 完全模式(Complete mode),每次触发后,整个Result Table将被输出到sink,aggregation queries(聚合查询)支持。全部输出,必须有聚合。
- 更新模式(Update mode),只有 Result Table rows 自上次触发后更新将被输出到 sink。与Complete模式不同,因为该模式只输出自上次触发器以来已经改变的行。如果查询不包含聚合,那么等同于Append模式。只输出更新数据(更新和新增)。
注意,不同查询Query,支持对应的输出模式,如下表所示:
触发间隔-了解
触发器Trigger决定了多久执行一次查询并输出结果
当不设置时,默认只要有新数据,就立即执行查询Query,再进行输出。
目前来说,支持三种触发间隔设置:
其中Trigger.Processing表示每隔多少时间触发执行一次,此时流式处理依然属于微批处理;从Spark 2.3以后,支持Continue Processing流式处理,设置触发间隔为Trigger.Continuous但不成熟,使用默认的尽可能快的执行即可。
官网代码示例如下:
代码语言:javascript复制import org.apache.spark.sql.streaming.Trigger
// Default trigger (runs micro-batch as soon as it can)
df.writeStream
.format("console")
.start()
// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
// One-time trigger
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start()
// Continuous trigger with one-second checkpointing interval
df.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.start()
查询名称
可以给每个查询Query设置名称Name,必须是唯一的,直接调用DataFrameWriter中queryName方法即可,实际生产开发建议设置名称,API说明如下:
检查点位置
在Structured Streaming中使用Checkpoint 检查点进行故障恢复。如果实时应用发生故障或关机,可以恢复之前的查询的进度和状态,并从停止的地方继续执行,使用Checkpoint和预写日志WAL完成。使用检查点位置配置查询,那么查询将所有进度信息(即每个触发器中处理的偏移范围)和运行聚合(例如词频统计wordcount)保存到检查点位置。此检查点位置必须是HDFS兼容文件系统中的路径,两种方式设置Checkpoint Location位置:
1.DataStreamWrite设置
streamDF.writeStream.option("checkpointLocation", "path")
2.SparkConf设置
sparkConf.set("spark.sql.streaming.checkpointLocation", "path")
修改上述词频统计案例程序,设置输出模式、查询名称、触发间隔及检查点位置,演示代码如下:
代码语言:javascript复制package cn.itcast.structedstreaming
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* Author itcast
*/
object SinkDemo {
def main(args: Array[String]): Unit = {
//1.准备环境
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
import org.apache.spark.sql.functions._
//2.source
val df: DataFrame = spark.readStream
.format("socket")
.option("host", "node1")
.option("port", 9999)
.load()
val ds: Dataset[String] = df.as[String]
//3.operation
val result: Dataset[Row] = ds.flatMap(_.split(" "))
.groupBy("value")
.count()
.orderBy($"count".desc)
//4.output
result.writeStream
.outputMode(OutputMode.Complete())
.trigger(Trigger.ProcessingTime(0))
.format("memory")
.queryName("t_words")
.option("numRows", "10")
.option("truncate", "false")
.option("checkpointLocation", "./ckp" System.currentTimeMillis())
.start() //开启查询
//.awaitTermination()//等待程序结束,注意该行后面的代码没有机会执行,所以如果要在后面继续写代码,需要把改行注掉
while (true) {
Thread.sleep(2000)
println(System.currentTimeMillis())
spark.sql("select * from t_words").show()
}
}
}
运行流式应用,查看Checkpoint Location,包含以下几个目录:
各个子目录及文件含义说明:
第一、偏移量目录【offsets】:记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据,在处理数据前将其写入此日志记录。此日志中的第 N 条记录表示当前正在已处理,第 N-1 个条目指示哪些偏移已处理完成。
第二、提交记录目录【commits】:记录已完成的批次,重启任务检查完成的批次与 offsets 批次记录比对,确定接下来运行的批次;
第三、元数据文件【metadata】:metadata 与整个查询关联的元数据,目前仅保留当前job id
第四、数据源目录【sources】:sources 目录为数据源(Source)时各个批次读取详情
第五、数据接收端目录【sinks】:sinks 目录为数据接收端(Sink)时批次的写出详情
第六、记录状态目录【state】:当有状态操作时,如累加聚合、去重、最大最小等场景,这个目录会被用来记录这些状态数据,根据配置周期性地生成.snapshot文件用于记录状态。