数仓实战|实时同步Kafka数据到Doris

2021-08-27 16:29:31 浏览数 (1)

大家好,我是一哥,Doris成为MPP数据库新贵。Doris起源于百度,致力于满足企业用户的多种数据分析场景,支持多种数据模型(明细表, 聚合表), 多种导入方式(批量), 可整合和接入多种现有系统(Spark, Flink, Hive, ElasticSearch)。

今天分享一下Doris的例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能,当前仅支持从 Kafka 系统进行例行导入。

01

基本原理

Routine Load 的基本原理:

  1. Client 向 FE 提交一个例行导入作业
  2. FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行
  3. 在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报
  4. FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入

02

使用场景

Routine Load 在数据仓库中主要有两种应用场景:

  1. 接口数据导入。由于批处理抽取数据存在大量重复抽取的情况,越来越多的交易系统采用binlog或者直接提供接口更新数据到Kafka的方式来完成接口数据的对接。针对binlog日志或者Kafka消息队列,批处理程序是无法抽取的,所以需要采用流式数据写入。
  2. 实时数仓结果数据导入。根据Lambda架构,实时数据通过Kafka对接以后,继续经由Flink加工,加工完的数据继续写回Kafka,然后由Routine Load加载到Doris数据库,即可直接供数据分析应用读取数据。这种应用场景还不太成熟。

03

应用案例

实时接入kafka数据目前是有一些使用限制:

  1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
  2. 支持的消息格式为 csv, json 文本格式。csv 每一个 message 为一行,且行尾不包含换行符。
  3. 仅支持 Kafka 0.10.0.0(含) 以上版本。

下面讲解两个例行导入任务案例:

例行导入的操作其实很简单,看再多的说明,不如一个案例来得实际。接下来直接给出两个导入案例。

案例一:数据源源不断的写入,不作更新或者删除操作

代码语言:javascript复制
CREATE ROUTINE LOAD ods_drp.rtl_ods_drp_cdc_st_entry_detail_et ON ods_drp_cdc_st_entry_detail_et 
COLUMNS(ACCOUNT_LINE_ID, update_time,cdc_op,cdc_time=now()) 
    PROPERTIES
    (
    "desired_concurrent_number"="1",
    "max_batch_interval" = "20",
    "max_batch_rows" = "200000",
    "max_batch_size" = "104857600",
    "strict_mode" = "false",
    "strip_outer_array" = "true",
    "format" = "json",
    "json_root" = "$.data",
    "jsonpaths" = "["$.ACCOUNT_LINE_ID","$.update_time","$.type"]"
     )
FROM KAFKA
    (
    "kafka_broker_list" = "192.168.87.107:9092,192.168.87.108:9092,192.168.87.109:9092",
    "kafka_topic" = "drds_hana_ods_st_entry_detail_et",
    "kafka_partitions" = "0",
    "kafka_offsets" = "OFFSET_BEGINNING",
    "property.group.id" = "ods_drp_st_entry_detail_et",
    "property.client.id" = "doris"
);

案例二:接口数据存在删除和更新操作

首先需要设置目标表为支持批量删除的模式:

代码语言:javascript复制
ALTER TABLE ods_drp.ods_drp_vip_weixin ENABLE FEATURE "BATCH_DELETE";

然后创建导入任务:

代码语言:javascript复制
CREATE ROUTINE LOAD ods_drp.rtl_ods_drp_vip_weixin ON ods_drp_vip_weixin
WITH MERGE
COLUMNS(rec_id, vip_user_id, vip_id, vip_code, tel, vip_source, openid, unionid, appid, brand_code, create_user_name, create_user, create_time, modify_user_name, modify_user, modify_time, version, update_time,CDC_OP),
DELETE ON CDC_OP="DELETE"
    PROPERTIES
    (
    "desired_concurrent_number"="1",
    "max_batch_interval" = "20",
    "max_batch_rows" = "200000",
    "max_batch_size" = "104857600",
    "strict_mode" = "false",
    "strip_outer_array" = "true",
    "format" = "json",
    "json_root" = "$.data",
    "jsonpaths" = "["$.rec_id","$.vip_user_id","$.vip_id","$.vip_code","$.tel","$.vip_source","$.openid","$.unionid","$.appid","$.brand_code","$.create_user_name","$.create_user","$.create_time","$.modify_user_name","$.modify_user","$.modify_time","$.version","$.update_time","$.type"]"
     )
FROM KAFKA
    (
    "kafka_broker_list" = "192.168.87.107:9092,192.168.87.108:9092,192.168.87.109:9092",
    "kafka_topic" = "drds_hana_ods_vip_weixin",
    "kafka_partitions" = "0",
    "kafka_offsets" = "OFFSET_BEGINNING",
    "property.group.id" = "ods_drp_vip_weixin",
    "property.client.id" = "doris"
);

其中,前面的PROPERTIES括号里面存放的是加载的配置信息,KAFKA后面的括号里面存放的是KAFKA的配置信息。

例行导入常用操作

创建完 Routine Load 任务以后, Routine Load 会在后台持续运行。为了监控和检查 Routine Load 任务状态,我们需要进入对应的数据库schema下执行命令查看任务。

代码语言:javascript复制
show routine load; --用于显示所有的例行导入任务状态
pause routine load for xxx;  --暂停xxx导入任务
resume routine load for xxx;  --重启xxx导入任务
stop routine load for xxx;  --停止xxx导入任务,停止以后任务会从队列中消失
ALTER ROUTINE LOAD FOR XX
PROPERTIES
(
    "desired_concurrent_number" = "1"
)
FROM kafka
(
    "kafka_partitions" = "0",
    "kafka_offsets" = "OFFSET_BEGINNING",
    "property.group.id" = "xxx_topic",
    "property.client.id" = "doris"
);    --修改任务参数,从kafka队列最开始重新读取

end

0 人点赞