前言
Flink 1.9 版本开源了很多 Blink 方面的功能,尤其是在 SQL 方面,这使得我们在开发 Flink 实时任务变得更加方便。目前 Blink SQL 支持了 Create Table 功能,以及维表的功能。我们的实时任务整体流程为,读取Kafka的数据,然后去关联 HBase 维表的数据,最后在输出到 Kafka 中,虽然整体流程跑通,但是其中也遇到了很多坑,这里记录一下,和大家一起分享,避免以后再遇到类似的坑。
Flink SQL Row 类型使用
Flink SQL Row 字段,整体你可以将其理解为一个 Map,Key 为字段的名称,Value 为字段的数据类型。你能够在里面再次定义你的数据类型,比如:
代码语言:javascript复制col Row< name varchar,age int,likes varchar,city varchar>
col1 Row( name varchar,age int,likes varchar,city varchar)
上面定一个两个 Row 字段,一个是 col,一个是col1。你可以使用()或者也可以使用<>,这两个都是等价的。
一般我们会直接 Kafka 里面的数据,Kafka 的数据格式通常是 Json 格式,Json 数据你可以将其理解为一个具有数据模式的数据类型,但有一种情况,就是在 Json 数据中,会存在再次嵌套数据的情况,比如下面这种情况:
代码语言:javascript复制{
"name" : "hello_world",
"info" : {
"city" : "chengdu",
"like" : "火锅",
"play" : "LOL"
}
}
针对上面的 Json 格式里面 info 字段,就是一种镶嵌情况,具体在 Flink SQL DDL 里面进行定义时,可以定义为下面 DDL 语句:
代码语言:javascript复制name varchar,
info Row<city varchar,like varchar,play varchar>
Flink读取 kafka 中的数据,支持 Json 数据嵌套,同时也支持只读取部分字段数据。
Flink SQL DDL user 字段使用
之前在使用 Flink SQL 来读取 Kafka 数据,里面 Json 中有个 user 字段,我在 SQL 语句中定义时,运行时报出 SqlParserException: SQL parse failed.Encountered "user" at line 1,column 4177 异常:
我刚开始以为是我的 SQL 任务代码写错了,毕竟字段太多了,很容易出错。最后在本地测试发现,user 字段是Flink SQL中的关键字,我去掉 user 字段的定义任务就能够运行起来。我个人认为应该是 Flink SQL中由于 user 是关键字,所以在 SQL 任务中不支持,结果报错。当然这是我的个人结论,如果有错,欢迎指出。
HBase 维表字段数据类型映射
我们的实时任务使用到 HBase 作为维表,使用 Flink SQL 直接定义了 HBase 维表的相关配置属性,在使用的时候,报出了 TimeOut 错误,最后发现是因为在 Flink SQL中定义的 HBase 维表的字段类型和在 HBase 数据表中实际存储数据的字段类型没有对应上。
比如在 Flink SQL 中定义的 Age字段为 smallint 类型,但是实际上在 HBase中存储的字段为 String 类型字节数组,String 当然不能转 smallint 类型,然后就报错了。所以在 Flink SQL 中定义HBase维表时,具体使用的字段的数据类型要和 HBase 表具体存储的字段类型保持一致。
Short 变为SmallInt类型
之前在Flink SQL使用中,有个字段在Java类型中是Short类型,然后我再使用Flink SQL定义的时候,也将该字段定义为Short类型,结果在运行的时候,发现报错信息为Flink 不支持自定义类型。最后将Short定义换成SmallInt就正常运行。具体的数据类型使用,可以参考官方文档。