使用Calcite解析Sql做维表关联(一)

2022-04-18 13:20:33 浏览数 (1)

维表关联是离线计算或者实时计算里面常见的一种处理逻辑,常常用于字段补齐、规则过滤等,一般情况下维表数据放在MySql等数据库里面,对于离线计算直接通过ETL方式加载到Hive表中,然后通过sql方式关联查询即可,但是对于实时计算中Flink、SparkStreaming的表都是抽象的、虚拟的表,那么就没法使用加载方式完成。透过维表服务系列里面讲到的维表关联都是使用编码方式完成,使用Map或者AsyncIO方式完成,但是这种硬编码方式开发效率很低,特别是在实时数仓里面,我们希望能够使用跟离线一样sql方式完成维表关联操作。

在Flink1.9中提供了使用sql化方式完成维表关联,只需要实现LookupableTableSource接口即可,可以实现同步或者异步关联。在1.9之前就需要自己实现sql语法解析,然后在转换为API方式,对上层提供sql语法。看一个sql语句:

代码语言:javascript复制
select * from orders o join gdsInfo g on o.gdsId=g.gdsId

orders表示流表,gdsInfo 表示维表。根据sql解析顺序先 from 部分、然后where 部分、最后select,那么对于join 方式,相当于join生成了一张临时表,然后去select 这张临时表,因此可以确认

sql解析流程:

1. 识别出流表与维表 2. 解析join部分,生成临时表

3. select 临时表

现在使用calcite解析这条语句

代码语言:javascript复制
public class ParseDemo {

    public static void main(String[] args) {
        //假设gdsInfo就是维表
        String sql = "select * from orders o join gdsInfo g on o.gdsId=g.gdsId";

        SqlParser.Config config = SqlParser.configBuilder().setLex(Lex.MYSQL).build();
        SqlParser sqlParser = SqlParser.create(sql, config);
        SqlSelect sqlSelect = null;
        try {
            sqlSelect = (SqlSelect) sqlParser.parseStmt();
        } catch (Exception e) {
            e.printStackTrace();
        }

        SqlNode sqlFrom = sqlSelect.getFrom();
        boolean isSideJoin = false;
        String leftTable = "";
        String rightTable = "";
        String newName = ""; //临时表
        SqlJoin sqlJoin = null;
        
        //解析join
        if (sqlFrom.getKind() == SqlKind.JOIN) {
            sqlJoin = (SqlJoin) sqlFrom;
            SqlNode left = sqlJoin.getLeft();
            SqlNode right = sqlJoin.getRight();
            isSideJoin = true;
            leftTable = paserTableName(left);
            rightTable = paserTableName(right);
        }
        
        //生成新的select
        if (isSideJoin) {
            newName = leftTable   "_"   rightTable;
            SqlParserPos pos = new SqlParserPos(0, 0);
            SqlIdentifier sqlIdentifier = new SqlIdentifier(newName, pos);
            sqlSelect.setFrom(sqlIdentifier);
        }
    }
   
   //解析表
    private static String paserTableName(SqlNode tbl) {
        if (tbl.getKind() == SqlKind.AS) {
            SqlBasicCall sqlBasicCall = (SqlBasicCall) tbl;
            return sqlBasicCall.operands[1].toString();
        }
        return ((SqlIdentifier) tbl).toString();
    }
}

那么我们需要的就是生成新的select节点与SqlJoin节点,执行逻辑就是根据SqlJoin节点做维表关联之后生成新的表,然后去select这样新的表。

sql解析部分已经完成,既然使用sql化方式,因此也需要定义源表与维表,数据源一般是kafka, 定义源表需要:表名称、字段名称、字段类型、数据格式、topic;维表假设为mysql,需要定义:表名称、字段类型、字段名称、关联方式(同步/异步)、缓存方式(LRU/全部缓存、无缓存)。

源表定义:

代码语言:javascript复制
CREATE TABLE orders(
    orderId varchar,
    gdsId varchar,
    orderTime varchar
 )WITH(
    type = 'kafka',
    kafka.bootstrap.servers = 'localhost:9092',
    kafka.topic = 'topic1',
    kafka.group.id = 'gId1',
    sourcedatatype ='json'
 );

维表定义:

代码语言:javascript复制
CREATE TABLE gdsInfo(
    gdsId varchar,
    gdsName varchar,
    price double
 )WITH(
    type='mysql',
    url='jdbc:mysql://localhost:3306/paul',
    userName='root',
    password='123456',
    tableName='gdsInfo',
    cache = 'LRU',
    isSideTable='true'
    );

现在就是要如何解析这些语句,正则表达式是首选,需要解析出表名称、字段、属性三个部分:creat table xxx (xxx) with(xxx);正则表达式可为:

代码语言:javascript复制
(?i)creates tables (S )s*((. ))s*withs*((. ))

?i表示后面的匹配忽略大小写,s 表示匹配多个空格,S 表示匹配多个字符,. 表示匹配任意字符。

定义一个table类:

代码语言:javascript复制
class TableInfo{
    private String tableName; // 表名称
    private Map<String,String> fieldsInfo; //字段名称->类型
    private Properties props; //表属性
    private boolean isSideTable; //是否为维表
    }

解析:

代码语言:javascript复制
public class ParseCreate {

    public static final String REG_CREATE="(?i)create\s table\s (\S )\s*\((. )\)\s*with\s*\((. )\)";

    public static void main(String[] args) {

        String createSql="CREATE TABLE orders("   "    orderId varchar,"   "    gdsId varchar,"
                  "    orderTime varchar"   " )WITH("   "    type = 'kafka',"
                  "    kafka.bootstrap.servers = 'localhost:9092',"   "    kafka.topic = 'topic1',"
                  "    kafka.group.id = 'gId1',"   "    sourcedatatype ='json'"   " );";
        Pattern pattern=Pattern.compile(REG_CREATE);

        TableInfo tableInfo=new TableInfo();
        Matcher matcher=pattern.matcher(createSql);
        if(matcher.find()){
            tableInfo.setTableName(matcher.group(1));
            String fieldsStr=matcher.group(2);
            String propsStr=matcher.group(3);
            tableInfo.setFieldsInfo(parseFiles(fieldsStr));
            tableInfo.setProps(parseProps(propsStr));
            if(Boolean.valueOf(tableInfo.getProps().getProperty("isSideTable","false"))){
                tableInfo.setSideTable(true);
            }
        }

    }

    public static Map<String,String> parseFiles(String fieldsStr){
        Map<String,String> fieldsInfo=new HashMap<>();
        String[] fieldsArray=fieldsStr.split(",");
        for(String field: fieldsArray){
           String[] fieldInfo=field.trim().split(" ");
           fieldsInfo.put(fieldInfo[0],fieldInfo[1]);
        }
        return fieldsInfo;
    }

    public static Properties parseProps(String propsStr){
        Properties props=new Properties();
        String[] propsArray=propsStr.split(",");
        for(String prop: propsArray){
            String[] propInfo=prop.trim().split("=");
            props.setProperty(propInfo[0],propInfo[1]);
        }
        return props;
    }

}

至此完成了简易的create语句解析,下一篇将介绍如何将解析后的create与维表关联转换为可执行代码。

—END—

0 人点赞