我们书接上文,我们在之前的文章《正面超越Spark | 几大特性垫定Flink1.12流计算领域真正大规模生产可用(上)》详细描述了Flink的生产级别Flink on K8s高可用方案和DataStream API 对批执行模式的支持。
接下来是另外的几个特性增强。
第三个,Flink对SQL操作的全面支持
再很早之前,我在浏览社区的wiki中,关于是否需要添加SQL支持的讨论之前就在Flink社区中发生过几次。Flink自从0.9版本发布之后,Table API、关系表达式的代码生成工具以及运行时的操作符等都预示着添加SQL支持的很多基础已经具备,可以考虑进行添加了。
然后Flink SQL从Blink分支正式合并到了主分支,直到Flink1.12版本持续在进行优化,包括:
- 支持Upsert Kafka Connector
- 支持SQL 中 支持 Temporal Table Join
- Join优化
- 支持UDF等
这些特性使得Flink SQL拥有了不弱于Spark SQL的能力,并且随着生态的进一步完善,类似Flink-CDC这种业务中的常见痛点功能一直在不符按完善。
小编在很早前就已经写过Flink SQL的入门文章,目前该文章在百度搜索中权重排名第一,浏览次数高达4万 。
详细阅读你可以参考这里:https://blog.csdn.net/u013411339/article/details/93267838
第四个,Flink对Hive的全面支持
为什么说与Hive的集成标志着Flink的在生态支持上可以和Spark正面PK了呢?
在传统的基于Hadoop体系的数据仓库建设中,有没有哪个业务不用Hive的呢?几乎没有。
Hive作为Hadoop体系中应用最广泛的数据分析工具在整个生态位置上占有核心位置,是数据仓库生态系统中的绝对核心。一个对Hive不友好的框架是没有资格去争取某一个领域的王者地位的。
目前截止 Flink 1.12,Flink 与 Hive 的集成包含两个层面。
- 一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。
- 二是利用 Flink 来读写 Hive 的表。
HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive 数仓。您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。
关于如何使用Hive Flink打造批流一体数仓仓库,你可以参考这里:《Flink1.12集成Hive打造自己的批流一体数仓》 。
这其中,最值得注意的两个特性是:
Hive Streaming的支持
Flink SQL的FileSystem Connector为了与Flink-Hive集成的大环境适配,做了很多改进,Flink以Streaming的方式写入Hive也标志着批流一体时代的真正来临。我这里贴一个简单的案例:
代码语言:javascript复制public class StreamingWriteHive {
private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (n"
" code STRING,"
" total_emp INT ,"
" ts bigint ,"
" r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),n"
" WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND "
") WITH ("
" 'connector' = 'kafka',"
" 'topic' = 'flink_dwd_test4',"
" 'properties.bootstrap.servers' = 'node1:9092',"
" 'properties.group.id' = 'test1',"
" 'format' = 'json',"
" 'scan.startup.mode' = 'latest-offset'"
")";
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
bsEnv.enableCheckpointing(5000);
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
String name = "myhive";
String defaultDatabase = "flink";
String hiveConfDir = ""; // a local path
String version = "1.1.0";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("drop table kafkaTable22");
tEnv.executeSql(KAFKA_SQL);
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
String hiveSql = "CREATE TABLE fs_table (n"
" f_random_str STRING,n"
" f_sequence INT"
") partitioned by (dt string,hr string) "
"stored as PARQUET "
"TBLPROPERTIES (n"
" 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',n"
" 'sink.partition-commit.delay'='5 s',n"
" 'sink.partition-commit.trigger'='partition-time',n"
// " 'sink.partition-commit.delay'='1 m',n"
" 'sink.partition-commit.policy.kind'='metastore,success-file'"
")";
tEnv.executeSql(hiveSql);
String insertSql = "insert into fs_table1111 SELECT code, total_emp, "
" DATE_FORMAT(r_t, 'yyyy-MM-dd'), DATE_FORMAT(r_t, 'HH') FROM kafkaTable22";
tEnv.executeSql(insertSql).print();
}
对Hive的支持上,有两个特性十分醒目:
- 分区提交 支持不同的触发和策略。详细的介绍读者可以参考:https://blog.csdn.net/u013411339/article/details/113051393
- 小文件合并
很多 bulk format,例如 Parquet,只有当写入的文件比较大时,才比较高效。当 checkpoint 的间隔比较小时,这会成为一个很大的问题,因为会创建大量的小文件。在 Flink 1.12 中,File Sink 增加了小文件合并功能,从而使得即使作业 checkpoint 间隔比较小时,也不会产生大量的文件。要开启小文件合并,可以在 FileSystem connector 中设置 auto-compaction = true 属性。
另外,Flink除了主赛道上的功能完善外,持续向外推展新的能力以解决业务上的痛点问题。值得我们期待。