★
感谢您的关注 点赞 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!! ”
1.序篇-先说结论
宝贝们,还记得前几天博主去的火山引擎大数据场嘛,其中比较令大家感兴趣的就是最后一讲,字节一站式埋点平台的 flink 标准化清洗及拆流任务。
其中大家感觉比较流啤的就是的就是字节做到了:
- 不重启任务可以上下线新的拆流及清洗规则,所有的规则变更都不需要涉及到任务的重启。
- 清洗 udf,rpc 接口热加载
总的来说就是任务永不停,不可能停止的,好么,beiber。
★字节火山引擎 PPT。公众号回复 20210724 获取。 ”
6
★本文博主就主要介绍第一点,即做到规则动态变化,可以做到动态添加一个 sink kafka topic,动态删除一个 sink kafka topic,而不重启任务。相信能抛砖引玉,给大家一些启发。 ”
本文从以下几个章节详细介绍框架实现:
- 背景篇-为啥需要这么个框架
- 定义、目标篇-做这个框架的目标、预期效果是什么
- 难点剖析篇-此框架建设的难点、业界目前的实现
- 数据建设篇-框架具体方案设计
- 数据保障篇-框架的保障方案
- 总结与展望篇
2.背景篇-为啥需要这么个框架
首先来看看字节他们做这件事情的背景:
- 任务重启造成数据的延迟:对于字节这种企业来说且每天都会新上线很多的埋点,把这些新的埋点拆流条件加入 flink 任务就要重启,但是字节客户端日志流量都是千万级别 qps 的,就意味着这个 flink 任务一旦重启耗时肯定是很长的,这对时延敏感的业务是不可接受的。
- 减少对于原始客户端日志的烟囱式消费,节约资源
- 统一标准化的埋点平台:用户能通过埋点平台用到正确的数据
- 与埋点平台联动的、统一化的、标准化的流式数据处理平台:用户能通过这个平台去获取想要的统一标准化的数据
- 数据的分级保障能力:Dump 日志,日志的产出需要分优先级进行保障(死保、尽力保...),用户能放心的用数据
如图:
因此诞生了这个框架。
项目代码、在博主公众号回复【揭秘字节跳动埋点数据实时动态处理引擎】获取:
3.定义、目标篇-做这个框架的目标、预期效果是什么
上述的痛点很多,本节就从最痛的任务重启的延迟角度出发解决问题,揭秘字节动态配置化的 flink 任务的实现。
预期效果如下:
1.即在任务不停止的情况下可以动态的上线一个动态规则、一个 sink kafka topic,上线某个、某类埋点对应的流数据的 kafka topic
如图左边是修改配置,添加了一个拆流规则以及对应 topic,右边这个规则 topic 就开始产出数据,对应的 console consumer 就消费到了复合规则的数据。(gif 加载可能比较慢)
8
2.即在任务不停止的情况下可以动态的下线一个动态规则、一个 sink kafka topic,下线某个、某类埋点对应的流数据的 kafka topic
如图左边是修改配置,删除了一个拆流规则以及对应 topic,右边这个规则 topic 就不产出数据了,对应的 console consumer 就没有新数据可以消费了。(gif 加载可能比较慢)
9
3.总体效果如下:
4.难点剖析篇-此框架建设的难点、业界目前的实现
首先带大家分析下,实现这个框架,最基本的模块都需要包含什么:
- flink 任务:本身就是一个 Map 任务,逻辑简单
- 动态上下线规则配置:肯定得有一个动态配置中心去告诉 flink 任务需要新上下线一个 kafka topic
- 动态规则过滤引擎:flink 任务监听到规则发生动态变化之后,要热更新规则,将新的规律规则应用起来。需要一个动态代码执行引擎
- 动态上下线 Kafka topic:目前大多数公司用的是 flink 自带的 kafka-connector,一旦涉及到需要添加一个下游,就需要添加一个 kafka producer operator,因为涉及到多加了一个 operator,那肯定得重启任务。需要动态添加删除 producer 的能力。
5.数据建设篇-框架具体方案设计
5.1.方案设计
5.1.1.方案
先说说方案选择的结论:
- flink 入口任务:Map 模型使用 ProcessFunction 底层算子
- 动态上下线规则配置:配置中心开源的有很多,这里为了实现轻量化,实现简单,使用 zookeeper 作为动态规则配置中心。当然如果对 zk 压力大,也可以使用广播配置实现。
- 动态规则引擎:规则引擎很多,比如常见的可以使用 JavaScript、Groovy、jython、mvel2、freemarker 等等,太多了。考虑到性能、易用性选用 janino 将动态规则动态编译出 class。然后作为动态规则引擎使用。后面会详述选用 janino 的原因。
- 动态上下线 Kafka topic:去除 flink-kafka-connector,直接在 ProcessFunction 中使用原生 kafka-clients 输出数据,维护一个 producer 池。
整体方案架构图如图所示:
项目代码、在博主公众号回复【揭秘字节跳动埋点数据实时动态处理引擎】获取:
5.1.2.预期效果
5.1.2.1.上线配置
4
5.1.2.2.下线配置
5
5.2.具体实现
整个任务的实现非常简单。
本地运行,可以参考下面两篇安装 zk 和 kafka。
- zk:https://www.jianshu.com/p/5491d16e6abd
- kafka:https://www.jianshu.com/p/dd2578d47ff6
5.2.1.flink 任务入口逻辑
首先来看看整个任务的入口逻辑,ProcessFunction 的功能很简单:
- 针对数据源的每一条日志数据,遍历动态规则引擎池
- 只要这条数据满足某一条规则的条件,就将这条日志数据写出到规则对应的 topic 中
env.addSource(new UserDefinedSource())
.process(new ProcessFunction<ClientLogSource, ClientLogSink>() {
// 动态规则配置中心
private ZkBasedConfigCenter zkBasedConfigCenter;
// kafka producer 管理中心
private KafkaProducerCenter kafkaProducerCenter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.zkBasedConfigCenter = ZkBasedConfigCenter.getInstance();
this.kafkaProducerCenter = KafkaProducerCenter.getInstance();
}
@Override
public void processElement(ClientLogSource clientLogSource, Context context, Collector<ClientLogSink> collector)
throws Exception {
// 遍历所有的动态规则
this.zkBasedConfigCenter.getMap().forEach(new BiConsumer<Long, DynamicProducerRule>() {
@Override
public void accept(Long id, DynamicProducerRule dynamicProducerRule) {
// 验证该条数据是否符合该条规则
if (dynamicProducerRule.eval(clientLogSource)) {
// 将符合规则的数据发向对应规则的 topic 中
kafkaProducerCenter.send(dynamicProducerRule.getTargetTopic(), clientLogSource.toString());
}
}
});
}
@Override
public void close() throws Exception {
super.close();
// 关闭规则池
this.zkBasedConfigCenter.close();
// 关闭 producer 池
this.kafkaProducerCenter.close();
}
});
env.execute();
5.2.2.动态上下线规则配置
来看 flink ProcessFunction 中的核心点,第一部分就是 ZkBasedConfigCenter。其功能包含:
- 任务启动时,初始化加载 zk 配置,初始化规则池,将规则池中的配置规则编译成 class 可执行规则
- 监听 zk 配置变更,将新增配置加入规则池,将下线配置从规则池删除
5.2.2.1.动态规则 schema 设计
动态规则包含的内容与用户需求息息相关:
举例:用户需要将在首页上报 id > 300 用户的客户端日志都写入 topic_id_bigger_than_300_and_main_page 的 kafka topic 中。
那么针对这个 flink 任务来说就有以下三项用户的输入:
- 动态规则的过滤条件:即上游每一条数据过来之后检验这条数据是否满足规则条件。上面这个例子的条件就是
clientLogSource.getId() > 300 && clientLogSource.getPage().equals("首页")
;其中 clientLogSource 是原始日志 model - 动态规则要写入的 topic 名称:这条规则过滤出来的数据要写入哪个 topic。上面这个例子的 topic 就是
topic_id_bigger_than_300_and_main_page
- 动态规则的唯一 id:唯一标识一个过滤规则的 id
针对上述要求设计动态规则配置的 schema 如下:
代码语言:javascript复制{
"id-数值类型 string": {
"condition-过滤条件": "1==1",
"targetTopic-目标 topic 名称": "tuzisir1"
}
"1": {
"condition": "clientLogSource.getId() > 300 && clientLogSource.getPage().equals("首页")",
"targetTopic": "topic_id_bigger_than_300_and_main_page"
},
"2": {
"condition": "clientLogSource.getPage().equals("个人主页")",
"targetTopic": "topic_profile_page"
}
}
对应动态规则 model 设计如下:
项目代码、在博主公众号回复【揭秘字节跳动埋点数据实时动态处理引擎】获取:
代码语言:javascript复制public class DynamicProducerRule implements Evaluable {
// 具体过滤规则
private String condition;
// 具体写入 topic
private String targetTopic;
// 使用 janino 编译的规则过滤器
private Evaluable evaluable;
public void init(Long id) {
try {
// 使用 janino 初始化规则
Class<Evaluable> clazz = JaninoUtils.genCodeAndGetClazz(id, targetTopic, condition);
this.evaluable = clazz.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public boolean eval(ClientLogSource clientLogSource) {
return this.evaluable.eval(clientLogSource);
}
}
重点在于 Evaluable 接口,动态生成代码就是继承了这个接口,用于执行过滤规则的基础接口。
代码动态生成下面会详细介绍。
代码语言:javascript复制public interface Evaluable {
// 动态规则接口过滤方法
boolean eval(ClientLogSource clientLogSource);
}
5.2.2.2.基于 zk 的动态配置中心
使用了 zk 作为动态配置中心,来动态监听规则配置以及更新规则池。
代码语言:javascript复制public class ZkBasedConfigCenter {
// zk config 变化监听器
private TreeCache treeCache;
// zk 客户端
private CuratorFramework zkClient;
private ZkBasedConfigCenter() {
try {
open();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// !!!规则池!!!规则池!!!规则池
private ConcurrentMap<Long, DynamicProducerRule> map = new ConcurrentHashMap<>();
private void open() throws Exception {
// 初始化规则
// 初始化 zk config 监听器
// 当有配置变更时
// 调用 private void update(String json) 更新规则
}
public void close() {
this.treeCache.close();
this.zkClient.close();
}
private void update(String json) {
Map<Long, DynamicProducerRule>
result = getNewMap(json);
// 1.将新增规则添加进规则池
// 2.将下线规则从规则池删除
}
private Map<Long, DynamicProducerRule> getNewMap(String json) {
// 将新规则解析,并使用 janino 进行初始化
}
}
可以使用一个固定路径的配置,如图博主使用的是 /kafka-config 这个路径
7
5.2.3.动态规则引擎
目前字节使用的引擎是 Groovy,但是博主常用 flink sql,sql 中的代码生成是使用 janino 做的,因此就比较了 janino 和 groovy 的性能差异,janino 编译出的原生 class 性能接近原生 class,是 Groovy 的 4 倍左右。其他的引擎不考虑,要么易用性差,要么性能差。
★Notes:性能这一点真的是很重要,1:4 的差距可以说是差别很大了。如果你的场景也是大流量,非常耗费性能的场景,建议直接入手 janino!!! ”
来看看具体的 benchmark case 代码:
代码语言:javascript复制// ClientLogSource 是原始日志
boolean eval(flink.examples.datastream._01.bytedance.split.model.ClientLogSource clientLogSource) {
return String.valueOf(clientLogSource.getId()).equals("1");
}
上面这段代码,在博主 mac 本地执行,每次循环执行 5kw 次,总计执行 5 次 得出的结果如下:
代码语言:javascript复制java:847 ms
janino:745 ms
groovy:4110 ms
java:1097 ms
janino:1170 ms
groovy:4052 ms
java:916 ms
janino:1117 ms
groovy:4311 ms
java:915 ms
janino:1112 ms
groovy:4382 ms
java:921 ms
janino:1104 ms
groovy:4321 ms
重复执行了很多次:java object : janino 编译原生 class :groovy :几乎都是 1:1:4 的耗时。所以此处我们选择性能更好的 janino。
代码语言:javascript复制public class JaninoUtils {
public static Class<Evaluable> genCodeAndGetClazz(Long id, String topic, String condition) throws Exception {
// 动态生成代码
// 初始化 Class<Evaluable> 并返回
}
}
5.2.4.动态上下线 Kafka topic
来看入口类中的第二个核心点,就是 KafkaProducerCenter。其功能包含:
- 维护所有的 producer 池
- 提供消息发送接口
项目代码、在博主公众号回复【揭秘字节跳动埋点数据实时动态处理引擎】获取:
代码语言:javascript复制public class KafkaProducerCenter {
// kafka producer 池
private final ConcurrentMap<String, Producer<String, String>> producerConcurrentMap
= new ConcurrentHashMap<>();
private Producer<String, String> getProducer(String topicName) {
// 如果 kafka producer 池中有当前 topic 的 producer,则直接返回
// 如果没有,则初始化一个新的 producer 然后返回
}
public void send(String topicName, String message) {
final ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
"", message);
try {
RecordMetadata metadata = getProducer(topicName).send(record).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void close() {
// 关闭所有 producer 连接
}
}
上面就是所有的代码、逻辑实现方案。其实整体看下来是非常简单的。
6. 数据保障篇-框架的保障方案
- 配置中心挂了怎么办?
为这个任务分配独立的队列资源,每当这个任务加载到最新配置时,都将配置在本地存储一份。当配置中心挂了的时候,还可以直接加载机器本地的配置,不至于什么都产出不了。
- 怎么保障用户的配置是无误的?
- 上线前审批:有专门的埋点管理人员进行逻辑验证及管理
- 上线前自动化测试:在埋点管理平台自动化验证逻辑正确性,保障上线到 flink 任务里的配置都是正确的
- AOP 异常处理、报警:在环境中做 AOP 异常处理,将异常数据 dump 到专用异常 topic 中,也需要自动化把报警信息透出
- 结果验证:针对最终的结果需要有数据准确性验证机制
7. 总结与展望篇
7.1.总结
本文主要揭秘、实现了字节跳动埋点数据实时动态处理引擎。
7.2.展望
- 本文主要实现了拆流的动态化,输出数据和输入数据完全相同,但是很多情况下,下游只需要其中的一些字段。因此之后还可以做到对于 sink message 字段、消息的个性化。比如可以加一个动态化的 Map 逻辑,将数据源中的 ClientLogSource 转化为任何用户想要的 Model。比如使用 Dynamic Message 或者使用代码生成去做。
- 目前过滤条件完全是 java 语法,之后可以扩展成为 sql 语法,提高可读性
- 函数、rpc 热加载