Doris使用Stream Load 导入数据

2024-02-06 16:55:04 浏览数 (1)

Doris 结点分为FE 和 BE结点, BE负责存储,FE负责任务查询调度。

FE和BE 都能使用 Stream Load 导入接口,区别是:

当使用Stream Load 导入数据时,FE会查找分配结点,使用HTTP 307 重定向流量。

FE http默认端口 8030

BE http默认端口 8030

导入方式:

1. 使用curl ,使用curl会自动重定向到BE结点地址
代码语言:javascript复制
 curl --location-trusted -u root  -H "label:12345" -H "format: json" -H "Expect:100-continue" -H "read_json_by_line:true" -T test.json http://ip:8030/api/demo/t_history_data_2/_stream_load

2. 使用 Java中导入数据时:

特别注意从FE结点接口导入数据,需要二次解析重定向地址,即取到重定向的BE接口地址。

要取返回结果中Head 中Location的地址:

代码语言:javascript复制
Response Headers:  null=[HTTP/1.1 307 Temporary RedirectVary=[Access-Control-Request-Headers, Access-Control-Request-Method, Origin] Content-Length=[0]
    Content-Language=[zh-CN]
    Date=[Tue, 06 Feb 2024 08:41:22 GMT]
    Location=[http://root@default_cluster:@ip:8040/api/demo/t_history_data_2/_stream_load/?]
Response Body: 

参考代码, 基于Hutool HttpUtil
代码语言:javascript复制
String jsonData = data.stream()
        .map(JSONObject::toJSONString)
        .collect(Collectors.joining("n"));

// 构建批量提交的URL
String batchUrl = DORIS_BASE_URL   "/"   DORIS_DATABASE   "/"   DORIS_TABLE   "/_stream_load/";
HttpRequest request = HttpUtil.createRequest(Method.PUT, batchUrl);
request.header(HttpHeaders.AUTHORIZATION,  getBasicAuthHeader(DORIS_USERNAME, DORIS_PASSWORD));
request.header("label", UUID.randomUUID().toString());
request.header("format", "json");
request.header("Expect", "100-continue");
// 读行
request.header("read_json_by_line","true");

// 获取实际BE结点地址
HttpResponse execute = request.execute();
String header = execute.header(HttpHeaders.LOCATION);
if (!header.isEmpty()){
    request.body(jsonData);
    batchUrl = header;
    request.setUrl(batchUrl);
    HttpResponse insertResponse = request.execute();
    System.out.println(insertResponse);
}

0 人点赞