输出终端/位置
Structured Streaming 非常显式地提出了输入(Source)、执行(StreamExecution)、输出(Sink)的3个组件,并且在每个组件显式地做到fault-tolerant(容错),由此得到整个streaming程序的 end-to-end exactly-once guarantees。
目前Structured Streaming内置FileSink、Console Sink、Foreach Sink(ForeachBatch Sink)、Memory Sink及Kafka Sink,其中测试最为方便的是Console Sink。
文件接收器
将输出存储到目录文件中,支持文件格式:parquet、orc、json、csv等,示例如下:
相关注意事项如下:
- 支持OutputMode为:Append追加模式;
- 必须指定输出目录参数【path】,必选参数,其中格式有parquet、orc、json、csv等等;
- 容灾恢复支持精确一次性语义exactly-once;
- 此外支持写入分区表,实际项目中常常按时间划分;
Memory Sink
此种接收器作为调试使用,输出作为内存表存储在内存中, 支持Append和Complete输出模式。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中,因此,请谨慎使用,示例如下:
Foreach和ForeachBatch Sink
Foreach
Structured Streaming提供接口foreach和foreachBatch,允许用户在流式查询的输出上应用任意操作和编写逻辑,比如输出到MySQL表、Redis数据库等外部存系统。其中foreach允许每行自定义写入逻辑,foreachBatch允许在每个微批量的输出上进行任意操作和自定义逻辑,建议使用foreachBatch操作。
foreach表达自定义编写器逻辑具体来说,需要编写类class继承ForeachWriter,其中包含三个方法来表达数据写入逻辑:打开,处理和关闭。
https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html
代码语言:javascript复制streamingDatasetOfString.writeStream.foreach(
new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// Open connection
}
def process(record: String): Unit = {
// Write string to connection
}
def close(errorOrNull: Throwable): Unit = {
// Close the connection
}
}
).start()
ForeachBatch
方法foreachBatch允许指定在流式查询的每个微批次的输出数据上执行的函数,需要两个参数:微批次的输出数据DataFrame或Dataset、微批次的唯一ID。
使用foreachBatch函数输出时,以下几个注意事项:
1.重用现有的批处理数据源,可以在每个微批次的输出上使用批处理数据输出Output;
2.写入多个位置,如果要将流式查询的输出写入多个位置,则可以简单地多次写入输出 DataFrame/Dataset 。但是,每次写入尝试都会导致重新计算输出数据(包括可能重新读取输入数据)。要避免重新计算,您应该缓存cache输出 DataFrame/Dataset,将其写入多个位置,然后 uncache 。
3.应用其他DataFrame操作,流式DataFrame中不支持许多DataFrame和Dataset操作,使用foreachBatch可以在每个微批输出上应用其中一些操作,但是,必须自己解释执行该操作的端到端语义。
4.默认情况下,foreachBatch仅提供至少一次写保证。 但是,可以使用提供给该函数的batchId作为重复数据删除输出并获得一次性保证的方法。
5.foreachBatch不适用于连续处理模式,因为它从根本上依赖于流式查询的微批量执行。 如果以连续模式写入数据,请改用foreach。
代码演示
使用foreachBatch将词频统计结果输出到MySQL表中,代码如下:
代码语言:javascript复制package cn.itcast.structedstreaming
import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果存储到MySQL数据库表中
*/
object StructuredForeachBatch {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
import org.apache.spark.sql.functions._
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1")
.option("port", 9999)
.load()
val resultStreamDF: DataFrame = inputStreamDF
.as[String]
.filter(StringUtils.isNotBlank(_))
.flatMap(_.trim.split("\s "))
.groupBy($"value")
.count()
val query: StreamingQuery = resultStreamDF.writeStream
.outputMode(OutputMode.Complete())
.foreachBatch((batchDF: DataFrame, batchId: Long) => {
println(s"BatchId = ${batchId}")
if (!batchDF.isEmpty) {
batchDF.coalesce(1)
.write
.mode(SaveMode.Overwrite)
.format("jdbc")
//.option("driver", "com.mysql.cj.jdbc.Driver")//MySQL-8
//.option("url", "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")//MySQL-8
.option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8")
.option("user", "root")
.option("password", "root")
.option("dbtable", "bigdata.t_struct_words")
.save()
}
}).start()
query.awaitTermination()
query.stop()
}
}