通过StreamSets实时更新数据至ElasticSearch

2022-03-24 08:13:41 浏览数 (1)

网上许多关于StreamSets增量更新的教程几乎都是单单INSERT操作,这使得目标数据库会出现重复数据,而实际需求上我们往往更多是需要INSERTUPDATE操作,利用SQL ServerTIMESTAMP(时间戳)特性,可以很容易实现这一点。

源数据库配置

  需要明白一点,在SQL Server中的TIMESTAMP和时间无关,每次对INSERTUPDATE操作,对于TIMESTAMP列所在的行中的值均会更新。   将时间戳字段LastTimestamp作为偏移量填入Offset Column处,偏移量初始值Initial Offset设为0。

时间戳处理

  由于ElaticSearch没有TIMESTAMP或相似的类型,故作了转换处理,即上图的BIGINT类型,而直接将转换后的数据映射到目标数据库却会报错,我暂时不知道怎么解决,就通过Field Remover做个移除。

目标数据库配置

  注意Default Operation需要选择UPDATE with doc_as_upsert

0 人点赞