2021年大数据Spark(四十八):Structured Streaming 输出终端/位置

2021-10-11 10:21:49 浏览数 (1)


输出终端/位置

Structured Streaming 非常显式地提出了输入(Source)、执行(StreamExecution)、输出(Sink)的3个组件,并且在每个组件显式地做到fault-tolerant(容错),由此得到整个streaming程序的 end-to-end exactly-once guarantees

目前Structured Streaming内置FileSink、Console Sink、Foreach Sink(ForeachBatch Sink)、Memory Sink及Kafka Sink,其中测试最为方便的是Console Sink。

​​​​​​​文件接收器

将输出存储到目录文件中,支持文件格式:parquet、orc、json、csv等,示例如下:

相关注意事项如下:

  •  支持OutputMode为:Append追加模式;
  •  必须指定输出目录参数【path】,必选参数,其中格式有parquet、orc、json、csv等等;
  •  容灾恢复支持精确一次性语义exactly-once;
  •  此外支持写入分区表,实际项目中常常按时间划分;

​​​​​​​Memory Sink

此种接收器作为调试使用,输出作为内存表存储在内存中, 支持Append和Complete输出模式。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中,因此,请谨慎使用,示例如下:

Foreach和ForeachBatch Sink

Foreach

     Structured Streaming提供接口foreach和foreachBatch,允许用户在流式查询的输出上应用任意操作和编写逻辑,比如输出到MySQL表、Redis数据库等外部存系统。其中foreach允许每行自定义写入逻辑,foreachBatch允许在每个微批量的输出上进行任意操作和自定义逻辑,建议使用foreachBatch操作。

foreach表达自定义编写器逻辑具体来说,需要编写类class继承ForeachWriter,其中包含三个方法来表达数据写入逻辑:打开,处理和关闭。

https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

代码语言:javascript复制
streamingDatasetOfString.writeStream.foreach(

  new ForeachWriter[String] {



    def open(partitionId: Long, version: Long): Boolean = {

      // Open connection

    }



    def process(record: String): Unit = {

      // Write string to connection

    }



    def close(errorOrNull: Throwable): Unit = {

      // Close the connection

    }

  }

).start()

​​​​​​​ForeachBatch

方法foreachBatch允许指定在流式查询的每个微批次的输出数据上执行的函数,需要两个参数:微批次的输出数据DataFrame或Dataset、微批次的唯一ID。

使用foreachBatch函数输出时,以下几个注意事项:

1.重用现有的批处理数据源,可以在每个微批次的输出上使用批处理数据输出Output;

2.写入多个位置,如果要将流式查询的输出写入多个位置,则可以简单地多次写入输出 DataFrame/Dataset 。但是,每次写入尝试都会导致重新计算输出数据(包括可能重新读取输入数据)。要避免重新计算,您应该缓存cache输出 DataFrame/Dataset,将其写入多个位置,然后 uncache 。

3.应用其他DataFrame操作,流式DataFrame中不支持许多DataFrame和Dataset操作,使用foreachBatch可以在每个微批输出上应用其中一些操作,但是,必须自己解释执行该操作的端到端语义。

4.默认情况下,foreachBatch仅提供至少一次写保证。 但是,可以使用提供给该函数的batchId作为重复数据删除输出并获得一次性保证的方法。

5.foreachBatch不适用于连续处理模式,因为它从根本上依赖于流式查询的微批量执行。 如果以连续模式写入数据,请改用foreach。

​​​​​​​代码演示

使用foreachBatch将词频统计结果输出到MySQL表中,代码如下:

代码语言:javascript复制
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果存储到MySQL数据库表中
 */
object StructuredForeachBatch {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val inputStreamDF: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    val resultStreamDF: DataFrame = inputStreamDF
      .as[String]
      .filter(StringUtils.isNotBlank(_))
      .flatMap(_.trim.split("\s "))
      .groupBy($"value")
      .count()

    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .foreachBatch((batchDF: DataFrame, batchId: Long) => {
        println(s"BatchId = ${batchId}")
        if (!batchDF.isEmpty) {
          batchDF.coalesce(1)
            .write
            .mode(SaveMode.Overwrite)
            .format("jdbc")
            //.option("driver", "com.mysql.cj.jdbc.Driver")//MySQL-8
            //.option("url", "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")//MySQL-8
            .option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8")
            .option("user", "root")
            .option("password", "root")
            .option("dbtable", "bigdata.t_struct_words")
            .save()
        }
      }).start()
    query.awaitTermination()
    query.stop()
  }
}

0 人点赞