FlinkCDC/DEBEZIUM自定义日期转换器

2023-02-13 14:24:22 浏览数 (2)

Flinkcdc研究

最近在研究Flinkcdc数据采集,底层技术为debezium,debezium会将日期转为5位数字,日期时间位13位的数字,看之前代码解决办法是: 1.识别十三位数字进行转换为日期格式。 2.对于date类型,人工穷举字段类型进行转换

我自己考虑了一下,这样可能会误转换、增大人工成本。感觉这样不是解决办法,就通过查看官网文档、看一些源码,看看是否有其他解决方法。

debezium自定义转换器

经过阅读debezium的官方文档,发现其是支持自定义转换器,因此可以通过自定义转换器时间事件类型的转换。 1.避免造轮子 经过搜索学习,发现github上有大佬已经写过针对mysql的时间点我直达转换器。 2.不得不造轮子 因为我日常参与的数据不仅是mysql、还有sqlserver。查看sqlserver并没有相关代码,就想着自己也写一个,能够兼容mysql和sqlserver的转换器(后来证明还是分开写比较好)。 3.学习分析 对大佬代码学习及jdbc源码查看,并且在实际的测试中。发现mysql、sqlserver的字段类型在快照、binlog(cdc)期间并不是完全一样。若要完全兼容,必须将映射类型找出来。 刚开始认为一个jdbc type只会对应一种java类。其实不是,而是在不同的jdbc中都有不同的映射。(通过chatgpt证实了一下)

mysql转换

mysql启动时,快照期间初始化转换器,在binlog期间仍进行一次初始化转换器。(使用的类不同)

字段类型

快照类型(jdbc type)

binlog类型(jdbc type)

DATE

java.time.LocalDate(93)

java.time.LocalDate(91)

TIME

java.time.Duration(92)

java.time.Duration(92)

DATETIME

java.sql.Timestamp(93)

java.time.LocalDateTime(93)

TIMESTAMP

java.sql.Timestamp(2014)

java.time.ZonedDateTime(2014)

sqlserver转换

sqlserver启动时 快照期间初始化转换器,在cdc期间不再进行初始化转换器。(使用的类相同)

timestamp类型在sqlserver中为byte[]类型,jdbc type为-2,因此不进行转换。

字段类型

快照类型(jdbc type)

cdc类型(jdbc type)

DATE

java.sql.Date(91)

java.sql.Date(91)

TIME

java.sql.Timestamp(92)

java.sql.Time(92)

DATETIME

java.sql.Timestamp(93)

java.sql.Timestamp(93)

DATETIME2

java.sql.Timestamp(93)

java.sql.Timestamp(93)

DATETIMEOFFSET

microsoft.sql.DateTimeOffset(-155)

microsoft.sql.DateTimeOffset(-155)

SMALLDATETIME

java.sql.Timestamp(93)

java.sql.Timestamp(93)

开始写代码喽

java并不是我的擅长( 在学了 ),写代码期间借助了大量人工智能GitHub coplit,帮我实现。 刚开始涉及思路:大统一是全形式,将mysql和sqlserver都写到一个方法中去。

实际中发现并不是很合理。例如mysql的TIMESTAMP类型是时间戳,但sqlserver的TIMESTAMP是byte[]类型,还要在另外判断一下jdbc type是否为-2,也容易产生误解。最终决定分开写。

最后依照官网的模板重写方法就可以了。代码地址点击直达

使用方法

converters参数为:自定义转换器的名字,可以随意设置。设置的值就作为转换器的名字,在以后的参数中就要使用这个名字。 假设自定义的名字为mydebeziumconverter,则type参数为mydebeziumconverter.type

mydebeziumconverter.type参数为:自定义转换器的类名,必须设置。(转换器的方法) mydebeziumconverter.database.type参数为:数据库类型,必须设置。(需要设置为mysql或sqlserver) mydebeziumconverter.format.datetime参数为:datetime类型的格式,可选。 mydebeziumconverter.format.date参数为:date类型的格式,可选。 mydebeziumconverter.format.time参数为:time类型的格式,可选。

如果仅使用mysql或sqlserver建议独立编译代码,只保留mysql或sqlserver的转换器,减少依赖。

flinkcdc

可使用源代码也可使用编译好的jar包。只需要放入目录即可。并在配置中设置参数。

代码语言:javascript复制
// 自定义解释器
// 设置解释器及解释器参数
debeziumProperties.put("converters", "mydebeziumconverter");
debeziumProperties.put("mydebeziumconverter.type", "org.util.MyDebeziumConverter");
debeziumProperties.put("mydebeziumconverter.database.type", "mysql");
// 自定义格式,可选
debeziumProperties.put("mydebeziumconverter.format.datetime", "yyyy-MM-dd HH:mm:ss");
debeziumProperties.put("mydebeziumconverter.format.date", "yyyy-MM-dd");
debeziumProperties.put("mydebeziumconverter.format.time", "HH:mm:ss");

debezium

使用jar包,并将其放在 debezium 插件的同一级别目录中。并在配置文件中设置参数。

代码语言:javascript复制
"converters": "mydebeziumconverter",
"mydebeziumconverter.type": "org.util.MyDebeziumConverter",
"mydebeziumconverter.database.type": "mysql",
"mydebeziumconverter.format.datetime": "yyyy-MM-dd HH:mm:ss",
"mydebeziumconverter.format.date": "yyyy-MM-dd",
"mydebeziumconverter.format.time": "HH:mm:ss"

github

代码已发布到github,点击直达

0 人点赞