Flink writes to Hudi with automatic Hive metadata sync enabled.
The Hudi table is defined as follows:
CREATE TABLE myhive.test.hudi_users2_m_has_s (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs://localhost:9000/hudi/hudi_users2_m_has_s',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '1',
  'write.tasks' = '1',
  'read.tasks' = '1',
  'compaction.tasks' = '1',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://localhost:9083',
  'hive_sync.enable' = 'true',
  'hive_sync.table' = 'hudi_users2_m_has_s',
  'hive_sync.db' = 'test',
  'hive_sync.username' = 'wangkai',
  'hive_sync.password' = 'wangkai',
  'hive_sync.jdbc_url' = 'jdbc:hive2://localhost:10000'
);
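For context, a minimal sketch of the streaming write that drives the sync. The datagen_source table and its rate are assumptions for illustration, not from the original job; the key point is that the Hudi sink commits, and therefore triggers the Hive sync, on each successful checkpoint.

-- Hypothetical datagen source, used only to feed rows into the Hudi table.
-- Checkpointing must be enabled in the Flink job (e.g. via
-- execution.checkpointing.interval), since the Hudi sink only commits,
-- and only attempts the Hive metadata sync, at checkpoint time.
CREATE TABLE datagen_source (
  id BIGINT,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

INSERT INTO myhive.test.hudi_users2_m_has_s
SELECT id, name, birthday, ts FROM datagen_source;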
When the job runs, the following exception appears:
2021-10-13 21:50:52,085 INFO org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [handle write metadata event for instant 20211013215052] success!
2021-10-13 21:50:52,087 ERROR org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [sync hive metadata for instant 20211013215052] error
java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.metadata.Hive.get(Lorg/apache/hudi/org/apache/hadoop/hive/conf/HiveConf;)Lorg/apache/hadoop/hive/ql/metadata/Hive;
at org.apache.hudi.hive.ddl.HMSDDLExecutor.<init>(HMSDDLExecutor.java:66) ~[hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:75) ~[hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:80) ~[hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.hudi.sink.utils.HiveSyncContext.hiveSyncTool(HiveSyncContext.java:51) ~[hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.syncHive(StreamWriteOperatorCoordinator.java:288) ~[hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:67) ~[hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_291]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_291]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_291]
2021-10-13 21:50:52,088 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Received acknowledge message for checkpoint 3 from task ccde8503b1e7f0f717a7c7968616d418 of job a35f0b45de2b2299e51b9fddf507959f at container_1634093221135_0010_01_000002 @ localhost (dataPort=49854).
2021-10-13 21:50:52,088 DEBUG org.apache.hadoop.hdfs.DFSClient [] - /flink-checkpoints/a35f0b45de2b2299e51b9fddf507959f/chk-3/_metadata: masked=rw-r--r--
The root cause is that the runtime classpath contains two copies of the hive-exec classes:
- one inside hudi-flink-bundle, where the Hive code is only partially shaded
- one inside flink-sql-connector-hive, which ships the unshaded Hive code
The parameter type in the NoSuchMethodError is the clue: Hudi's shaded HMSDDLExecutor calls Hive.get with the relocated org.apache.hudi.org.apache.hadoop.hive.conf.HiveConf, but the org.apache.hadoop.hive.ql.metadata.Hive class actually loaded is the unshaded copy from the connector, whose Hive.get only accepts the unshaded HiveConf, so the method lookup fails.
Fix: relocate (shade) the conflicting Hive package in the bundle as well:
<relocation>
  <pattern>org.apache.hadoop.hive.ql.metadata.</pattern>
  <shadedPattern>${flink.bundle.shade.prefix}org.apache.hadoop.hive.ql.metadata.</shadedPattern>
</relocation>
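This relocation goes alongside the existing Hive relocations in the maven-shade-plugin configuration of packaging/hudi-flink-bundle/pom.xml, where ${flink.bundle.shade.prefix} resolves to org.apache.hudi., matching the shaded HiveConf visible in the stack trace above. After rebuilding the bundle and replacing the jar in Flink's lib directory, Hudi's sync code resolves the relocated Hive class, which was compiled against the shaded HiveConf, and the NoSuchMethodError no longer occurs.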