在阅读本文之前,你应该阅读过的系列:
- 《Flink重点难点:时间、窗口和流Join》
- 《Flink重点难点:网络流控和反压》
- 《Flink重点难点:维表关联理论和Join实战》
- 《Flink重点难点:内存模型与内存结构》
- 《Flink重点难点:Flink Table&SQL必知必会(一)》
我们在上半部分
- 《Flink重点难点:Flink Table&SQL必知必会(一)》
介绍了 Flink Table & SQL的一些核心概念,本部分将介绍 Flink 中窗口和函数。
1 常规窗口
时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口、根据时间段做计算了。下面我们就来看看Table API和SQL中,怎么利用时间字段做窗口操作。
在Table API和SQL中,主要有两种窗口:Group Windows和Over Windows
1.1 分组窗口
分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。
Table API中的Group Windows都是使用.window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段一样引用。
代码语言:javascript复制val table = input
.window([w: GroupWindow] as $"w") // 定义窗口,别名 w
.groupBy($"w", $"a") // 以属性a和窗口w作为分组的key
.select($"a", $"b".sum) // 聚合字段b的值,求和
或者,还可以把窗口的相关信息,作为字段添加到结果表中:
代码语言:javascript复制val table = input
.window([w: GroupWindow] as $"w")
.groupBy($"w", $"a")
.select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".count)
Table API提供了一组具有特定语义的预定义Window类,这些类会被转换为底层DataStream或DataSet的窗口操作。
Table API支持的窗口定义,和我们熟悉的一样,主要也是三种:滚动(Tumbling)、滑动(Sliding)和会话(Session)。
1.2 滚动窗口
滚动窗口(Tumbling windows)要用Tumble类来定义,另外还有三个方法:
- over:定义窗口长度
- on:用来分组(按时间间隔)或者排序(按行数)的时间字段
- as:别名,必须出现在后面的groupBy中
代码如下:
代码语言:javascript复制// Tumbling Event-time Window(事件时间字段rowtime
.window(Tumble over 10.minutes on $"rowtime" as $"w")
// Tumbling Processing-time Window(处理时间字段proctime)
.window(Tumble over 10.minutes on $"proctime" as $"w")
// Tumbling Row-count Window (类似于计数窗口,按处理时间排序,10行一组)
.window(Tumble over 10.rows on $"proctime" as $"w")
1.3 滑动窗口
滑动窗口(Sliding windows)要用Slide类来定义,另外还有四个方法:
- over:定义窗口长度
- every:定义滑动步长
- on:用来分组(按时间间隔)或者排序(按行数)的时间字段
- as:别名,必须出现在后面的groupBy中
代码如下:
代码语言:javascript复制// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w")
// Sliding Processing-time window
.window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w")
// Sliding Row-count window
.window(Slide over 10.rows every 5.rows on $"proctime" as $"w")
1.4 会话窗口
会话窗口(Session windows)要用Session类来定义,另外还有三个方法:
- withGap:会话时间间隔
- on:用来分组(按时间间隔)或者排序(按行数)的时间字段
- as:别名,必须出现在后面的groupBy中
代码如下:
代码语言:javascript复制// Session Event-time Window
.window(Session withGap 10.minutes on $"rowtime" as $"w")
// Session Processing-time Window
.window(Session withGap 10.minutes on $"proctime" as $"w")
2 Over Windows
Over window聚合是标准SQL中已有的(Over子句),可以在查询的SELECT子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。Over windows使用.window(w:overwindows*)子句定义,并在select()方法中通过别名来引用。
比如这样:
代码语言:javascript复制val table = input
.window([w: OverWindow] as $"w")
.select($"a"", $"b".sum over $"w", $"c".min over $"w")
Table API提供了Over类,来配置Over窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。
无界的over window是使用常量指定的。也就是说,时间间隔要指定UNBOUNDED_RANGE,或者行计数间隔要指定UNBOUNDED_ROW。而有界的over window是用间隔的大小指定的。
实际代码应用如下:
- 无界的 over window
// 无界的事件时间over window (时间字段 "rowtime")
.window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE as $"w")
//无界的处理时间over window (时间字段"proctime")
.window(Over partitionBy $"a" orderBy $"proctime" preceding UNBOUNDED_RANGE as $"w")
// 无界的事件时间Row-count over window (时间字段 "rowtime")
.window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_ROW as $"w")
//无界的处理时间Row-count over window (时间字段 "rowtime")
.window(Over partitionBy $"a" orderBy $"proctime" preceding UNBOUNDED_ROW as $"w")
- 有界的over window
// 有界的事件时间over window (时间字段 "rowtime",之前1分钟)
.window(Over partitionBy $"a" orderBy $"rowtime" preceding 1.minutes as $"w")
// 有界的处理时间over window (时间字段 "rowtime",之前1分钟)
.window(Over partitionBy $"a" orderBy $"proctime" preceding 1.minutes as $"w")
// 有界的事件时间Row-count over window (时间字段 "rowtime",之前10行)
.window(Over partitionBy $"a" orderBy $"rowtime" preceding 10.rows as $"w")
// 有界的处理时间Row-count over window (时间字段 "rowtime",之前10行)
.window(Over partitionBy $"a" orderBy $"proctime" preceding 10.rows as $"w")
3 SQL中窗口的定义
我们已经了解了在Table API里window的调用方式,同样,我们也可以在SQL中直接加入窗口的定义和使用。
3.1 Group Windows
Group Windows在SQL查询的Group BY子句中定义。与使用常规GROUP BY子句的查询一样,使用GROUP BY子句的查询会计算每个组的单个结果行。
SQL支持以下Group窗口函数:
- TUMBLE(time_attr, interval)
定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度。
- HOP(time_attr, interval, interval)
定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度。
- SESSION(time_attr, interval)
定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔(Gap)。
另外还有一些辅助函数,可以用来选择Group Window的开始和结束时间戳,以及时间属性。
这里只写TUMBLE,滑动和会话窗口是类似的(HOP ,SESSION*)。
代码语言:javascript复制TUMBLE_START(time_attr, interval)
TUMBLE_END(time_attr, interval)
TUMBLE_ROWTIME(time_attr, interval)
TUMBLE_PROCTIME(time_attr, interval)
3.2 Over Windows
由于Over本来就是SQL内置支持的语法,所以这在SQL中属于基本的聚合操作。所有聚合必须在同一窗口上定义,也就是说,必须是相同的分区、排序和范围。目前仅支持在当前行范围之前的窗口(无边界和有边界)。
注意,ORDER BY必须在单一的时间属性上指定。
代码如下:
代码语言:javascript复制SELECT COUNT(amount) OVER (
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
// 也可以做多个聚合
SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders
WINDOW w AS (
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
4 代码练习(以分组滚动窗口为例)
我们可以综合学习过的内容,用一段完整的代码实现一个具体的需求。例如,可以开一个滚动窗口,统计10秒内出现的每个sensor的个数。
代码如下:
scala version
代码语言:javascript复制object TumblingWindowTempCount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(env, settings)
val stream = env.addSource(new SensorSource).filter(r => r.id.equals("sensor_1"))
val table = tableEnv.fromDataStream(stream, $"id", $"timestamp" as "ts", $"temperature", $"pt".proctime())
// table api
val tableResult = table
.window(Tumble over 10.seconds() on $"pt" as $"w")
.groupBy($"id", $"w") // .keyBy(r => r.id).timeWindow(Time.seconds(10))
.select($"id", $"id".count())
tableEnv.toRetractStream[Row](tableResult).print()
// sql
tableEnv.createTemporaryView("sensor", stream, $"id", $"timestamp" as "ts", $"temperature", $"pt".proctime())
val sqlResult = tableEnv
.sqlQuery("SELECT id, count(id), TUMBLE_START(pt, INTERVAL '10' SECOND), TUMBLE_END(pt, INTERVAL '10' SECOND) FROM sensor GROUP BY id, TUMBLE(pt, INTERVAL '10' SECOND)")
tableEnv.toRetractStream[Row](sqlResult).print()
env.execute()
}
}
Flink Table 和 SQL内置了很多SQL中支持的函数;如果有无法满足的需要,则可以实现用户自定义的函数(UDF)来解决。
4 系统内置函数
Flink Table API 和 SQL为用户提供了一组用于数据转换的内置函数。SQL中支持的很多函数,Table API和SQL都已经做了实现,其它还在快速开发扩展中。
以下是一些典型函数的举例,全部的内置函数,可以参考官网介绍。
- 比较函数
SQL:
value1 = value2
value1 > value2
Table API:
ANY1 === ANY2
ANY1 > ANY2
- 逻辑函数
SQL:
boolean1 OR boolean2
boolean IS FALSE
NOT boolean
Table API:
BOOLEAN1 || BOOLEAN2
BOOLEAN.isFalse
!BOOLEAN
- 算术函数
SQL:
numeric1 numeric2
POWER(numeric1, numeric2)
Table API:
NUMERIC1 NUMERIC2
NUMERIC1.power(NUMERIC2)
- 字符串函数
SQL:
string1 || string2
UPPER(string)
CHAR_LENGTH(string)
Table API:
STRING1 STRING2
STRING.upperCase()
STRING.charLength()
- 时间函数
SQL:
DATE string
TIMESTAMP string
CURRENT_TIME
INTERVAL string range
Table API:
STRING.toDate
STRING.toTimestamp
currentTime()
NUMERIC.days
NUMERIC.minutes
- 聚合函数
SQL:
COUNT()
SUM([ ALL | DISTINCT ] expression)
RANK()
ROW_NUMBER()
Table API:
FIELD.count
FIELD.sum0
5 UDF
用户定义函数(User-defined Functions,UDF)是一个重要的特性,因为它们显著地扩展了查询(Query)的表达能力。一些系统内置函数无法解决的需求,我们可以用UDF来自定义实现。
5.1 注册用户自定义函数UDF
在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用。不需要专门为Scala 的Table API注册函数。
函数通过调用registerFunction()方法在TableEnvironment中注册。当用户定义的函数被注册时,它被插入到TableEnvironment的函数目录中,这样Table API或SQL解析器就可以识别并正确地解释它。
5.2 标量函数(Scalar Functions)
用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值。
为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval(直接def声明,没有override)。求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。
在下面的代码中,我们定义自己的HashCode函数,在TableEnvironment中注册它,并在查询中调用它。
代码语言:javascript复制// 自定义一个标量函数
class HashCodeFunction extends ScalarFunction {
private var factor: Int = 0
override def open(context: FunctionContext): Unit = {
// 获取参数 "hashcode_factor"
// 如果不存在,则使用默认值 "12"
factor = context.getJobParameter("hashcode_factor", "12").toInt
}
def eval(s: String): Int = {
s.hashCode * factor
}
}
主函数中调用,计算sensor id的哈希值(前面部分照抄,流环境、表环境、读取source、建表):
代码语言:javascript复制import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}
import org.apache.flink.types.Row
object ScalarFunctionExample {
def main(args: Array[String]): Unit = {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.addSource(new SensorSource)
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val tEnv = StreamTableEnvironment.create(env, settings)
tEnv.getConfig.addJobParameter("hashcode_factor", "31")
tEnv.createTemporaryView("sensor", stream)
// 在 Table API 里不经注册直接“内联”调用函数
tEnv.from("sensor").select(call(classOf[HashCodeFunction], $"id"))
// sql 写法
// 注册函数
tEnv.createTemporarySystemFunction("hashCode", classOf[HashCodeFunction])
// 在 Table API 里调用注册好的函数
tEnv.from("sensor").select(call("hashCode", $"id"))
tEnv
.sqlQuery("SELECT id, hashCode(id) FROM sensor")
.toAppendStream[Row]
.print()
env.execute()
}
class HashCodeFunction extends ScalarFunction {
private var factor: Int = 0
override def open(context: FunctionContext): Unit = {
// 获取参数 "hashcode_factor"
// 如果不存在,则使用默认值 "12"
factor = context.getJobParameter("hashcode_factor", "12").toInt
}
def eval(s: String): Int = {
s.hashCode * factor
}
}
}
5.3 表函数(Table Functions)
与用户定义的标量函数类似,用户定义的表函数,可以将0、1或多个标量值作为输入参数;与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值。
为了定义一个表函数,必须扩展org.apache.flink.table.functions中的基类TableFunction并实现(一个或多个)求值方法。表函数的行为由其求值方法决定,求值方法必须是public的,并命名为eval。求值方法的参数类型,决定表函数的所有有效参数。
返回表的类型由TableFunction的泛型类型确定。求值方法使用protected collect(T)方法发出输出行。
在Table API中,Table函数需要与.joinLateral或.leftOuterJoinLateral一起使用。
joinLateral算子,会将外部表中的每一行,与表函数(TableFunction,算子的参数是它的表达式)计算得到的所有行连接起来。
而leftOuterJoinLateral算子,则是左外连接,它同样会将外部表中的每一行与表函数计算生成的所有行连接起来;并且,对于表函数返回的是空表的外部行,也要保留下来。
在SQL中,则需要使用Lateral Table(),或者带有ON TRUE条件的左连接。
下面的代码中,我们将定义一个表函数,在表环境中注册它,并在查询中调用它。
自定义TableFunction:
代码语言:javascript复制// 自定义TableFunction
@FunctionHint(output = new DataTypeHint("ROW<word STRING, length INT>"))
class SplitFunction extends TableFunction[Row] {
def eval(str: String): Unit = {
// use collect(...) to emit a row
str.split("#").foreach(s => collect(Row.of(s, Int.box(s.length))))
}
}
完整代码:
代码语言:javascript复制import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row
object TableFunctionExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env
.fromElements(
"hello#world",
"atguigu#bigdata"
)
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val tEnv = StreamTableEnvironment.create(env, settings)
tEnv.createTemporaryView("MyTable", stream, $"s")
// 注册函数
tEnv.createTemporarySystemFunction("SplitFunction", classOf[SplitFunction])
// 在 Table API 里调用注册好的函数
tEnv
.from("MyTable")
.joinLateral(call("SplitFunction", $"s"))
.select($"s", $"word", $"length")
.toAppendStream[Row]
.print()
tEnv
.from("MyTable")
.leftOuterJoinLateral(call("SplitFunction", $"s"))
.select($"s", $"word", $"length")
// 在 SQL 里调用注册好的函数
tEnv.sqlQuery(
"SELECT s, word, length "
"FROM MyTable, LATERAL TABLE(SplitFunction(s))")
tEnv.sqlQuery(
"SELECT s, word, length "
"FROM MyTable "
"LEFT JOIN LATERAL TABLE(SplitFunction(s)) ON TRUE")
env.execute()
}
@FunctionHint(output = new DataTypeHint("ROW<word STRING, length INT>"))
class SplitFunction extends TableFunction[Row] {
def eval(str: String): Unit = {
// use collect(...) to emit a row
str.split("#").foreach(s => collect(Row.of(s, Int.box(s.length))))
}
}
}
5.4 聚合函数(Aggregate Functions)
用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的。
上图中显示了一个聚合的例子。
假设现在有一张表,包含了各种饮料的数据。该表由三列(id、name和price)、五行组成数据。现在我们需要找到表中所有饮料的最高价格,即执行max()聚合,结果将是一个数值。
AggregateFunction的工作原理如下。
- 首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过调用AggregateFunction的createAccumulator()方法创建空累加器。
- 随后,对每个输入行调用函数的accumulate()方法来更新累加器。
- 处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果。
AggregationFunction要求必须实现的方法:
- createAccumulator()
- accumulate()
- getValue()
除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口(session group window)的上下文中,则merge()方法是必需的。
- retract()
- merge()
- resetAccumulator()
接下来我们写一个自定义AggregateFunction,计算一下每个sensor的平均温度值。
代码语言:javascript复制// 定义AggregateFunction的Accumulator
class AvgTempAcc {
var sum: Double = 0.0
var count: Int = 0
}
class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {
override def getValue(accumulator: AvgTempAcc): Double = accumulator.sum / accumulator.count
override def createAccumulator(): AvgTempAcc = new AvgTempAcc
def accumulate(accumulator: AvgTempAcc, temp: Double): Unit ={
accumulator.sum = temp
accumulator.count = 1
}
}
接下来就可以在代码中调用了。
代码语言:javascript复制// 创建一个聚合函数实例
val avgTemp = new AvgTemp()
// Table API的调用
val resultTable = sensorTable
.groupBy($"id")
.aggregate(avgTemp($"temperature") as $"avgTemp")
.select($"id", $"avgTemp")
// SQL的实现
tableEnv.createTemporaryView("sensor", sensorTable)
tableEnv.registerFunction("avgTemp", avgTemp)
val resultSqlTable = tableEnv.sqlQuery(
"""
|SELECT
|id, avgTemp(temperature)
|FROM
|sensor
|GROUP BY id
""".stripMargin)
// 转换成流打印输出
resultTable.toRetractStream[(String, Double)].print("agg temp")
resultSqlTable.toRetractStream[Row].print("agg temp sql")
5.5 表聚合函数(Table Aggregate Functions)
用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表。这跟AggregateFunction非常类似,只是之前聚合结果是一个标量值,现在变成了一张表。
比如现在我们需要找到表中所有饮料的前2个最高价格,即执行top2()表聚合。我们需要检查5行中的每一行,得到的结果将是一个具有排序后前2个值的表。
用户定义的表聚合函数,是通过继承TableAggregateFunction抽象类来实现的。
TableAggregateFunction的工作原理如下。
- 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用TableAggregateFunction的createAccumulator()方法可以创建空累加器。
- 随后,对每个输入行调用函数的accumulate()方法来更新累加器。
- 处理完所有行后,将调用函数的emitValue()方法来计算并返回最终结果。
AggregationFunction要求必须实现的方法:
- createAccumulator()
- accumulate()
除了上述方法之外,还有一些可选择实现的方法。
- retract()
- merge()
- resetAccumulator()
- emitValue()
- emitUpdateWithRetract()
接下来我们写一个自定义TableAggregateFunction,用来提取每个sensor最高的两个温度值。
代码语言:javascript复制// 先定义一个 Accumulator
class Top2TempAcc{
var highestTemp: Double = Int.MinValue
var secondHighestTemp: Double = Int.MinValue
}
// 自定义 TableAggregateFunction
class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc]{
override def createAccumulator(): Top2TempAcc = new Top2TempAcc
def accumulate(acc: Top2TempAcc, temp: Double): Unit ={
if( temp > acc.highestTemp ){
acc.secondHighestTemp = acc.highestTemp
acc.highestTemp = temp
} else if( temp > acc.secondHighestTemp ){
acc.secondHighestTemp = temp
}
}
def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit ={
out.collect(acc.highestTemp, 1)
out.collect(acc.secondHighestTemp, 2)
}
}
接下来就可以在代码中调用了。
代码语言:javascript复制// 创建一个表聚合函数实例
val top2Temp = new Top2Temp()
// Table API的调用
val resultTable = sensorTable
.groupBy($"id")
.flatAggregate(top2Temp($"temperature") as ($"temp", $"rank"))
.select($"id", $"temp", $"rank")
// 转换成流打印输出
resultTable.toRetractStream[(String, Double, Int)].print("agg temp")
resultSqlTable.toRetractStream[Row].print("agg temp sql")
6 Flink SQL集成Hive
Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。
Flink 与 Hive 的集成包含两个层面。
一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。
二是利用 Flink 来读写 Hive 的表。
HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive 数仓。您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。
Maven依赖
主要包含三部分的依赖:flink和hive的连接器,hive的依赖和hadoop的依赖。
代码语言:javascript复制<!-- Flink Dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>1.11.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.11.0</version>
<scope>provided</scope>
</dependency>
<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
示例程序
先在hive中新建数据库和表
代码语言:javascript复制create database mydb;
use mydb;
create table if not exists t_user(id string, name string);
insert into table t_user values ('1','huangbo'), ('2','xuzheng'),('3','wangbaoqiang');
然后编写程序,将数据流写入到hive中
代码语言:javascript复制import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog
object TestHiveStreaming {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val stream = env
.fromElements(
("10", "haha"),
("11", "hehe")
)
val name = "myhive"
val defaultDatabase = "mydb"
val hiveConfDir = "/Users/yuanzuo/Downloads/apache-hive-3.1.2-bin/conf" // a local path
val version = "3.1.2"
val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog("myhive", hive)
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive")
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.useDatabase("mydb")
tableEnv.createTemporaryView("users", stream, 'id, 'name)
tableEnv.executeSql("insert into t_user select id, name from users")
tableEnv.executeSql("select * from t_user")
}
}
一个复杂一点的程序
代码语言:javascript复制import java.sql.Timestamp
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog
object TestHiveStreaming {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val stream = env.fromElements(
("1", 1000, new Timestamp(1000L)),
("2", 2000, new Timestamp(2000L)),
("3", 3000, new Timestamp(3000L))
)
val name = "myhive"
val defaultDatabase = "mydb"
val hiveConfDir = "/Users/Downloads/apache-hive-3.1.2-bin/conf" // a local path
val version = "3.1.2"
val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog("myhive", hive)
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive")
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.useDatabase("mydb")
tableEnv.createTemporaryView("users", stream, 'userId, 'amount, 'ts)
val hiveSql = "CREATE external TABLE fs_table (n"
" user_id STRING,n"
" order_amount DOUBLE"
") partitioned by (dt string,h string,m string) "
"stored as ORC "
"TBLPROPERTIES (n"
" 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',n"
" 'sink.partition-commit.delay'='0s',n"
" 'sink.partition-commit.trigger'='partition-time',n"
" 'sink.partition-commit.policy.kind'='metastore'"
")"
tableEnv.executeSql(hiveSql)
val insertSql = "insert into fs_table SELECT userId, amount, "
" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users"
tableEnv.executeSql(insertSql)
}
}
彻底重置hadoop和hive的方法
代码语言:javascript复制stop-all.sh
hadoop namenode -format
# 在mysql中删除hive的元数据库
start-all.sh
hadoop fs -mkdir /tmp
hadoop fs -mkdir -p /user/hive/warehouse
hadoop fs -chmod g w /tmp
hadoop fs -chmod g w /user/hive/warehouse
schematool -dbType mysql -initSchema
hive --service metastore
hive