Structured Streaming Error Log: Overloaded method foreachBatch with alternatives
0. Preface
- Spark: 3.0.0
- Scala: 2.12
1. The Error
overloaded method value foreachBatch with alternatives:
2. Code and Error Messages
Error:(48, 12) overloaded method value foreachBatch with alternatives:
  (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
  (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
 cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
    .foreachBatch((df, batchId) => {
import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSink1 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ForeachSink1")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = lines.writeStream
      .outputMode("update")
      .foreachBatch((df, batchId) => {
        val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
        result.persist()
        result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
        result.write.mode("overwrite").json("./foreach1")
        result.unpersist()
      })
      // .trigger(Trigger.ProcessingTime(0))
      .trigger(Trigger.Continuous(10))
      .start

    query.awaitTermination()
  }
}
A second variant fails with the same error:
/**
Error:(43, 12) overloaded method value foreachBatch with alternatives:
  (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
  (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
 cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.DataFrame)
    .foreachBatch((df, batchId) => {
*/
import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSink {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ForeachSink")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = lines.writeStream
      .outputMode("complete")
      .foreachBatch((df, batchId) => {
        val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
        result.persist()
        result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
        result.write.mode("overwrite").json("./foreach")
        result.unpersist()
      })
      .start

    query.awaitTermination()
  }
}
3. Cause and Fix
The foreachBatch() overloads resolve differently under Scala 2.12 than under Scala 2.11. With Scala 2.12, the untyped lambda leaves batchId inferred as Any, and the last expression of the body is a Dataset rather than Unit, so the call matches neither the Scala (Dataset[Row], Long) => Unit overload nor the Java VoidFunction2 overload. Writing the parameter types out explicitly and making the lambda body return Unit resolves the ambiguity.
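As a minimal, self-contained illustration (the ForeachBatchOverloadDemo object and the built-in rate test source are used here only for demonstration; they are not part of the original examples):

import org.apache.spark.sql.{Dataset, Row, SparkSession}

object ForeachBatchOverloadDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ForeachBatchOverloadDemo")
      .getOrCreate()

    // Under Scala 2.12, an untyped (df, batchId) => ... lambda leaves batchId as Any,
    // so neither foreachBatch overload applies. Annotating the parameters and
    // returning Unit from the body selects the Scala overload unambiguously.
    spark.readStream
      .format("rate")                 // built-in test source, needs no external socket
      .option("rowsPerSecond", "1")
      .load()
      .writeStream
      .outputMode("append")
      .foreachBatch((df: Dataset[Row], batchId: Long) => {
        println(s"batch $batchId contains ${df.count()} rows") // body returns Unit
      })
      .start()
      .awaitTermination()
  }
}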
The full corrected code for the two examples is as follows:
import java.util.Properties

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ForeachBatchSink {
  def myFun(df: Dataset[Row], batchId: Long, props: Properties): Unit = {
    println("BatchId = " + batchId)
    if (df.count() != 0) {
      df.persist()
      df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
      df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
      df.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ForeachBatchSink")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val wordCount: DataFrame = lines.as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count() // value count

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = wordCount.writeStream
      .outputMode("complete")
      .foreachBatch((df: Dataset[Row], batchId: Long) => {
        myFun(df, batchId, props)
      })
      .start

    query.awaitTermination()
  }
}
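If you prefer not to annotate the lambda, another way to sidestep the ambiguity is to pass the Java-friendly overload explicitly with an org.apache.spark.api.java.function.VoidFunction2. The sketch below reuses the same socket source and MySQL settings as the example above; the object name ForeachBatchSinkJavaApi is made up for illustration.

import java.util.Properties

import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ForeachBatchSinkJavaApi {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ForeachBatchSinkJavaApi")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val wordCount: DataFrame = lines.as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count()

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = wordCount.writeStream
      .outputMode("complete")
      // Supplying a VoidFunction2 picks the Java overload directly, so there is
      // no overload ambiguity for the Scala 2.12 compiler to resolve.
      .foreachBatch(new VoidFunction2[Dataset[Row], java.lang.Long] {
        override def call(df: Dataset[Row], batchId: java.lang.Long): Unit = {
          if (df.count() != 0) {
            df.persist()
            df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
            df.unpersist()
          }
        }
      })
      .start

    query.awaitTermination()
  }
}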
import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ForeachBatchSink1 {
  def myFun(df: Dataset[Row], batchId: Long, props: Properties, spark: SparkSession): Unit = {
    import spark.implicits._
    println("BatchId = " + batchId)
    if (df.count() != 0) {
      val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
      result.persist()
      result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
      result.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink1")
      result.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ForeachBatchSink1")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = lines.writeStream
      .outputMode("update")
      .foreachBatch((df: Dataset[Row], batchId: Long) => {
        myFun(df, batchId, props, spark)
      })
      .trigger(Trigger.ProcessingTime(0)) // foreachBatch relies on micro-batch execution; a continuous trigger is not supported
      .start

    query.awaitTermination()
  }
}
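To try either version, start a socket listener on cluster01 first (for example with nc -lk 10000), type a few lines of text, and then check the wc table in MySQL and the JSON output directory after each batch completes.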
4. Reference Links
https://blog.csdn.net/Shockang/article/details/120961968