【Flink】第三十二篇:Flink SQL 字段血缘中树的构建与遍历

2022-03-31 11:25:46 浏览数 (1)

相关推荐:

【Flink】第二十七篇:三天撸了一个 Flink SQL 字段血缘算法

【Flink】第二十八篇:Flink SQL 与 Apache Calcite

【Flink】第二十九篇:源码分析 Blink Planner

从【Flink】第二十七篇:三天撸了一个 Flink SQL 字段血缘算法 这篇文章开始,笔者开启了一个Flink SQL字段血缘实现的探索之路。但是由于当时只是针对Flink 运行时中产生的calcitetree简单分析后写出的简易版,有诸多不成熟之处,所以也没说过多的实现细节。

经过一段时间的探索,笔者又寻找到了可能是更优的实现方案,这里先将算法部分的细节进行阐述,后续再陆续展开更多实现细节。

在进一步深入探索Flink SQL源码的过程中,笔者发现可以从源码中运行时中提取到这样的数据结构:

1. nodes:

nodes是Flink SQL中AST树的各个节点,每个节点包含两个重要的属性:id、fields

2. edges:

edges是Flink SQL中AST树的各个节点之间的边,包含两个重要属性:source、target。注意这里的source和target即是nodes中的id。

并且,顺着edges将nodes连线后发现这是一颗倒二叉树。例如,一个三个表关联后insert into到第四张表的SQL的倒二叉树结构简化后如下,

每个node都包含向下游node沉淀的fields,而每个边都是有向的说明了source node和target node。

顺着这些分析结论,我们接下来处理它,并最终画出source表到sink表的字段血缘关系。

递归构建树


我们先对前面提到的,从源码中提取的元数据进行分析,得到一些辅助构建和遍历的信息:

得到root node id:即寻找入度为0的节点,

代码语言:javascript复制
public static Integer getRootNodeId(Map<Integer, Integer> edgesMap) {
    List<Integer> rootNodeIds = new LinkedList<>();
    List<Integer> sourceNodeIds = getAllSourceNodeIds(edgesMap);
    List<Integer> targetNodeIds = getAllTargetNodeIds(edgesMap);
    // 寻找入度为0的节点
    for (Integer targetNodeId : targetNodeIds) {
        if (!sourceNodeIds.contains(targetNodeId)) {
            rootNodeIds.add(targetNodeId);
        }
    }
    if (rootNodeIds.size() > 1) {
        throw new RuntimeException("more than one rootNodeId has been found!");
    }
    return rootNodeIds.get(0);
}

接着我们开始由root node自顶向下递归构建这颗二叉树,代码如下

代码语言:javascript复制
public void handleCreateTree(TreeNode<Node> node) {
    if (!addChilds(node)) {
        return;
    } else {
        if (node.getLeftChild() != null) {
            handleCreateTree(node.getLeftChild());
        }
        if (node.getRightChild() != null) {
            handleCreateTree(node.getRightChild());
        }
    }
}

为node添加左右子节点,注意在构建过程中,需要保证左子树的id小于右子树的id,因为关系到后续SQL在为重复字段重命名的规则,

代码语言:javascript复制
public boolean addChilds(TreeNode<Node> parent) {
    boolean optFlag = false;
    Integer parentId = parent.getData().getId();
    for (Map.Entry<Integer, Integer> edge : edgesMap.entrySet()) {
        if (edge.getValue().equals(parentId)) {
            if (parent.getLeftChild() == null) {
                parent.setLeftChild(new TreeNode<>(nodesMap.get(edge.getKey())));
            } else if (parent.getRightChild() == null) {
                // 保证左子树比右子树的id小
                Node left = parent.getLeftChild().getData();
                Node tempNode = nodesMap.get(edge.getKey());
                if (left.getId() < tempNode.getId()) {
                    parent.setRightChild(new TreeNode<>(tempNode));
                } else {
                    parent.setRightChild(parent.getLeftChild());
                    parent.setLeftChild(new TreeNode<>(tempNode));
                }
            } else {
                throw new RuntimeException(" more than two child node has been found in node:"   parent.getData().getId());
            }
            optFlag = true;
        }
    }
    return optFlag;
}

这两部分的操作就从元数据中构建起了一颗类AST树。

递归遍历画血字段缘


接下来我们如何在遍历二叉树的过程中进行字段血缘的分析呢?

由于每个node都有本node的fields,而parent node的fields又来源于左右子节点上浮的fields,换句话说,上游的fields是由左右子node的fields而来,那么我们当然应该选择二叉树后续遍历的总体思路。

例如,下面这个情况下,在左边两个叶子结点的原始表中的fields均为id、name、ts,而上浮过程中由于上游取下了下游左子节点的id,和右子节点的name,但是name重名所以SQL在重复字段后面加"0",即成为name0。

在第二个join时,右子节点的 op_ts 进行了重命名为 ts,这种情况在上浮时依旧要出处理。

综上,代码如下,

代码语言:javascript复制
// 后续遍历
public List<String> postOrder(TreeNode<Node> treeNode) {
    if (treeNode == null) {
        return null;
    } else {
        List<String> leftFields = postOrder(treeNode.getLeftChild());
        List<String> rightFields = postOrder(treeNode.getRightChild());
        return visit(treeNode, leftFields, rightFields);
    }
}

而最重要的visit过程又是如何将子节点的fields上浮的呢?

代码语言:javascript复制
public List<String> visit(TreeNode<Node> node, List<String> leftFields, List<String> rightFields) {
    List<String> fields;
    if (leftFields == null && rightFields == null) {
        fields = visitLeafNode(node);
    } else if (leftFields != null && rightFields != null) {
        fields = visitCrossNode(node, leftFields, rightFields);
    } else {
        if (leftFields != null) {
            fields = visitMiddleNode(node, leftFields);
        } else {
            fields = visitMiddleNode(node, rightFields);
            // 理论上不应该出现这种情况!
            System.out.println("A node with only the right subtree was found:"   node.getData().getId());
        }
    }
    return fields;
}

这里笔者将节点的遍历分为了三种情况:叶子节点;单子树的中间节点;双子树的交叉节点。

理由是:叶子结点除了附带fields信息,还有catalog中这张source表的一些元信息,例如

在遍历具体每种node过程中,重要的操作是:将本层的fields向上浮动,并进行一些命名的特殊处理,例如之前所述的重复字段命名处理,AS字段重命处理等。三种类型的节点的主要遍历思想如下,

代码语言:javascript复制
public List<String> visitLeafNode(TreeNode<Node> leafNode) {
    List<String> fields = new LinkedList<>();
    for (String field : leafNode.getData().getFields()) {
        fields.add(leafNode.getData().getIdentifier()   SPLITER   field);
    }
    return fields;
}
代码语言:javascript复制
public List<String> visitMiddleNode(TreeNode<Node> node, List<String> childFields) {
    List<String> fields = new LinkedList<>();
    for (String afterAsField : node.getData().getFields()) {
        String beforeAsField = parseAsField(afterAsField, node.getData().getDescription());
        String searchField = searchField(beforeAsField, childFields, afterAsField);
        if (searchField != null) {
            fields.add(searchField);
        }
    }
    return fields;
}
代码语言:javascript复制
public List<String> visitCrossNode(TreeNode<Node> node, List<String> smallChildIdFields, List<String> bigChildIdFields) {
    List<String> newBigChildIdFields = handleDuplicateName(smallChildIdFields, bigChildIdFields);
    List<String> fields = new LinkedList<>();
    for (String afterAsField : node.getData().getFields()) {
        String beforeAsField = parseAsField(afterAsField, node.getData().getDescription());
        String searchSmallChildIdField = searchField(beforeAsField, smallChildIdFields, afterAsField);
        String searchNewBigChildIdField = searchField(beforeAsField, newBigChildIdFields, afterAsField);
        if (searchSmallChildIdField != null && searchNewBigChildIdField == null) {
            fields.add(searchSmallChildIdField);
        } else if (searchNewBigChildIdField != null && searchSmallChildIdField == null) {
            fields.add(searchNewBigChildIdField);
        } else if (searchSmallChildIdField != null && searchNewBigChildIdField != null) {
            throw new RuntimeException(" match field both in left child and right child:"   node.getData().getId());
        } else {
            throw new RuntimeException(" match field faild:"   node.getData().getId());
        }
    }
    return fields;
}

综合以上,我们来看看最终程序的血缘分析运行结果:

测试用例:

代码语言:javascript复制
String insertSQL3 = " insert into sinkT "
          " select leftT.id pk, rightT.name, rightT0.ts `time` "
          "from leftT left join rightT on leftT.id = rightT.id left join rightT0 on leftT.id = rightT0.id ";

除此之外,笔者还想总结一下递归的思想。

递归

To iterate is human,to recurse divine.

迭代的是人,递归的是神。

——L. Peter Deutsch

虽然这句话听上去可能有几分夸大的成分,但是足以说明递归不好理解。

那为什么会出现这种情况呢?这似乎和人的思维方式有关系。我们人类的思维天生就不适合递归!

我将递归类比做我们反省自身的过程,古人云:"吾日三省吾身"。可见反省自己的难能可贵之处。而这仅仅是进行了一层的递归,更别说反省自己的反省了~~~~

例如,在认知学中,我们经常提到元认知,这其实也是在进行一种递归反省自己的思维方式。

这个角度上看来人类在递归思维上本身就先天不足,那么我们就熟能生巧!

经过最近这段时间探索血缘中接触到的AST递归处理,总结了如下几点递归的思维技巧:

递归非常类似于我们在高数学中就接触的求数列递推公式:an = f(an-1)

我们当时在求递推公式中有一种方法叫做归纳法证明,大体思路就是先计算a1=?,再由递推公式an = f(an-1)进行一般性证明。

这种思想和递归思维非常相似,我们举例说明。

例如,斐波那契,递推定义:F(0)=0,F(1)=1, F(n)=F(n - 1) F(n - 2),那么,直接可以写出:

代码语言:javascript复制
int fun(int i){
    if(i<=1){
        return i;
    }else{
        return fun(i-1)   fun(i-2)
    }
}

但是对于递推公式没有直接给出的,我们就要用归纳法来发现地推公式了。

例如,求二叉树的树高,

只有1个节点:a(1) = 0,

只有2个节点,并且呈上下层关系:a(2) = a(1) 1

上图的情况:

a3 = max{a2, a2'} 1

所以,

代码语言:javascript复制
a(n) = max{a(left), a(right)}   1
  1. 求a(1)
  2. 用a(1)求a(2)
  3. 用a(2)求a(3),并总结a(2)与a(3)的关系
  4. 总结一般性关系:a(n) = F(a(n-1))

0 人点赞