Sources 输入源
从Spark 2.0至Spark 2.4版本,目前支持数据源有4种,其中Kafka 数据源使用作为广泛,其他数据源主要用于开发测试程序。
文档:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#input-sources
可以认为Structured Streaming = SparkStreaming SparkSQL,对流式数据处理使用SparkSQL数据结构,应用入口为SparkSession,对比SparkSQL与SparkStreaming编程:
Spark Streaming:将流式数据按照时间间隔(BatchInterval)划分为很多Batch,每批次数据封装在RDD中,底层RDD数据,构建StreamingContext实时消费数据;
Structured Streaming属于SparkSQL模块中一部分,对流式数据处理,构建SparkSession对象,指定读取Stream数据和保存Streamn数据,具体语法格式:
静态数据
读取spark.read
保存ds/df.write
流式数据
读取spark.readStream
保存ds/df.writeStrem
Socket数据源-入门案例
需求
http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#quick-example
实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台Console。
- Socket 数据源
从Socket中读取UTF8文本数据。一般用于测试,使用nc -lk 端口号向Socket监听的端口发送数据,用于测试使用,有两个参数必须指定:
1.host
2.port
- Console 接收器
将结果数据打印到控制台或者标准输出,通常用于测试或Bedug使用,三种输出模式OutputMode(Append、Update、Complete)都支持,两个参数可设置:
1.numRows,打印多少条数据,默认为20条;
2.truncate,如果某列值字符串太长是否截取,默认为true,截取字符串;
编程实现
完整案例代码如下:
代码语言:javascript复制package cn.itcast.structedstreaming
import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果打印到控制台。
*/
object StructuredWordCount {
def main(args: Array[String]): Unit = {
//TODO: 0. 环境
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.config("spark.sql.shuffle.partitions", "2") // 设置Shuffle分区数目
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO: 1. 从TCP Socket 读取数据
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1")
.option("port", 9999)
.load()
//注意:返回的df不是普通的分布式表,而是实时流数据对应的分布式的无界表!
//df.show()//注意:该写法是离线的写法,会报错,所以应使用实时的写法:Queries with streaming sources must be executed with writeStream.start();
inputStreamDF.printSchema()
// TODO: 2. 业务分析:词频统计WordCount
val resultStreamDF: DataFrame = inputStreamDF
.as[String]
.filter(StringUtils.isNotBlank(_))
.flatMap(_.trim.split("\s "))
.groupBy($"value")
.count()
//.orderBy($"count".desc)
resultStreamDF.printSchema()
// TODO: 3. 设置Streaming应用输出及启动
val query: StreamingQuery = resultStreamDF.writeStream
//- append:默认的追加模式,将新的数据输出!只支持简单查询,如果涉及的聚合就不支持了
//- complete:完整模式,将完整的数据输出,支持聚合和排序
//- update:更新模式,将有变化的数据输出,支持聚合但不支持排序,如果没有聚合就和append一样
//.outputMode(OutputMode.Append())
//.outputMode(OutputMode.Complete())
.outputMode(OutputMode.Update())
.format("console")
.option("numRows", "10")
.option("truncate", "false")
// 流式应用,需要启动start
.start()
// 流式查询等待流式应用终止
query.awaitTermination()
// 等待所有任务运行完成才停止运行
query.stop()
}
}
文件数据源-了解
将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet
需求
监听某一个目录,读取csv格式数据,统计年龄小于25岁的人群的爱好排行榜。
测试数据
代码语言:javascript复制jack1;23;running
jack2;23;running
jack3;23;running
bob1;20;swimming
bob2;20;swimming
tom1;28;football
tom2;28;football
tom3;28;football
tom4;28;football
代码实现
代码语言:javascript复制package cn.itcast.structedstreaming
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* 使用Structured Streaming从目录中读取文件数据:统计年龄小于25岁的人群的爱好排行榜
*/
object StructuredFileSource {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO: 从文件系统,监控目录,读取CSV格式数据
// 数据格式:
// jack;23;running
val csvSchema: StructType = new StructType()
.add("name", StringType, nullable = true)
.add("age", IntegerType, nullable = true)
.add("hobby", StringType, nullable = true)
val inputStreamDF: DataFrame = spark.readStream
.option("sep", ";")
.option("header", "false")
// 指定schema信息
.schema(csvSchema)
.csv("data/input/persons")
// 依据业务需求,分析数据:统计年龄小于25岁的人群的爱好排行榜
val resultStreamDF: Dataset[Row] = inputStreamDF
.filter($"age" < 25)
.groupBy($"hobby")
.count()
.orderBy($"count".desc)
// 设置Streaming应用输出及启动
val query: StreamingQuery = resultStreamDF.writeStream
//- append:默认的追加模式,将新的数据输出!只支持简单查询,如果涉及的聚合就不支持了
//- complete:完整模式,将完整的数据输出,支持聚合和排序
//- update:更新模式,将有变化的数据输出,支持聚合但不支持排序,如果没有聚合就和append一样
.outputMode(OutputMode.Complete())
.format("console")
.option("numRows", "10")
.option("truncate", "false")
.start()
query.awaitTermination()
query.stop()
}
}
Rate source-了解
以每秒指定的行数生成数据,每个输出行包含2个字段:timestamp和value。
其中timestamp是一个Timestamp含有信息分配的时间类型,并且value是Long(包含消息的计数从0开始作为第一行)类型。此源用于测试和基准测试,可选参数如下:
演示范例代码如下:
代码语言:javascript复制package cn.itcast.structedstreaming
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 数据源:Rate Source,以每秒指定的行数生成数据,每个输出行包含一个timestamp和value。
*/
object StructuredRateSource {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO:从Rate数据源实时消费数据
val rateStreamDF: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", "10") // 每秒生成数据条数
.option("rampUpTime", "0s") // 每条数据生成间隔时间
.option("numPartitions", "2") // 分区数目
.load()
rateStreamDF.printSchema()
//root
// |-- timestamp: timestamp (nullable = true)
// |-- value: long (nullable = true)
// 3. 设置Streaming应用输出及启动
val query: StreamingQuery = rateStreamDF.writeStream
//- append:默认的追加模式,将新的数据输出!只支持简单查询,如果涉及的聚合就不支持了
//- complete:完整模式,将完整的数据输出,支持聚合和排序
//- update:更新模式,将有变化的数据输出,支持聚合但不支持排序,如果没有聚合就和append一样
.outputMode(OutputMode.Append())
.format("console")
.option("numRows", "10")
.option("truncate", "false")
.start()
query.awaitTermination()
query.stop()
}
}