Flinkcdc研究
最近在研究Flinkcdc数据采集,底层技术为debezium,debezium会将日期转为5位数字,日期时间位13位的数字,看之前代码解决办法是: 1.识别十三位数字进行转换为日期格式。 2.对于date类型,人工穷举字段类型进行转换
我自己考虑了一下,这样可能会误转换、增大人工成本。感觉这样不是解决办法,就通过查看官网文档、看一些源码,看看是否有其他解决方法。
debezium自定义转换器
经过阅读debezium的官方文档,发现其是支持自定义转换器,因此可以通过自定义转换器时间事件类型的转换。 1.避免造轮子 经过搜索学习,发现github上有大佬已经写过针对mysql的时间点我直达转换器。 2.不得不造轮子 因为我日常参与的数据不仅是mysql、还有sqlserver。查看sqlserver并没有相关代码,就想着自己也写一个,能够兼容mysql和sqlserver的转换器(后来证明还是分开写比较好)。 3.学习分析 对大佬代码学习及jdbc源码查看,并且在实际的测试中。发现mysql、sqlserver的字段类型在快照、binlog(cdc)期间并不是完全一样。若要完全兼容,必须将映射类型找出来。 刚开始认为一个jdbc type只会对应一种java类。其实不是,而是在不同的jdbc中都有不同的映射。(通过chatgpt证实了一下)
mysql转换
mysql启动时,快照期间初始化转换器,在binlog期间仍进行一次初始化转换器。(使用的类不同)
字段类型 | 快照类型(jdbc type) | binlog类型(jdbc type) |
---|---|---|
DATE | java.time.LocalDate(93) | java.time.LocalDate(91) |
TIME | java.time.Duration(92) | java.time.Duration(92) |
DATETIME | java.sql.Timestamp(93) | java.time.LocalDateTime(93) |
TIMESTAMP | java.sql.Timestamp(2014) | java.time.ZonedDateTime(2014) |
sqlserver转换
sqlserver启动时 快照期间初始化转换器,在cdc期间不再进行初始化转换器。(使用的类相同)
timestamp类型在sqlserver中为byte[]类型,jdbc type为-2,因此不进行转换。
字段类型 | 快照类型(jdbc type) | cdc类型(jdbc type) |
---|---|---|
DATE | java.sql.Date(91) | java.sql.Date(91) |
TIME | java.sql.Timestamp(92) | java.sql.Time(92) |
DATETIME | java.sql.Timestamp(93) | java.sql.Timestamp(93) |
DATETIME2 | java.sql.Timestamp(93) | java.sql.Timestamp(93) |
DATETIMEOFFSET | microsoft.sql.DateTimeOffset(-155) | microsoft.sql.DateTimeOffset(-155) |
SMALLDATETIME | java.sql.Timestamp(93) | java.sql.Timestamp(93) |
开始写代码喽
java并不是我的擅长( 在学了 ),写代码期间借助了大量人工智能GitHub coplit,帮我实现。 刚开始涉及思路:大统一是全形式,将mysql和sqlserver都写到一个方法中去。
实际中发现并不是很合理。例如mysql的TIMESTAMP类型是时间戳,但sqlserver的TIMESTAMP是byte[]类型,还要在另外判断一下jdbc type是否为-2,也容易产生误解。最终决定分开写。
最后依照官网的模板重写方法就可以了。代码地址点击直达
使用方法
converters
参数为:自定义转换器的名字,可以随意设置。设置的值就作为转换器的名字,在以后的参数中就要使用这个名字。
假设自定义的名字为mydebeziumconverter
,则type
参数为mydebeziumconverter.type
。
mydebeziumconverter.type
参数为:自定义转换器的类名,必须设置。(转换器的方法)
mydebeziumconverter.database.type
参数为:数据库类型,必须设置。(需要设置为mysql或sqlserver)
mydebeziumconverter.format.datetime
参数为:datetime类型的格式,可选。
mydebeziumconverter.format.date
参数为:date类型的格式,可选。
mydebeziumconverter.format.time
参数为:time类型的格式,可选。
如果仅使用mysql或sqlserver建议独立编译代码,只保留mysql或sqlserver的转换器,减少依赖。
flinkcdc
可使用源代码也可使用编译好的jar包。只需要放入目录即可。并在配置中设置参数。
代码语言:javascript复制// 自定义解释器
// 设置解释器及解释器参数
debeziumProperties.put("converters", "mydebeziumconverter");
debeziumProperties.put("mydebeziumconverter.type", "org.util.MyDebeziumConverter");
debeziumProperties.put("mydebeziumconverter.database.type", "mysql");
// 自定义格式,可选
debeziumProperties.put("mydebeziumconverter.format.datetime", "yyyy-MM-dd HH:mm:ss");
debeziumProperties.put("mydebeziumconverter.format.date", "yyyy-MM-dd");
debeziumProperties.put("mydebeziumconverter.format.time", "HH:mm:ss");
debezium
使用jar包,并将其放在 debezium 插件的同一级别目录中。并在配置文件中设置参数。
代码语言:javascript复制"converters": "mydebeziumconverter",
"mydebeziumconverter.type": "org.util.MyDebeziumConverter",
"mydebeziumconverter.database.type": "mysql",
"mydebeziumconverter.format.datetime": "yyyy-MM-dd HH:mm:ss",
"mydebeziumconverter.format.date": "yyyy-MM-dd",
"mydebeziumconverter.format.time": "HH:mm:ss"
github
代码已发布到github,点击直达