Flink 自定义SQL实现Hudi MOR表压缩

2022-09-16 14:36:43 浏览数 (1)

Hudi在构建流式数据湖方面具有领先地位。Flink作为真正的流处理引擎,与Hudi搭配是理所应当的事情了。但是目前Hudi MOR表压缩功能除了在线压缩以外,并不能通过SQL实现手动压缩。目前的实现方式为:

代码语言:javascript复制
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.15-bundle_2.12-x.x.x.jar --path hdfs://xxx:9000/table

本文介绍如何扩展Flink引擎的SQL解析能力,使其具备直接使用SQL实现Hudi MOR表压缩的功能。修改后,通过下述Flink SQL即可实现Hudi MOR表压缩,(procedure call 参考Spark的call语法):

代码语言:javascript复制
procedure call compact a with ('path'='hdfs://bigdata:9000/tmp/t1_20220810_6', 'schedule'='false');

本文基于Flink 1.15分支修改:https://github.com/apache/flink/tree/release-1.15

修改后的代码地址为:https://git.lrting.top/xiaozhch5/flink/-/tree/release-1.15-compile

修改SQL Parser

我们知道Flink 基于calcite解析SQL,所以根据Flink原有写法扩展即可。具体来说,修改地方如下。

新增SqlCallCompactTable类,类内容如下:

代码语言:javascript复制
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.sql.parser.ddl;

import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import javax.annotation.Nonnull;

import java.util.List;

import static java.util.Objects.requireNonNull;

/**
 * Abstract class to call compact table like PROCEDURE CALL COMPACT [[catalogName.]
 * dataBasesName].tableName
 */
public class SqlCallCompactTable extends SqlCall {

    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("PROCEDURE CALL COMPACT", SqlKind.OTHER_DDL);

    protected final SqlIdentifier tableIdentifier;

    protected final SqlNodeList propertyList;

    public SqlCallCompactTable(
            SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
        super(pos);
        this.tableIdentifier = requireNonNull(tableName, "tableName should not be null");
        this.propertyList = requireNonNull(propertyList, "with property should no be null");
    }

    @Nonnull
    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    public SqlIdentifier getTableName() {
        return tableIdentifier;
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("PROCEDURE CALL");
        writer.keyword("COMPACT");
        tableIdentifier.unparse(writer, leftPrec, rightPrec);
        if (this.propertyList.size() > 0) {
            writer.keyword("WITH");
            SqlWriter.Frame withFrame = writer.startList("(", ")");
            for (SqlNode property : propertyList) {
                printIndent(writer);
                property.unparse(writer, leftPrec, rightPrec);
            }
            writer.newlineAndIndent();
            writer.endList(withFrame);
        }
    }

    protected void printIndent(SqlWriter writer) {
        writer.sep(",", false);
        writer.newlineAndIndent();
        writer.print("  ");
    }

    public SqlNodeList getPropertyList() {
        return propertyList;
    }

    @Nonnull
    @Override
    public List<SqlNode> getOperandList() {
        return ImmutableNullableList.of(tableIdentifier);
    }
}

修改Parser.tdd

在imports中新增:

代码语言:javascript复制
    "org.apache.flink.sql.parser.ddl.SqlCallCompactTable"

在statementParserMethods中新增:

代码语言:javascript复制
    "SqlCallCompactTable()"

在parserImpls.ftl中新增:

代码语言:javascript复制
SqlCallCompactTable SqlCallCompactTable() :
{
SqlParserPos pos;
SqlIdentifier tableIdentifier;
SqlNodeList propertyList = SqlNodeList.EMPTY;
}
{
<PROCEDURE> <CALL> <COMPACT>   { pos = getPos(); }
         tableIdentifier = CompoundIdentifier()
            <WITH>
                propertyList = TableProperties()
            {
            return new SqlCallCompactTable(
                pos,
                tableIdentifier,
                propertyList);
            }
}

新增CallCompactOperation

在flink-table-api-java项目中org/apache/flink/table/operations/目录下新增CallCompactOperation类,具体类的内容如下:

代码语言:javascript复制
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.operations;

import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;

/** Call Compact Operation */
public class CallCompactOperation implements ModifyOperation {

    private final ObjectIdentifier tableIdentifier;

    private final QueryOperation child;

    private final String basePath;

    private final String schedule;

    public CallCompactOperation(
            ObjectIdentifier tableIdentifier,
            QueryOperation child,
            String basePath,
            String schedule) {
        this.tableIdentifier = tableIdentifier;
        this.child = child;
        this.basePath = basePath;
        this.schedule = schedule;
    }

    public ObjectIdentifier getTableIdentifier() {
        return tableIdentifier;
    }

    @Override
    public QueryOperation getChild() {
        return this.child;
    }

    public String getBasePath() {
        return this.basePath;
    }

    public String getSchedule() {
        return schedule;
    }

    @Override
    public <T> T accept(ModifyOperationVisitor<T> modifyOperationVisitor) {
        return null;
    }

    @Override
    public String asSummaryString() {
        return "PROCEDURE CALL COMPACT TABLE: "
                  this.tableIdentifier.getCatalogName()
                  "."
                  this.tableIdentifier.getDatabaseName()
                  "."
                  this.tableIdentifier.getObjectName();
    }
}

修改table planner

修改SqlToOperationConverter类新增convertCallCompact方法:

代码语言:javascript复制
    private Operation convertCallCompact(SqlCallCompactTable sqlCallCompactTable) {
        String catalogName = this.catalogManager.getCurrentCatalog();
        String databaseName = this.catalogManager.getCurrentDatabase();
        SqlNodeList sqlNodeList = sqlCallCompactTable.getPropertyList();
        List<SqlNode> sqlNodes = sqlNodeList.getList();
        String basePath = "";
        String schedule = "false";
        for (SqlNode sqlNode : sqlNodes) {
            if ("path".equals(((SqlTableOption) sqlNode).getKeyString())) {
                basePath = ((SqlTableOption) sqlNode).getValueString();
            } else if ("schedule".equals(((SqlTableOption) sqlNode).getKeyString())) {
                schedule = ((SqlTableOption) sqlNode).getValueString();
            }
        }
        return new CallCompactOperation(
                ObjectIdentifier.of(
                        catalogName, databaseName, sqlCallCompactTable.getTableName().toString()),
                null,
                basePath,
                schedule);
    }

修改convertValidatedSqlNode方法,新增如下判断:

代码语言:javascript复制
        if (validated instanceof SqlCallCompactTable) {
            return Optional.of(converter.convertCallCompact((SqlCallCompactTable) validated));
        }

具体截图如下:

修改TableEnvironmentImpl

864行,修改executeInternal方法,新增如下判断条件:

代码语言:javascript复制
            if (operation instanceof CallCompactOperation) {
                try {
                    String catalog = catalogManager.getCurrentCatalog();
                    String database = catalogManager.getCurrentDatabase();
                    String table =
                            ((CallCompactOperation) operation).getTableIdentifier().getObjectName();
                    String basePath = ((CallCompactOperation) operation).getBasePath();
                    String schedule = ((CallCompactOperation) operation).getSchedule();
                    System.out.println("Call Compact on "   catalog   "."   database   "."   table);
                    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
                    cfg.path = basePath;
                    cfg.schedule = "true".equals(schedule);
                    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
                    StreamExecutionEnvironment env =
                            StreamExecutionEnvironment.getExecutionEnvironment();
                    HoodieFlinkCompactor.AsyncCompactionService service =
                            new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
                    new HoodieFlinkCompactor(service).start(cfg.serviceMode);
                } catch (Exception e) {
                    throw new ValidationException("Call Compact failed", e);
                }
                return TableResultImpl.TABLE_RESULT_OK;
            }

具体截图如下:

新增hudi-flink包

修改flink-table-api-java项目pom文件,新增hudi-flink包:

代码语言:javascript复制
    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-flink</artifactId>
      <version>${hudi.version)</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.flink</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
      <scope>provided</scope>
    </dependency>

Flink打包

将Flink重新打包之后,即可解析PROCEDURE CALL COMPACT语法。

示例

将打包好后的Flink放到测试环境,执行如下SQL,其中path表示表路径,schedule表示是否生成压缩计划,如果不指定schedule的话,默认为false。

代码语言:javascript复制
procedure call compact a with ('path'='hdfs://bigdata:9000/tmp/t1_20220810_6', 'schedule'='false');

对已经生成压缩计划的hudi mor表执行压缩任务,我们不指定schedule参数,可以看到任务运行成功,并执行了压缩任务:

SQL提交脚本

https://git.lrting.top/xiaozhch5/flink-sql-submit

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/2109308

0 人点赞