Nebula Importer 数据导入实践

2022-07-04 16:34:13 浏览数 (2)

本文首发于 Nebula Graph Community 公众号

Nebula Importer 数据导入实践Nebula Importer 数据导入实践

前言

Nebula 目前作为较为成熟的产品,已经有着很丰富的生态。数据导入的维度而言就已经提供了多种选择。有大而全的Nebula Exchange,小而精简的Nebula Importer, 还有为 Spark / Flink 引擎提供的Nebula Spark Connector 和 Nebula Flink Connector。

在众多的导入方式中,究竟哪种较为方便呢?

使用场景介绍:

  • Nebula Exchange
    • 需要将来自 Kafka、Pulsar 平台的流式数据, 导入 Nebula Graph 数据库
    • 需要从关系型数据库(如 MySQL)或者分布式文件系统(如 HDFS)中读取批式数据
    • 需要将大批量数据生成 Nebula Graph 能识别的 SST 文件
  • Nebula Importer
    • Importer 适用于将本地 CSV 文件的内容导入至 Nebula Graph 中
  • Nebula Spark Connector:
    • 在不同的 Nebula Graph 集群之间迁移数据
    • 在同一个 Nebula Graph 集群内不同图空间之间迁移数据
    • Nebula Graph 与其他数据源之间迁移数据
    • 结合 Nebula Algorithm 进行图计算
  • Nebula Flink Connector
    • 在不同的 Nebula Graph 集群之间迁移数据
    • 在同一个 Nebula Graph 集群内不同图空间之间迁移数据
    • Nebula Graph 与其他数据源之间迁移数据

以上摘自 Nebula 官方文档:https://docs.nebula-graph.com.cn/2.6.2/1.introduction/1.what-is-nebula-graph/

总体来说,Exchange 大而全,可以和大部分的存储引擎结合,导入到 Nebula 中,但是需要部署Spark 环境。

Nebula Importer 数据导入实践Nebula Importer 数据导入实践

Importer 使用简单,所需依赖较少,但需要自己提前生成数据文件,配置好 schema 一劳永逸,但是不支持断点续传,适合数据量中等。

Spark / Flink Connector 需要和流数据结合。

不同的场景选择不同的工具,如果作为新人使用 Nebula 在导入数据时,建议使用 Nebula Importer 工具,简单快速上手

Nebula Importer 的使用

在我们接触 Nebula Graph 初期,当时生态不够完善, 加上只有部分业务迁移到 Nebula Graph 上,我们对 Nebula Graph 数据的导入不管全量还是增量都是采用 Hive 表推到 Kafka,消费 Kafka 批量写入 Nebula Graph 的方式。后来随着越来越多的数据和业务切换到 Nebula Graph,导入的数据效率问题愈发严峻,导入时长的增加,使得业务高峰期时仍然在全量的数据导入,这是不可接受的。

针对以上问题,在尝试 Nebula Spark Connector 和 Nebula Importer 之后,由便于维护和迁移多方面考虑,采用 Hive table -> CSV -> Nebula Server -> Nebula Importer 的方式进行全量的导入,整体耗时时长也有较大的提升。

Nebula Importor 的相关配置

系统环境
代码语言:shell复制
[root@nebula-server-prod-05 importer]# lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                16
On-line CPU(s) list:   0-15
Thread(s) per core:    2
Core(s) per socket:    8
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 85
Model name:            Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
Stepping:              7
CPU MHz:               2499.998
BogoMIPS:              4999.99
Hypervisor vendor:     KVM
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              1024K
L3 cache:              36608K
NUMA node0 CPU(s):     0-15

Disk:SSD
Memory: 128G
集群环境
  • Nebula Version:v2.6.1
  • 部署方式:RPM
  • 集群规模:三副本,六节点
数据规模
代码语言:shell复制
 --------- -------------------------- ----------- 
| "Space" | "vertices"               | 559191827 |
 --------- -------------------------- ----------- 
| "Space" | "edges"                  | 722490436 |
 --------- -------------------------- ----------- 
Importer 配置
代码语言:shell复制
# Graph版本,连接2.x时设置为v2。
version: v2

description: Relation Space import data

# 是否删除临时生成的日志和错误数据文件。
removeTempFiles: false

clientSettings:

  # nGQL语句执行失败的重试次数。
  retry: 3

  # Nebula Graph客户端并发数。
  concurrency: 5

  # 每个Nebula Graph客户端的缓存队列大小。
  channelBufferSize: 1024

  # 指定数据要导入的Nebula Graph图空间。
  space: Relation

  # 连接信息。
  connection:
    user: root
    password: ******
    address: 10.0.XXX.XXX:9669,10.0.XXX.XXX:9669

  postStart:
    # 配置连接Nebula Graph服务器之后,在插入数据之前执行的一些操作。
    commands: |

    # 执行上述命令后到执行插入数据命令之间的间隔。
    afterPeriod: 1s

  preStop:
    # 配置断开Nebula Graph服务器连接之前执行的一些操作。
    commands: |

# 错误等日志信息输出的文件路径。    
logPath: /mnt/csv_file/prod_relation/err/test.log
....

由于篇幅 只展示些全局相关的配置,点边相关的配置较多,不再展开,详情可以参考GitHub。

设置 Crontab,Hive 生成表之后传输到 Nebula Server,在夜间流量较低的时候跑起 Nebula Importer 任务:

代码语言:shell复制
50 03 15 * * /mnt/csv_file/importer/nebula-importer -config /mnt/csv_file/importer/rel.yaml >> /root/rel.log

总共耗时 2h,在六点左右完成全量数据的导入。

部分 log 如下,导入速度最高维持在 200,000/s 左右:

代码语言:shell复制
2022/05/15 03:50:11 [INFO] statsmgr.go:62: Tick: Time(10.00s), Finished(1952500), Failed(0), Read Failed(0), Latency AVG(4232us), Batches Req AVG(4582us), Rows AVG(195248.59/s)
2022/05/15 03:50:16 [INFO] statsmgr.go:62: Tick: Time(15.00s), Finished(2925600), Failed(0), Read Failed(0), Latency AVG(4421us), Batches Req AVG(4761us), Rows AVG(195039.12/s)
2022/05/15 03:50:21 [INFO] statsmgr.go:62: Tick: Time(20.00s), Finished(3927400), Failed(0), Read Failed(0), Latency AVG(4486us), Batches Req AVG(4818us), Rows AVG(196367.10/s)
2022/05/15 03:50:26 [INFO] statsmgr.go:62: Tick: Time(25.00s), Finished(5140500), Failed(0), Read Failed(0), Latency AVG(4327us), Batches Req AVG(4653us), Rows AVG(205619.44/s)
2022/05/15 03:50:31 [INFO] statsmgr.go:62: Tick: Time(30.00s), Finished(6080800), Failed(0), Read Failed(0), Latency AVG(4431us), Batches Req AVG(4755us), Rows AVG(202693.39/s)
2022/05/15 03:50:36 [INFO] statsmgr.go:62: Tick: Time(35.00s), Finished(7087200), Failed(0), Read Failed(0), Latency AVG(4461us), Batches Req AVG(4784us), Rows AVG(202489.00/s)

然后在七点,根据时间戳,重新消费 Kafka 导入当天凌晨到七点的增量数据, 防止 T 1 的全量数据覆盖当天的增量数据。

代码语言:shell复制
50 07 15 * * python3  /mnt/code/consumer_by_time/relation_consumer_by_timestamp.py

增量的消费大概耗时 10-15min。

实时性

根据 MD5 对比之后得到的增量数据,导入Kafka中,实时消费 Kafka 的数据,确保数据的延迟不超过 1 分钟。

另外长时间的实时可能会有非预期的数据问题出现而未发现,所以每 30 天会导入一次全量数据,上面介绍的 Importer 导入。然后给 Space 的点边添加 TTL=35 天确保未及时更新的数据会被 Filter 和后续回收。

一些注意点

论坛帖子 https://discuss.nebula-graph.com.cn/t/topic/361 这里提到了关于 CSV 导入常遇到的问题,大家可以参考下。另外根据经验这边有几点建议:

  1. 关于并发度,问题中有提到,这个 concurrency 指定为你的 cpu cores 就可以, 表示起多少个 client 连接 Nebula Server。 实际操作中,要去 trade off 导入速度和服务器压力的影响。在我们这边测试,如果并发过高,会导致磁盘 IO 过高,引发设置的一些告警,不建议一下把并发拉太高,可以根据实际业务测试下做权衡。
  2. Importer 并不能断点续传,如果出现错误,需要手动处理。在实际操作中,我们会程序分析 Importer 的 log,根据情况处理,如果哪部分数据出现了非预期的错误,会告警通知,人工介入,防止出现意外。
  3. Hive 生成表之后传输到 Nebula Server, 这部分任务 实际耗时是和 Hadoop 资源情况密切相关的,有可能会出现资源不够导致 Hive 和 CSV 表生成时间滞缓,而 Importer 正常在跑的情况,这部分需要提前做好预判。我们这边是根据hive任务结束时间和 Importer 任务开始时间做对比,判断是否需要 Importer 的进程正常运行。

0 人点赞