2021年大数据Spark(四十六):Structured Streaming Operations 操作

2021-10-11 10:22:19 浏览数 (3)


Operations 操作

获得到Source之后的基本数据处理方式和之前学习的DataFrame、DataSet一致,不再赘述

官网示例代码:

代码语言:javascript复制
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }

val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10

df.select("device").where("signal > 10")      // using untyped APIs   

ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type

df.groupBy("deviceType").count()                 // using untyped API

// Running average signal for each device type

import org.apache.spark.sql.expressions.scalalang.typed

ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API

0 人点赞