Part1 实时数据使用Structured Streaming的ETL操作
1.1 Introduction
在大数据时代中我们迫切需要实时应用解决源源不断涌入的数据,然而建立这么一个应用需要解决多个问题:
- 提供端到端的可靠性与正确性
- 执行复杂转换(JSON, CSV, etc.)
- 解决乱序数据
- 与其他系统整合(Kafka, HDFS, etc.)
Structured Streaming以Spark SQL 为基础, 建立在上述基础之上,借用其强力API提供无缝的查询接口,同时最优化的执行低延迟持续的更新结果。
1.2 流数据ETL操作的需要
ETL: Extract, Transform, and Load
ETL操作可将非结构化数据转化为可以高效查询的Table。具体而言需要可以执行以下操作:
- 过滤,转换和清理数据
- 转化为更高效的存储格式,如JSON(易于阅读)转换为Parquet(查询高效)
- 数据按重要列来分区(更高效查询)
传统上,ETL定期执行批处理任务。例如实时转储原始数据,然后每隔几小时将其转换为结构化表格,以实现高效查询,但高延迟非常高。在许多情况下这种延迟是不可接受的。
幸运的是,Structured Streaming 可轻松将这些定期批处理任务转换为实时数据。此外,该引擎提供保证与定期批处理作业相同的容错和数据一致性,同时提供更低的端到端延迟。
1.3 使用Structured Streaming转换未处理Logs
代码语言:txt复制val cloudTrailSchema = new StructType()
.add("Records", ArrayType(new StructType()
.add("additionalEventData", StringType)
.add("apiVersion", StringType)
.add("awsRegion", StringType)
// ...
代码语言:txt复制val rawRecords = spark.readStream
.schema(cloudTrailSchema)
.json("s3n://mybucket/AWSLogs/*/CloudTrail/*/2017/*/*")
这里的rawRecords
为Dataframe,可理解为无限表格
转化为Dataframe我们可以很方便地使用Spark SQL查询一些复杂的结构
代码语言:txt复制val cloudtrailEvents = rawRecords
.select(explode($"records") as 'record)
.select(
unix_timestamp(
$"record.eventTime",
"yyyy-MM-dd'T'hh:mm:ss").cast("timestamp") as 'timestamp, $"record.*")
代码语言:txt复制val streamingETLQuery = cloudtrailEvents
.withColumn("date", $"timestamp".cast("date") // derive the date
.writeStream
.trigger(ProcessingTime("10 seconds")) // check for files every 10s
.format("parquet") // write as Parquet partitioned by date
.partitionBy("date")
.option("path", "/cloudtrail")
.option("checkpointLocation", "/cloudtrail.checkpoint/")
.start()
StreamingQuery将会连续运行,当新数据到达时并会对其进行转换
这里我们为StreamingQuery指定以下配置:
- 从时间戳列中导出日期
- 每10秒检查一次新文件(即触发间隔)
- 将解析后的DataFrame中的转换数据写为/cloudtrail上的Parquet格式表
- 按日期对Parquet表进行分区,以便我们以后可以有效地查询数据的时间片
- 在路径/检查点/ cloudtrail上保存检查点信息以获得容错性
option(“checkpointLocation”,“/ cloudtrail.checkpoint /”)
当查询处于活动状态时,Spark会不断将已处理数据的元数据写入检查点目录。即使整个群集出现故障,也可以使用相同的检查点目录在新群集上重新启动查询,并进行恢复。更具体地说,在新集群上,Spark使用元数据来启动新查询,从而确保端到端一次性和数据一致性。
part 2 Working with Complex Data Formats with Structured Streaming
此部分具体将讨论以下内容:
- 有哪些不同的数据格式及其权衡
- 如何使用Spark SQL轻松使用它们
- 如何为用例选择正确的最终格式
2.1 数据源与格式
结构化数据
结构化数据源可提供有效的存储和性能。例如,Parquet和ORC等柱状格式使从列的子集中提取值变得更加容易。基于行的存储格式(如Avro)可有效地序列化和存储提供存储优势的数据。然而,这些优点通常以灵活性为代价。如因结构的固定性,格式转变可能相对困难。
非结构化数据
相比之下,非结构化数据源通常是自由格式文本或二进制对象,其不包含标记或元数据以定义数据的结构。报纸文章,医疗记录,图像,应用程序日志通常被视为非结构化数据。这些类型的源通常要求数据周围的上下文是可解析的。
半结构化数据
半结构化数据源是按记录构建的,但不一定具有跨越所有记录的明确定义的全局模式。每个数据记录都使用其结构信息进行扩充。 半结构化数据格式的好处是,它们在表达数据时提供了最大的灵活性,因为每条记录都是自我描述的。但这些格式的主要缺点是它们会产生额外的解析开销,并且不是特别为ad-hoc(特定)查询而构建的。
2.2 Spark SQL转数据格式
Spark SQL支持以Parquet,ORC,JSON,CSV和文本格式读取和写入数据,并且Spark包中还存在大量其他连接器,还可以使用JDBC DataSource连接到SQL数据库。
转数据格式如下所示:
代码语言:txt复制events = spark.readStream
.format("json") # or parquet, kafka, orc...
.option() # format specific options
.schema(my_schema) # required
.load("path/to/data")
output = … # perform your transformations
output.writeStream # write out your data
.format("parquet")
.start("path/to/write")
2.3 转换复杂数据类型
例如:
嵌套所有列: 星号(*)可用于包含嵌套结构中的所有列。
代码语言:txt复制// input
{
"a": 1,
"b": 2
}
Python: events.select(struct("*").alias("x"))
Scala: events.select(struct("*") as 'x)
SQL: select struct(*) as x from events
// output
{
"x": {
"a": 1,
"b": 2
}
}
Spark SQL提供from_json()
及to_json()
函数
// input
{
"a": "{"b":1}"
}
Python:
schema = StructType().add("b", IntegerType())
events.select(from_json("a", schema).alias("c"))
Scala:
val schema = new StructType().add("b", IntegerType)
events.select(from_json('a, schema) as 'c)
// output
{
"c": {
"b": 1
}
}
regexp_extract()
解析正则表达式
// input
[{ "a": "x: 1" }, { "a": "y: 2" }]
Python: events.select(regexp_extract("a", "([a-z]):", 1).alias("c"))
Scala: events.select(regexp_extract('a, "([a-z]):", 1) as 'c)
SQL: select regexp_extract(a, "([a-z]):", 1) as c from events
// output
[{ "c": "x" }, { "c": "y" }]
part 3 Processing Data in Apache Kafka with Structured Streaming
此部分将讨论使用Spark SQL API处理转换来自Kafka的复杂数据流,并存储到HDFS MySQL等系统中。
3.1 Kafka简述
Kafka是一种分布式pub-sub消息传递系统,广泛用于摄取实时数据流,并以并行和容错的方式向下游消费者提供。这使得Kafka适合构建可在异构处理系统之间可靠地移动数据的实时流数据流水线。
Kafka中的数据被分为并行分区的主题。每个分区都是有序且不可变的记录序列。Producer将记录附加到这些序列的尾部,Consumer按照自己需要阅读序列。多个消费者可以订阅主题并在数据到达时接收数据。当新数据到达Kafka主题中的分区时,会为它们分配一个称为偏移的顺序ID号。 Kafka群集保留所有已发布的数据无论它们是否已被消耗。在可配置的保留期内,之后它们被标记为删除。
我们有三种不同startingOffsets选项读取数据:
- earliest - 在流的开头开始阅读(不包括已从Kafka中删除的数据)
- latest - 从现在开始,仅处理查询开始后到达的新数据
- 分区指定 - 指定从每个分区开始的精确偏移量,允许精确控制处理应该从哪里开始。例如,如果我们想要准确地获取某些其他系统或查询中断的位置,则可以利用此选项
3.2 Structured Streaming 对Kafka支持
从Kafka中读取数据,并将二进制流数据转为字符串:
代码语言:txt复制# Construct a streaming DataFrame that reads from topic1
df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
使用Spark作为Producer发送Kafka数据:
代码语言:txt复制# Write key-value data from a DataFrame to a Kafka topic specified in an option
query = df
.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.option("checkpointLocation", "/path/to/HDFS/dir")
.start()
3.3 一个端到端的例子
此例子使用一个Nest摄像头,收集的数据通过Kafka发送至Spark做相应计算,下面是Nest发送的JSON数据格式:
代码语言:txt复制"devices": {
"cameras": {
"device_id": "awJo6rH...",
"last_event": {
"has_sound": true,
"has_motion": true,
"has_person": true,
"start_time": "2016-12-29T00:00:00.000Z",
"end_time": "2016-12-29T18:42:00.000Z"
}
}
}
我们的目标:
- 使用类似Parquet这样的柱状格式创建所有事件的高效且可查询的历史存档
- 执行低延迟事件时间聚合,并将结果推送回Kafka以供其他消费者使用
- 对Kafka中主题中存储的批量数据执行汇报
3.3.1 第一步
我们使用from_json
函数读取并解析从Nest摄像头发来的数据
schema = StructType()
.add("metadata", StructType()
.add("access_token", StringType())
.add("client_version", IntegerType()))
.add("devices", StructType()
.add("thermostats", MapType(StringType(), StructType().add(...)))
.add("smoke_co_alarms", MapType(StringType(), StructType().add(...)))
.add("cameras", MapType(StringType(), StructType().add(...)))
.add("companyName", StructType().add(...)))
.add("structures", MapType(StringType(), StructType().add(...)))
nestTimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.sss'Z'"
代码语言:txt复制jsonOptions = { "timestampFormat": nestTimestampFormat }
parsed = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "nest-logs")
.load()
.select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"))
我们使用explode()
函数为每个键值对创建一个新行,展平数据
camera = parsed
.select(explode("parsed_value.devices.cameras"))
.select("value.*")
sightings = camera
.select("device_id", "last_event.has_person", "last_event.start_time")
.where(col("has_person") == True)
可以使用printSchema()
方法查看camera DataSet的结构:
camera.printSchema()
代码语言:txt复制root
代码语言:txt复制 |-- device_id: string (nullable = true)
代码语言:txt复制 |-- software_version: string (nullable = true)
代码语言:txt复制 |-- structure_id: string (nullable = true)
代码语言:txt复制 |-- where_id: string (nullable = true)
代码语言:txt复制 |-- where_name: string (nullable = true)
代码语言:txt复制 |-- name: string (nullable = true)
代码语言:txt复制 |-- name_long: string (nullable = true)
代码语言:txt复制 |-- is_online: boolean (nullable = true)
代码语言:txt复制 |-- is_streaming: boolean (nullable = true)
代码语言:txt复制 |-- is_audio_input_enable: boolean (nullable = true)
代码语言:txt复制 |-- last_is_online_change: timestamp (nullable = true)
代码语言:txt复制 |-- is_video_history_enabled: boolean (nullable = true)
代码语言:txt复制 |-- web_url: string (nullable = true)
代码语言:txt复制 |-- app_url: string (nullable = true)
代码语言:txt复制 |-- is_public_share_enabled: boolean (nullable = true)
代码语言:txt复制 |-- activity_zones: array (nullable = true)
代码语言:txt复制 | |-- element: struct (containsNull = true)
代码语言:txt复制 | | |-- name: string (nullable = true)
代码语言:txt复制 | | |-- id: string (nullable = true)
代码语言:txt复制 |-- public_share_url: string (nullable = true)
代码语言:txt复制 |-- snapshot_url: string (nullable = true)
代码语言:txt复制 |-- last_event: struct (nullable = true)
代码语言:txt复制 | |-- has_sound: boolean (nullable = true)
代码语言:txt复制 | |-- has_motion: boolean (nullable = true)
代码语言:txt复制 | |-- has_person: boolean (nullable = true)
代码语言:txt复制 | |-- start_time: timestamp (nullable = true)
代码语言:txt复制 | |-- end_time: timestamp (nullable = true)
代码语言:txt复制 | |-- urls_expire_time: timestamp (nullable = true)
代码语言:txt复制 | |-- web_url: string (nullable = true)
代码语言:txt复制 | |-- app_url: string (nullable = true)
代码语言:txt复制 | |-- image_url: string (nullable = true)
代码语言:txt复制 | |-- animated_image_url: string (nullable = true)
代码语言:txt复制 | |-- activity_zone_ids: array (nullable = true)
代码语言:txt复制 | | |-- element: string (containsNull = true)
3.3.2 聚合数据并发送Kafka
我们首先创建一个表示此位置数据的DataFrame,然后将其与目标DataFrame连接,并在设备ID上进行匹配。我们在这里做的是将流式DataFrame目标加入静态DataFrame位置:
代码语言:txt复制locationDF = spark.table("device_locations").select("device_id", "zip_code")
sightingLoc = sightings.join(locationDF, "device_id")
生成一个流式聚合,计算每小时每个邮政编码中的摄像头人数,然后将其写入Kafka topic1,称为“nest-camera-stats”
代码语言:txt复制sightingLoc
.groupBy("zip_code", window("start_time", "1 hour"))
.count()
.select(
to_json(struct("zip_code", "window")).alias("key"),
col("count").cast("string").alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "nest-camera-stats")
.option("checkpointLocation", "/path/to/HDFS/dir")
.outputMode("complete")
.start()
聚合统计数据并写入Kafka:
代码语言:txt复制sightingLoc
.groupBy("zip_code", window("start_time", "1 hour"))
.count()
.select(
to_json(struct("zip_code", "window")).alias("key"),
col("count").cast("string").alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "nest-camera-stats")
.option("checkpointLocation", "/path/to/HDFS/dir")
.outputMode("complete")
.start()
3.3.3 将结果存入数据库
代码语言:txt复制camera.writeStream
.format("parquet")
.option("startingOffsets", "earliest")
.option("path", "s3://nest-logs")
.option("checkpointLocation", "/path/to/HDFS/dir")
.start()
PS:我们可以使用相同的Dataframe做多个流查询(streaming queries)
3.3.4 批量查询并汇报
这里直接使用read
方法去做批量查询,用法与readStream
类似
report = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "nest-camera-stats")
.load()
.select(
json_tuple(col("key").cast("string"), "zip_code", "window").alias("zip_code", "window"),
col("value").cast("string").cast("integer").alias("count"))
.where("count > 1000")
.select("zip_code", "window")
.distinct()