StarRocks学习-进阶

2022-11-13 13:13:44 浏览数 (1)

目录

一、数据导入

名词解释

基本原理 

导入方式

1.Broker Load

2.Spark Load

3.Stream Load

4.Routine Load

5.Insert Into

同步和异步

同步导入

异步导入

通用系统配置

FE 配置

BE 配置

注意事项

二、数据导出

名词解释

原理

导出作业的执行流程

查询计划拆分

查询计划执行

使用示例

获取导出作业 query id

查看导出作业状态

最佳实践

注意事项

相关配置


一、数据导入

数据导入功能是将原始数据按照相应的模型进行清洗转换并加载到StarRocks中,方便查询使用。

根据不同的数据来源可以选择不同的导入方式:

  • 离线数据导入,如果数据源是Hive/HDFS(数据量为几十GB到上百GB),推荐采用Broker Load导入, 如果数据表很多导入比较麻烦可以考虑使用Hive外表直连查询,性能会比Broker load导入效果差,但是可以避免数据搬迁,如果单表的数据量特别大(数据量达到TB级别),或者需要做全局数据字典来精确去重可以考虑Spark Load导入。
  • 实时数据导入,日志数据和业务数据库的binlog同步到Kafka以后,优先推荐通过Routine load 导入StarRocks,如果导入过程中有复杂的多表关联和ETL预处理可以使用Flink处理以后用stream load写入StarRocks,我们有标准的Flink-connector可以方便Flink任务使用。
  • 程序写入StarRocks,推荐使用Stream Load,可以参考例子中有Java/Python的demo。
  • 文本文件导入推荐使用 Stream load(数据存储在本地文件中,数据量小于10GB)
  • Mysql数据导入,推荐使用Mysql外表,insert into new_table select * from external_table 的方式导入
  • 其他数据源导入,推荐使用DataX导入,我们提供了DataX-starrocks-writer
  • StarRocks内部导入,可以在StarRocks内部使用insert into tablename select的方式导入,可以跟外部调度器配合实现简单的ETL处理。

名词解释

  • 导入作业:导入作业读取用户提交的源数据并进行清洗转换后,将数据导入到StarRocks系统中。导入完成后,数据即可被用户查询到。
  • Label:所有导入作业都有一个Label,用于标识一个导入作业。Label可由用户指定或系统自动生成。Label在一个数据库内是唯一的,一个Label仅可用于一个成功的导入作业。当一个Label对应的导入作业成功后,不可再重复使用该Label提交导入作业。如果某Label对应的导入作业失败,则该Label可以被再使用。该机制可以保证Label对应的数据最多被导入一次,即At-Most-Once语义。
  • 原子性:StarRocks中所有导入方式都提供原子性保证,即同一个导入作业内的所有有效数据要么全部生效,要么全部不生效,不会出现仅导入部分数据的情况。这里的有效数据不包括由于类型转换错误等数据质量问题而被过滤的数据。具体见常见问题小节里所列出的数据质量问题。
  • MySQL协议/HTTP协议:StarRocks提供两种访问协议接口:MySQL协议和HTTP协议。部分导入方式使用MySQL协议接口提交作业,部分导入方式使用HTTP协议接口提交作业。
  • Broker Load:Broker导入,即通过部署的Broker程序读取外部数据源(如HDFS)中的数据,并导入到StarRocks。Broker进程利用自身的计算资源对数据进行预处理导入。
  • Spark Load:Spark导入,即通过外部资源如Spark对数据进行预处理生成中间文件,StarRocks读取中间文件导入。这是一种异步的导入方式,用户需要通过MySQL协议创建导入,并通过查看导入命令检查导入结果。
  • FE:Frontend,StarRocks系统的元数据和调度节点。在导入流程中主要负责导入执行计划的生成和导入任务的调度工作。
  • BE:Backend,StarRocks系统的计算和存储节点。在导入流程中主要负责数据的 ETL 和存储。
  • Tablet:StarRocks表的逻辑分片,一个表按照分区、分桶规则可以划分为多个分片(参考数据分布章节)。

基本原理 

一个导入作业主要分为5个阶段:

1.PENDING

非必须。该阶段是指用户提交导入作业后,等待FE调度执行。

Broker Load和Spark Load包括该步骤。

2.ETL

非必须。该阶段执行数据的预处理,包括清洗、分区、排序、聚合等。

Spark Load包括该步骤,它使用外部计算资源Spark完成ETL。

3.LOADING

该阶段先对数据进行清洗和转换,然后将数据发送给BE处理。当数据全部导入后,进入等待生效过程,此时导入作业状态依旧是LOADING。

4.FINISHED

在导入作业涉及的所有数据均生效后,作业的状态变成 FINISHED,FINISHED后导入的数据均可查询。FINISHED是导入作业的最终状态。

5.CANCELLED

在导入作业状态变为FINISHED之前,作业随时可能被取消并进入CANCELLED状态,如用户手动取消或导入出现错误等。CANCELLED也是导入作业的一种最终状态。

数据导入格式:

  • 整型类(TINYINT,SMALLINT,INT,BIGINT,LARGEINT):1, 1000, 1234
  • 浮点类(FLOAT,DOUBLE,DECIMAL):1.1, 0.23, .356
  • 日期类(DATE,DATETIME):2017-10-03, 2017-06-13 12:34:03
  • 字符串类(CHAR,VARCHAR):I am a student, a
  • NULL值:N

导入方式

1.Broker Load

Broker Load 通过 Broker 进程访问并读取外部数据源,然后采用 MySQL 协议向 StarRocks 创建导入作业。提交的作业将异步执行,用户可通过 SHOW LOAD 命令查看导入结果。

Broker Load适用于源数据在Broker进程可访问的存储系统(如HDFS)中,数据量为几十GB到上百GB。

2.Spark Load

Spark Load 通过外部的 Spark 资源实现对导入数据的预处理,提高 StarRocks 大数据量的导入性能并且节省 StarRocks 集群的计算资源。Spark load 是一种异步导入方式,需要通过 MySQL 协议创建导入作业,并通过 SHOW LOAD 查看导入结果。

Spark Load适用于初次迁移大数据量(可到TB级别)到StarRocks的场景,且源数据在Spark可访问的存储系统(如HDFS)中。

3.Stream Load

Stream Load是一种同步执行的导入方式。用户通过 HTTP 协议发送请求将本地文件或数据流导入到 StarRocks中,并等待系统返回导入的结果状态,从而判断导入是否成功。

Stream Load适用于导入本地文件,或通过程序导入数据流中的数据。

4.Routine Load

Routine Load(例行导入)提供了一种自动从指定数据源进行数据导入的功能。用户通过 MySQL 协议提交例行导入作业,生成一个常驻线程,不间断的从数据源(如 Kafka)中读取数据并导入到 StarRocks 中。

5.Insert Into

类似 MySQL 中的 Insert 语句,StarRocks 提供 INSERT INTO tbl SELECT ...; 的方式从 StarRocks 的表中读取数据并导入到另一张表。或者通过 INSERT INTO tbl VALUES(...); 插入单条数据。

同步和异步

StarRocks目前的导入方式分为两种:同步和异步。

同步导入

同步导入方式即用户创建导入任务,StarRocks 同步执行,执行完成后返回导入结果。用户可通过该结果判断导入是否成功。

同步类型的导入方式有:Stream Load,Insert。

操作步骤:

  • 用户(外部系统)创建导入任务。
  • StarRocks返回导入结果。
  • 用户(外部系统)判断导入结果。如果导入结果为失败,可以再次创建导入任务。

异步导入

异步导入方式即用户创建导入任务后,StarRocks直接返回创建成功。创建成功不代表数据已经导入成功。导入任务会被异步执行,用户在创建成功后,需要通过轮询的方式发送查看命令查看导入作业的状态。如果创建失败,则可以根据失败信息,判断是否需要再次创建。

异步类型的导入方式有:Broker Load, Spark Load。

操作步骤

  • 用户(外部系统)创建导入任务;
  • StarRocks返回创建任务的结果;
  • 用户(外部系统)判断创建任务的结果,如果成功则进入步骤4;如果失败则可以回到步骤1,重新尝试创建导入任务;
  • 用户(外部系统)轮询查看任务状态,直到状态变为FINISHED或CANCELLED。

通用系统配置

FE 配置

以下配置属于FE的系统配置,可以通过修改FE的配置文件fe.conf来修改:

  • max_load_timeout_second和min_load_timeout_second 设置导入超时时间的最大、最小取值范围,均以秒为单位。默认的最大超时时间为3天,最小超时时间为1秒。用户自定义的导入超时时间不可超过这个范围。该参数通用于所有类型的导入任务。
  • desired_max_waiting_jobs 等待队列可以容纳的最多导入任务数目,默认值为100。如FE中处于PENDING状态(即等待执行)的导入任务数目达到该值,则新的导入请求会被拒绝。此配置仅对异步执行的导入有效,如处于等待状态的异步导入任务数达到限额,则后续创建导入的请求会被拒绝。
  • max_running_txn_num_per_db 每个数据库中正在运行的导入任务的最大个数(不区分导入类型、统一计数),默认值为100。当数据库中正在运行的导入任务超过最大值时,后续的导入不会被执行。如果是同步作业,则作业会被拒绝;如果是异步作业,则作业会在队列中等待。
  • label_keep_max_second 导入任务记录的保留时间。已经完成的( FINISHED or CANCELLED )导入任务记录会在StarRocks系统中保留一段时间,时间长短则由此参数决定。参数默认值时间为3天。该参数通用于所有类型的导入任务。

BE 配置

以下配置属于BE的系统配置,可以通过修改BE的配置文件be.conf来修改:

  • push_write_mbytes_per_sec BE上单个Tablet的写入速度限制。默认是10,即10MB/s。根据Schema以及系统的不同,通常BE对单个Tablet的最大写入速度大约在10-30MB/s之间。可以适当调整这个参数来控制导入速度。
  • write_buffer_size 导入数据在 BE 上会先写入到一个内存块,当这个内存块达到阈值后才会写回磁盘。默认大小是 100MB。过小的阈值可能导致 BE 上存在大量的小文件。可以适当提高这个阈值减少文件数量。但过大的阈值可能导致RPC超时,见下面的配置说明。
  • tablet_writer_rpc_timeout_sec 导入过程中,发送一个 Batch(1024行)的RPC超时时间。默认为600秒。因为该RPC可能涉及多个分片内存块的写盘操作,所以可能会因为写盘导致RPC超时,可以适当调整这个超时时间来减少超时错误(如 send batch fail 错误)。同时,如果调大write_buffer_size配置,也需要适当调大这个参数。
  • streaming_load_rpc_max_alive_time_sec 在导入过程中,StarRocks会为每个Tablet开启一个Writer,用于接收数据并写入。这个参数指定了Writer的等待超时时间。默认为600秒。如果在参数指定时间内Writer没有收到任何数据,则Writer会被自动销毁。当系统处理速度较慢时,Writer可能长时间接收不到下一批数据,导致导入报错:TabletWriter add batch with unknown id。此时可适当增大这个配置。
  • load_process_max_memory_limit_bytes和load_process_max_memory_limit_percent 这两个参数分别是最大内存和最大内存百分比,限制了单个BE上可用于导入任务的内存上限。系统会在两个参数中取较小者,作为最终的BE导入任务内存使用上限。
    • load_process_max_memory_limit_percent:表示对BE总内存限制的百分比。默认为30。(总内存限制 mem_limit 默认为 80%,表示对物理内存的百分比)。即假设物理内存为 M,则默认导入内存限制为 M * 80% * 30%。
    • load_process_max_memory_limit_bytes:默认为100GB。

注意事项

用户在向StarRocks导入数据时,一般会采用程序对接的方式。以下是导入数据时的一些注意事项:

  1. 选择合适的导入方式:根据数据量大小、导入频次、数据源所在位置选择导入方式。例如:如果原始数据存放在HDFS上,则使用Broker load导入。
  2. 确定导入方式的协议:如果选择了Broker Load导入方式,则外部系统需要能使用MySQL协议定期提交和查看导入作业。
  3. 确定导入方式的类型:导入方式分为同步或异步。如果是异步导入方式,外部系统在提交创建导入后,必须调用查看导入命令,根据查看导入命令的结果来判断导入是否成功。
  4. 制定Label生成策略:Label生成策略需满足对每一批次数据唯一且固定的原则。
  5. 保证Exactly-Once:外部系统需要保证数据导入的At-Least-Once,StarRocks的Label机制可以保证数据导入的At-Most-Once。这样整体上就可以保证数据导入的Exactly-Once。

二、数据导出

StarRocks 拥有 Export 一种将数据导出并存储到其他介质上的功能。该功能可以将用户指定的表或分区的数据,以文本的格式,通过 Broker 进程导出到远端存储上,如 HDFS/阿里云OSS/AWS S3(或者兼容S3协议的对象存储) 等。

名词解释

  • FE:Frontend,StarRocks的前端节点。负责元数据管理和请求接入。
  • BE:Backend,StarRocks的后端节点。负责查询执行和数据存储。
  • Broker:StarRocks 可以通过 Broker 进程对远端存储进行文件操作。
  • Tablet:数据分片。一个表会分成 1 个或多个分区,每个分区会划分成多个数据分片。

原理

导出作业的执行流程

用户提交一个导出作业后,StarRocks 会统计这个作业涉及的所有 Tablet,然后对这些 Tablet 进行分组每组生成一个特殊的查询计划。该查询计划会读取所包含的 Tablet 上的数据,然后通过 Broker 将数据写到远端存储指定的路径中。

上图描述的处理流程主要包括:

  1. 用户提交一个 Export 作业到 FE。
  2. PENDING阶段:FE 生成一个 ExportPendingTask,向 BE 发送 snapshot 命令,对所有涉及到的 Tablet 做一个快照,并生成多个查询计划。
  3. EXPORTING阶段:FE 生成一个 ExportExportingTask,BE和Broker会根据FE生成的查询计划配合完成数据导出工作。

查询计划拆分

Export 作业会生成多个查询计划,每个查询计划负责扫描一部分 Tablet。每个查询计划中每个 BE 扫描的数据量由 FE 配置参数 export_max_bytes_per_be_per_task 计算得到,默认为 256M。每个查询计划中每个 BE 最少一个 Tablet,最多导出的数据量不超过配置的参数 export_max_bytes_per_be_per_task

一个作业的多个查询计划并行执行,任务线程池的大小通过 FE 参数 export_task_pool_size 配置,默认为 5。

查询计划执行

一个查询计划扫描多个分片,将读取的数据以行的形式组织,每 1024 行为 一个 batch,调用 Broker 写入到远端存储上。

查询计划遇到错误会整体自动重试 3 次。如果一个查询计划重试 3 次依然失败,则整个作业失败。

StarRocks 会首先在指定的远端存储的路径中,建立一个名为 __starrocks_export_tmp_921d8f80-7c9d-11eb-9342-acde48001122 的临时目录(其中 921d8f80-7c9d-11eb-9342-acde48001122 为作业的 query id)。导出的数据首先会写入这个临时目录。导入成功后每个查询计划会生成一个文件,文件名示例:

lineorder_921d8f80-7c9d-11eb-9342-acde48001122_1_2_0.csv.1615471540361

其中:

  • 1615471540361 为时间戳,用于保证重试生成的文件不冲突。

当所有数据都导出后,StarRocks 会将这些文件 rename 到用户指定的路径中,rename的时候会去掉后面的时间戳。最终导出的文件名示例:

lineorder_921d8f80-7c9d-11eb-9342-acde48001122_1_2_0.csv

其中:

  • lineorder_:为导出文件的前缀,由用户指定到导出路径中,不指定默认为data_
  • 921d8f80-7c9d-11eb-9342-acde48001122:为作业的 query id。文件名默认包含 query id,指定参数 include_query_id = false 后不包含。
  • 1_2_0:分为三部分,第一部分为查询计划对应任务的序号,第二部分为任务中实例的序号,第三部分为一个实例中生成文件的序号。
  • csv:为导出文件格式,目前只支持 csv 格式。

使用示例

提交导出作业

导出作业举例如下:

代码语言:javascript复制
EXPORT TABLE db1.tbl1 
PARTITION (p1,p2)
(col1, col3)
TO "hdfs://host/path/to/export/lineorder_" 
PROPERTIES
(
    "column_separator"=",",
    "load_mem_limit"="2147483648",
    "timeout" = "3600"
)
WITH BROKER "hdfs"
(
    "username" = "user",
    "password" = "passwd"
);

可以指定需要导出的分区,不写默认导出表中所有分区。

可以指定需要导出的列,顺序可以跟 schema 不同,不写默认导出表中所有列。

导出路径如果指定到目录,需要指定最后的/,否则最后的部分会被当做导出文件的前缀。不指定前缀默认为data_。 示例中导出文件会生成到 export 目录中,文件前缀为 lineorder_

PROPERTIES如下:

  • column_separator:列分隔符。默认为 t
  • line_delimiter:行分隔符。默认为 n
  • load_mem_limit: 表示 Export 作业中,一个查询计划单个 BE 上的内存使用限制。默认 2GB。单位字节。
  • timeout:作业超时时间。默认为 2 小时。单位秒。
  • include_query_id: 导出文件名中是否包含 query id,默认为 true。

获取导出作业 query id

提交作业后,可以通过 SELECT LAST_QUERY_ID() 命令获得导出作业的 query id。用户可以通过 query id 查看或者取消作业。

查看导出作业状态

提交作业后,可以通过 SHOW EXPORT 命令查询导出作业状态。

代码语言:javascript复制
SHOW EXPORT WHERE queryid = "edee47f0-abe1-11ec-b9d1-00163e1e238f";

结果举例如下:

代码语言:javascript复制
     JobId: 14008
     State: FINISHED
  Progress: 100%
  TaskInfo: {"partitions":["*"],"mem limit":2147483648,"column separator":",","line delimiter":"n","tablet num":1,"broker":"hdfs","coord num":1,"db":"default_cluster:db1","tbl":"tbl3",columns:["col1", "col3"]}
      Path: oss://bj-test/export/
CreateTime: 2019-06-25 17:08:24
 StartTime: 2019-06-25 17:08:28
FinishTime: 2019-06-25 17:08:34
   Timeout: 3600
  ErrorMsg: N/A
  • JobId:作业的Job ID
  • QueryId:作业的查询ID
  • State:作业状态:
    • PENDING:作业待调度
    • EXPORING:数据导出中
    • FINISHED:作业成功
    • CANCELLED:作业失败
  • Progress:作业进度。该进度以查询计划为单位。假设一共 10 个查询计划,当前已完成 3 个,则进度为 30%。
  • TaskInfo:以 Json 格式展示的作业信息:
    • db:数据库名
    • tbl:表名
    • partitions:指定导出的分区。*表示所有分区。
    • mem limit:查询的内存使用限制。单位字节。
    • column separator:导出文件的列分隔符。
    • line delimiter:导出文件的行分隔符。
    • tablet num:涉及的总 Tablet 数量。
    • broker:使用的 broker 的名称。
    • coord num:查询计划的个数。
    • columns:导出的列。
  • Path:远端存储上的导出路径。
  • CreateTime/StartTime/FinishTime:作业的创建时间、开始调度时间和结束时间。
  • Timeout:作业超时时间。单位是「秒」。该时间从 CreateTime 开始计算。
  • ErrorMsg:如果作业出现错误,这里会显示错误原因。

取消作业

举例如下:

代码语言:javascript复制
CANCEL EXPORT WHERE queryid = "921d8f80-7c9d-11eb-9342-acde48001122";

最佳实践

查询计划的拆分

一个 Export 作业有多少查询计划需要执行,取决于总共有多少 Tablet,以及一个查询计划可以处理的最大数据量。 作业是按照查询计划来重试的,如果一个查询计划处理更多的数据量,查询计划出错(比如调用 Broker 的 RPC 失败,远端存储出现抖动等),会导致一个查询计划的重试成本变高。每个查询计划中每个 BE 扫描的数据量由 FE 配置参数 export_max_bytes_per_be_per_task 计算得到,默认为 256M。每个查询计划中每个 BE 最少一个 Tablet,最多导出的数据量不超过配置的参数 export_max_bytes_per_be_per_task

一个作业的多个查询计划并行执行,任务线程池的大小通过 FE 参数 export_task_pool_size 配置,默认为 5。

load_mem_limit

通常一个导出作业的查询计划只有「扫描-导出」两部分,不涉及需要太多内存的计算逻辑。所以通常 2GB 的默认内存限制可以满足需求。但在某些场景下,比如一个查询计划,在同一个 BE 上需要扫描的 Tablet 过多,或者 Tablet 的数据版本过多时,可能会导致内存不足。此时需要修改这个参数设置更大的内存,比如 4GB、8GB 等。

注意事项

  • 不建议一次性导出大量数据。一个 Export 作业建议的导出数据量最大在几十 GB。过大的导出会导致更多的垃圾文件和更高的重试成本。
  • 如果表数据量过大,建议按照分区导出。
  • 在 Export 作业运行过程中,如果 FE 发生重启或切主,则 Export 作业会失败,需要用户重新提交。
  • Export 作业产生的__starrocks_export_tmp_xxx临时目录,作业失败或成功后会自动删除。
  • 当 Export 运行完成后(成功或失败),FE 发生重启或切主,则SHOW EXPORT展示的作业的部分信息会丢失,无法查看。
  • Export 作业只会导出 Base 表的数据,不会导出 Rollup Index 的数据。
  • Export 作业会扫描数据,占用 IO 资源,可能会影响系统的查询延迟。

相关配置

主要介绍 FE 中的相关配置。

  • export_checker_interval_second:Export 作业调度器的调度间隔,默认为 5 秒。设置该参数需重启 FE。
  • export_running_job_num_limit:正在运行的 Export 作业数量限制。如果超过,则作业将等待并处于 PENDING 状态。默认为 5,可以运行时调整。
  • export_task_default_timeout_second:Export 作业默认超时时间。默认为 2 小时。可以运行时调整。
  • export_max_bytes_per_be_per_task: 每个导出任务在每个 BE 上最多导出的数据量,用于拆分导出作业并行处理。按压缩后数据量计算,默认为 256M。
  • export_task_pool_size: 导出任务线程池的大小。默认为 5。

官方链接:StarRocks @ StarRocks_intro @ StarRocks Docs

0 人点赞