相关推荐:
【Flink】第二十七篇:三天撸了一个 Flink SQL 字段血缘算法
【Flink】第二十八篇:Flink SQL 与 Apache Calcite
【Flink】第二十九篇:源码分析 Blink Planner
从【Flink】第二十七篇:三天撸了一个 Flink SQL 字段血缘算法 这篇文章开始,笔者开启了一个Flink SQL字段血缘实现的探索之路。但是由于当时只是针对Flink 运行时中产生的calcitetree简单分析后写出的简易版,有诸多不成熟之处,所以也没说过多的实现细节。
经过一段时间的探索,笔者又寻找到了可能是更优的实现方案,这里先将算法部分的细节进行阐述,后续再陆续展开更多实现细节。
在进一步深入探索Flink SQL源码的过程中,笔者发现可以从源码中运行时中提取到这样的数据结构:
1. nodes:
nodes是Flink SQL中AST树的各个节点,每个节点包含两个重要的属性:id、fields
2. edges:
edges是Flink SQL中AST树的各个节点之间的边,包含两个重要属性:source、target。注意这里的source和target即是nodes中的id。
并且,顺着edges将nodes连线后发现这是一颗倒二叉树。例如,一个三个表关联后insert into到第四张表的SQL的倒二叉树结构简化后如下,
每个node都包含向下游node沉淀的fields,而每个边都是有向的说明了source node和target node。
顺着这些分析结论,我们接下来处理它,并最终画出source表到sink表的字段血缘关系。
递归构建树
我们先对前面提到的,从源码中提取的元数据进行分析,得到一些辅助构建和遍历的信息:
得到root node id:即寻找入度为0的节点,
代码语言:javascript复制public static Integer getRootNodeId(Map<Integer, Integer> edgesMap) {
List<Integer> rootNodeIds = new LinkedList<>();
List<Integer> sourceNodeIds = getAllSourceNodeIds(edgesMap);
List<Integer> targetNodeIds = getAllTargetNodeIds(edgesMap);
// 寻找入度为0的节点
for (Integer targetNodeId : targetNodeIds) {
if (!sourceNodeIds.contains(targetNodeId)) {
rootNodeIds.add(targetNodeId);
}
}
if (rootNodeIds.size() > 1) {
throw new RuntimeException("more than one rootNodeId has been found!");
}
return rootNodeIds.get(0);
}
接着我们开始由root node自顶向下递归构建这颗二叉树,代码如下
代码语言:javascript复制public void handleCreateTree(TreeNode<Node> node) {
if (!addChilds(node)) {
return;
} else {
if (node.getLeftChild() != null) {
handleCreateTree(node.getLeftChild());
}
if (node.getRightChild() != null) {
handleCreateTree(node.getRightChild());
}
}
}
为node添加左右子节点,注意在构建过程中,需要保证左子树的id小于右子树的id,因为关系到后续SQL在为重复字段重命名的规则,
代码语言:javascript复制public boolean addChilds(TreeNode<Node> parent) {
boolean optFlag = false;
Integer parentId = parent.getData().getId();
for (Map.Entry<Integer, Integer> edge : edgesMap.entrySet()) {
if (edge.getValue().equals(parentId)) {
if (parent.getLeftChild() == null) {
parent.setLeftChild(new TreeNode<>(nodesMap.get(edge.getKey())));
} else if (parent.getRightChild() == null) {
// 保证左子树比右子树的id小
Node left = parent.getLeftChild().getData();
Node tempNode = nodesMap.get(edge.getKey());
if (left.getId() < tempNode.getId()) {
parent.setRightChild(new TreeNode<>(tempNode));
} else {
parent.setRightChild(parent.getLeftChild());
parent.setLeftChild(new TreeNode<>(tempNode));
}
} else {
throw new RuntimeException(" more than two child node has been found in node:" parent.getData().getId());
}
optFlag = true;
}
}
return optFlag;
}
这两部分的操作就从元数据中构建起了一颗类AST树。
递归遍历画血字段缘
接下来我们如何在遍历二叉树的过程中进行字段血缘的分析呢?
由于每个node都有本node的fields,而parent node的fields又来源于左右子节点上浮的fields,换句话说,上游的fields是由左右子node的fields而来,那么我们当然应该选择二叉树后续遍历的总体思路。
例如,下面这个情况下,在左边两个叶子结点的原始表中的fields均为id、name、ts,而上浮过程中由于上游取下了下游左子节点的id,和右子节点的name,但是name重名所以SQL在重复字段后面加"0",即成为name0。
在第二个join时,右子节点的 op_ts 进行了重命名为 ts,这种情况在上浮时依旧要出处理。
综上,代码如下,
代码语言:javascript复制// 后续遍历
public List<String> postOrder(TreeNode<Node> treeNode) {
if (treeNode == null) {
return null;
} else {
List<String> leftFields = postOrder(treeNode.getLeftChild());
List<String> rightFields = postOrder(treeNode.getRightChild());
return visit(treeNode, leftFields, rightFields);
}
}
而最重要的visit过程又是如何将子节点的fields上浮的呢?
代码语言:javascript复制public List<String> visit(TreeNode<Node> node, List<String> leftFields, List<String> rightFields) {
List<String> fields;
if (leftFields == null && rightFields == null) {
fields = visitLeafNode(node);
} else if (leftFields != null && rightFields != null) {
fields = visitCrossNode(node, leftFields, rightFields);
} else {
if (leftFields != null) {
fields = visitMiddleNode(node, leftFields);
} else {
fields = visitMiddleNode(node, rightFields);
// 理论上不应该出现这种情况!
System.out.println("A node with only the right subtree was found:" node.getData().getId());
}
}
return fields;
}
这里笔者将节点的遍历分为了三种情况:叶子节点;单子树的中间节点;双子树的交叉节点。
理由是:叶子结点除了附带fields信息,还有catalog中这张source表的一些元信息,例如
在遍历具体每种node过程中,重要的操作是:将本层的fields向上浮动,并进行一些命名的特殊处理,例如之前所述的重复字段命名处理,AS字段重命处理等。三种类型的节点的主要遍历思想如下,
代码语言:javascript复制public List<String> visitLeafNode(TreeNode<Node> leafNode) {
List<String> fields = new LinkedList<>();
for (String field : leafNode.getData().getFields()) {
fields.add(leafNode.getData().getIdentifier() SPLITER field);
}
return fields;
}
代码语言:javascript复制public List<String> visitMiddleNode(TreeNode<Node> node, List<String> childFields) {
List<String> fields = new LinkedList<>();
for (String afterAsField : node.getData().getFields()) {
String beforeAsField = parseAsField(afterAsField, node.getData().getDescription());
String searchField = searchField(beforeAsField, childFields, afterAsField);
if (searchField != null) {
fields.add(searchField);
}
}
return fields;
}
代码语言:javascript复制public List<String> visitCrossNode(TreeNode<Node> node, List<String> smallChildIdFields, List<String> bigChildIdFields) {
List<String> newBigChildIdFields = handleDuplicateName(smallChildIdFields, bigChildIdFields);
List<String> fields = new LinkedList<>();
for (String afterAsField : node.getData().getFields()) {
String beforeAsField = parseAsField(afterAsField, node.getData().getDescription());
String searchSmallChildIdField = searchField(beforeAsField, smallChildIdFields, afterAsField);
String searchNewBigChildIdField = searchField(beforeAsField, newBigChildIdFields, afterAsField);
if (searchSmallChildIdField != null && searchNewBigChildIdField == null) {
fields.add(searchSmallChildIdField);
} else if (searchNewBigChildIdField != null && searchSmallChildIdField == null) {
fields.add(searchNewBigChildIdField);
} else if (searchSmallChildIdField != null && searchNewBigChildIdField != null) {
throw new RuntimeException(" match field both in left child and right child:" node.getData().getId());
} else {
throw new RuntimeException(" match field faild:" node.getData().getId());
}
}
return fields;
}
综合以上,我们来看看最终程序的血缘分析运行结果:
测试用例:
代码语言:javascript复制String insertSQL3 = " insert into sinkT "
" select leftT.id pk, rightT.name, rightT0.ts `time` "
"from leftT left join rightT on leftT.id = rightT.id left join rightT0 on leftT.id = rightT0.id ";
除此之外,笔者还想总结一下递归的思想。
递归
To iterate is human,to recurse divine.
迭代的是人,递归的是神。
——L. Peter Deutsch
虽然这句话听上去可能有几分夸大的成分,但是足以说明递归不好理解。
那为什么会出现这种情况呢?这似乎和人的思维方式有关系。我们人类的思维天生就不适合递归!
我将递归类比做我们反省自身的过程,古人云:"吾日三省吾身"。可见反省自己的难能可贵之处。而这仅仅是进行了一层的递归,更别说反省自己的反省了~~~~
例如,在认知学中,我们经常提到元认知,这其实也是在进行一种递归反省自己的思维方式。
这个角度上看来人类在递归思维上本身就先天不足,那么我们就熟能生巧!
经过最近这段时间探索血缘中接触到的AST递归处理,总结了如下几点递归的思维技巧:
递归非常类似于我们在高数学中就接触的求数列递推公式:an = f(an-1)
我们当时在求递推公式中有一种方法叫做归纳法证明,大体思路就是先计算a1=?,再由递推公式an = f(an-1)进行一般性证明。
这种思想和递归思维非常相似,我们举例说明。
例如,斐波那契,递推定义:F(0)=0,F(1)=1, F(n)=F(n - 1) F(n - 2),那么,直接可以写出:
代码语言:javascript复制int fun(int i){
if(i<=1){
return i;
}else{
return fun(i-1) fun(i-2)
}
}
但是对于递推公式没有直接给出的,我们就要用归纳法来发现地推公式了。
例如,求二叉树的树高,
只有1个节点:a(1) = 0,
只有2个节点,并且呈上下层关系:a(2) = a(1) 1
上图的情况:
a3 = max{a2, a2'} 1
所以,
代码语言:javascript复制a(n) = max{a(left), a(right)} 1
- 求a(1)
- 用a(1)求a(2)
- 用a(2)求a(3),并总结a(2)与a(3)的关系
- 总结一般性关系:a(n) = F(a(n-1))