一、背景
上一篇文章《应用接入ES(一)-Springboot集成ES》我们讲述了应用集成ES的方式,以及实现各种查询和更新操作,那么问题就来了,既然是查询和更新,肯定要有数据,数据哪里来?怎么来?
本篇文章我们将主要针对业务数据同步到ES展开分析和描述。
二、目标
我们将应用集成ES并不是单纯为了学习技术或者说积累经验,最终的目的是支撑业务,那么我们就需要做以下几件事情:
- 历史数据导入ES
- 增量数据实时同步
- DB和ES数据追平
- ES数据检索以及DB兜底
当然前边三步做的所有工作,都是为了最后一步查询服务,但是如果前边的数据准备工作没处理好,那么就会出现ES数据和DB数据不一致问题,如果仅仅是为了引入ES、为了提高检索能力和性能而牺牲数据的准确性,那么也就失去了接入ES的本质意义。
对于第4点,一般业务上会使用开关主动降级和业务自动降级结合使用,主动降级是在配置平台手动操作开关闭合状态,从而实时控制业务中数据检索的数据来源,业务自动降级是基于主动降级的基础上,对ES检索进行兜底处理,如果ES检索失败或者出现异常,自动降级到DB检索。
业务数据同步到ES,主要通过前边3点来实现,接下来我们将逐步展开分析和讲述。
三、业务数据同步ES方案
抛开数据同步到ES,纵观所有的数据迁移和同步方案,大致分为两个流派,分别是:
- 停机迁移
停机迁移简单粗暴,将源数据停写,然后通过脚本或者其他迁移工具将源数据筛选过滤然后同步到目标数据源中,最大的缺点就是停写,停写的时间取决于源数据源的数据量和迁移方案以及业务的重要程度,在核心并且流量比较大的业务场景中,过度的停机迁移是无法接受的。
- 不停机迁移
不停机迁移又分为严格停机和非严格停机,严格停机迁移是在数据迁移过程中完全不停写,实现方案也相对复杂,难点在于开启双写时间以及数据追平方案;非严格停机迁移是在迁移历史数据阶段(一般数据量比较大的情况下迁移时间也比较久),源数据源不停写,然后历史数据迁移结束后,停写源数据源,通过脚本或者增量日志进行数据最平,当然停机时间相对较短(停机时间取决于历史数据迁移时间内业务增量),对于核心业务数据迁移,在低峰期操作停写追平数据也是可以接受的。
对于非核心业务或者数据增量比较小的业务场景中,在低峰期采用停机迁移方案是比较可取的,但是在数据量比较大并且业务增量也比较大的场景中,衡量和评估影响和操作复杂度两个维度,非严格停机迁移一般是比较可取的。
接下来我们将详细的分析业务数据同步到ES的各种具体实现方案。
1.业务脚本
a.业务脚本迁移历史数据
通过程序批量从源数据源拉去数据,然后利用多线程或者批量同步数据到目标数据源中,并记录开始和结束位点和时间,需要注意的是对于目标数据源是单表的情况下建议使用持久层的批量插入,批量插入的性能远远超过多线程,因为这个过程的性能瓶颈在于目标数据源的iops限制,在iops设置比较小的情况下通过增加业务线程不能从根本上解决同步速度,而通过批量操作将多次同步打包成一次DB交互性能会好很多,但是要注意的是要控制批量操作数据包的大小与网络带宽之间的关系,一般建议批量500或者1000;而在目标数据源是分表的情况下,批量操作往往受限,为了提高性能可以将线程开多一点,一般开20或者30就行,开多了并不能从本质上提升性能,因为最终受限于DB的iops以及应用的cpu内核数。
b.开启增量同步
在服务层收敛目标表的所有写操作,开启增量同步,也就是开启双写,可以在历史数据开始迁移时开启双写,需要数据的是,新数据源更新操作可能会出现数据不存在,可直接跳过。
c.追平数据
记录历史数据迁移的开始和结束位点,然后捞取此期间的所有写操作日志,分析发生过更新操作的业务id,然后通过业务脚本进行追平,但是在极端情况下也可能出现数据追平的过程中由于源数据源未停写,导致需要追平的数据再次发生变更,但是概率和数量一般比较可控,可再次针对性做数据追平即可。在数据追平阶段可以采用停机方案,可迅速高效的追平数据。
2.canal 业务脚本
a.业务脚本迁移历史数据
同样使用1中的历史数据迁移方式。
b.增量数据同步
canal是阿里开源的增量数据同步工具,其核心是将canal伪装成一个slave库,解析master的binlog日志并发送数据变更事件给consumer,我们在业务层接收canal的数据变更事件然后同步数据到目标数据源,大致模式如下:
原理特别简单,是canal server监听源数据源的数据变更,解析binlog然后发送数据变更事件给我们的业务程序(canal client),然后业务程序解析数据变更事件同步数据到ES。该方式相比较1方式相对比较优雅,将数据同步从业务程序中解耦出来,不侵入业务。
c.追平数据
追平数据可采用1中的方式。
3.canal kafka 业务脚本
a.历史数据迁移
同1。
b.增量数据同步
该方案与2类似,但是实现方式相对更优雅一点,首先基于消息的天然异步属性,将原来的同步操作变成异步(延迟可接受),然后kafka有一定的数据存储能力,对于consumer崩溃后恢复或者重启后,提供了数据回放能力(可重新消费消息同步数据),整体实现方式如下图:
原理是canal server监听源数据源的数据变更,解析binlog然后发送数据变更事件给kafka broker,然后我们的业务程序充当kafka consumer角色,接收数据变更消息,然后将数据同步到目标数据库。
c.追平数据
同1。
4.canal canalAdapter 业务脚本
a.历史数据迁移
同1。
b.增量数据同步
canal 1.1.1版本之后, 内置增加客户端数据同步功能,canal adapter 的 Elastic Search 版本支持6.x.x以上, 如需其它版本的es可替换依赖重新编译client-adapter.elasticsearch模块,说白了其实是canal客户端内置了同步数据到ES的功能,替我们把接收canal server发送的数据变更事件解析并同步到ES的工作给做了,其原理如下图:
原理也比较简单,是canal server监听源数据源的数据变更,解析binlog然后发送数据变更事件给我们的服务层canalAdapter(canal client),然后canalAdapter解析数据变更事件同步数据到ES。该方式算是对开发来说工作量最小的实现方式,只需要服务层集成canalAdapter做一些数据源配置以及字段筛选操作。
c.追平数据
同1。
综合以上几种实现方式,从纸面上看4是最完美的解决方案,但是套用高中物理老师说的一句话“杀猪杀屁股,各有各的杀法”,每一种实现方式都有其存在的意义,每个人对实现方式的理解也会有所不同,最重要的是每个业务场景的诉求也不尽相同,所有就像jvm垃圾回收一样,永远没有一种一统江湖的垃圾回收器,只不过是永远在追求完美的解决方案,在特定的场景选择特定的处理方式就是完美的解决方案,所以虽然4看起来比较优雅并且开发工作量小以及不侵入业务,但是就我个人而言,我更偏向于选择3的实现方式,除了提供业务解耦、异步操作之外,还能支持和容忍client偶发的宕机和重启。