2019年的主要工作就是围绕Flink来做一些事情,分为以下几个方面:
一、实时流平台
二、实时监控
三、实时数仓
四、实时业务开发
接下来详细说一下在这几个方面做的一些事情以及如何解决遇到的一些问题与将要做的事情。
一、实时流平台
首先看一下目前关于Flink使用的整体架构图,任务执行模式选择per-job on yarn,方便于经常会对集群资源动态调整的需求,每个任务的日志也可以单独剥离开来方便排查问题。经过Flink计算的结果数据会输出到外部存储,对于业务上的实时计算会输出到MySql/HBase , 然后由上层的统一数据服务查询接口查询数据用于可视化平台数据展示;对于一些监控类的数据会输出到Influxdb中,然后由Grafana做数据可视化与告警;另外还有一部分输出到HDFS,然后通过Hive/Spark做一些小时级别的数据分析。对于输出的业务数据也会做一些数据质量监控,以便及时发现不符合规范的数据。
对于我们来说重点放在Flink计算框架,为此打造集任务开发、管理、监控、集群管理的实时流平台,架构图如下:
对于整个平台目标是致力于让不懂实时计算的业务开发人员通过SQL方式完成自己的实时业务开发,实现业务数据实时化,为此重点放在SQL化的编程方式,提供了源表、结果表DDL、维表关联,同时也抽象出一些常见的UDF提供使用,对于一些无法通过SQL完成的业务也提供jar模式提交任务,可通过编写DataStream/Table层API打包提交任务执行。在整个任务开发过程中,发现对于一些使用的外部数据源Kafka/MySql/Hbase等很难管理,如果发生的源的变更,排查起来很费劲,因此将所有的外部数据源统一管理起来,对外只提供一个数据源ID,那么就可以通过数据源ID获取需要的数据源信息。
对于任务指标采集上,最开始通过调用提供的rest api,定时轮询的方式获取然后通过平台来提供可视化展示,但是随着后期任务的增多,会导致轮询方式造成一定延时,需要采集的指标变多平台也需要进行相应的调整,选择report方式,将指标输出到influxdb中,就是用了InfluxdbReport,但是我们使用的是yarn per-job模式,导致采集的jobManager/taskManager指标没有job标识,所以就改写了InfluxdbReport源码,从任务级别的指标中获取applicationId, 然后在定时report中加上applicationId。
为了方便提供用户日志排查,通过自定义log4j的Appender方式将日志信息写入到kafka, 然后通过logstash收集到ES,在ES中通过applicationId 查询任务相关日志。与此同时保留了写文件的日志,但是经常会有一些在udf或者代码里面打印处理的明细数据,导致磁盘被打爆,所以就做了一些规范,用户日志只能使用指定的logger 名称,并且定义一个filter在file logger中将其过滤,使其只能输出到kafka中。
由于我们是多区、多集群的场景,所以在框架部署升级或者任务部署会比较麻烦,在平台上做了多集群任务自动同步,就不需要在每一个集群上进行重复的操作,也可以避免代码的不一致性,对于框架升级提供了集群配置功能与框架包上传功能,通过平台来完成自动化部署。
二、 实时监控
这里所说的实时监控,主要针对一些实时链路的监控,例如API调用请求数、成功率、耗时等,并不是针对业务的一些监控,初期的架构如下:
这种做法应该是常见的日志链路方式,将应用的打点日志数据收集到kafka中,然后由Flink程序去处理,写入到influxdb之后由grafana展示与报警。这种方式链路长、耗时、排查问题困难,所以就有了另外一种方式, 架构图如下:
提供客户端SDK,封装一些常见的metric, 例如:求和、求平均等,客户端只需要调用相应的api, 然后由SDK异步的将指标发送到中间层,在中间层会做一次预聚合,一方面将这些指标数据发送到kafka,一方面将指标对应的一些应用信息、指标数写入到influxdb,通过grafana展示应用指标情况。在metric发送到kafka之后,由通用的Flink程序处理,将指标数据输出到influxdb中。这种方式对用户来说只需要接入SDK即可,下游的处理都是通用处理方式,对于我们来说也不需要做二次开发,缩短整个周期,同时也节约的成本。
三、实时数仓
由于Flink本身提供了SQL化的编程接口,所以在19年看到Flink很多的一个应用场景就是实时数仓,我们在根据业务需求也在做实时数仓的尝试,目前实时数仓架构如下图:
在实时数仓构建过程中主要是通过SQL UDF方式完成,数据源主要是binlog与终端日志,然后由Flink程序完成清洗,将数据源转换为json格式,发送到ODS层kafka中;DIM层数据来源于两部分:一部分是实时Flink程序处理ODS层得到,另外一部分是通过离线任务得到。
目前在根据具体业务构建中主要聚焦以下几个问题:
1. 实时去重, 为此做了SQL化的去重方案,hyperloglog模糊去重与bitmap精确去重,在之前的Flink Forward 中有提到使用FirstValue来做去重,但是目前使用的是1.8版本还不提供这个函数,因此也在1.8中实现了FirstValue函数来做精确去重;
2. 撤回流的使用,目前很多场景都需要撤回,例如在统计产品对应设备数中,但是设备所属的产品有可能会发生变更,这个时候就需要对之前的统计结果进行撤回,好在Flink SQL本身是支持撤回功能,因此对这方便也做了一些研究。一个比较典型的就是kafka 提供的tableSink 是append类型的,因此自己实现了可接受retract 流的kafkaTableSink;
3. 流与流之间join,流与流的join 最大的问题就是跨窗口问题,会导致晚到的数据无法被关联上,而做全局join 又会带来state存储问题,因此在使用过程中尽可能的将流与流之间的join,转换为流与维表之间的join。另外一种方式就是在做全局join时,通过StreamQueryConfig 来设置一个尽可能较大一点的ttl 来做数据的定时清理;
四、实时业务开发
实时业务开发主要是做一些SQL无法满足的场景,例如需要做延时数据处理,主要聊一下在业务开发中几个聚焦的点:
1. 延时数据处理,在使用事件时间的语义窗口处理中,避免不了延时数据的处理,可以使用sideoutput 侧流输出来做延时处理;
2. Exactly-Once语义的保证,Flink 本身是支持输出到Kafka/HDFS的Exactly-Once语义的保证,但是我们更多使用的输出终端是MySQL/HBase 等,因此针对不同的场景实现了保证语义的不同方案:
a. 幂等性, 例如窗口输出是具有唯一性,因此在设计时只需要做写入覆盖即可
b. 事务性, 依据Flink的两阶段提交实现了写入MySql的事务机制保证
c. 最终一致性,借助于Flink本身内部是能够保证Exactly-Once的,将所有的结果保存在状态中,只需要向外部输出状态中结果数据即可
3. 定时定量输出,定时定量输出主要是为了减小对外部写入的压力,定量将中间结果数据存在缓存中,然后使用状态做容错机制,定时借助于Flink中定时机制来完成;
4. 事件时间倾斜,由于在业务逻辑处理中会按照特定的业务字段进行分组处理也就是keyBy操作,但是出现了某个task长时间没有数据产生,那么在下游处理中时间一直没法推进,也就导致无法触发相应的操作,因此在实现过程中实现了既能按照事件时间触发又能按照处理时间触发。
5. 数据顺序的保证,某些业务上处理前后是有逻辑关联的,会要求上游在发送数据时将业务关联数据发送到kafka的topic 同一个partition中;
五、将要做的事情
实时流平台完善,主要分为以下几点:
1. 提供数据校验功能,也就是要让业务认可我们计算的结果数据是正确的
2. 提供SQL校验功能,目前只有通过任务提交了才能知道SQL是否正确,希望能够在开发过程中进行SQL校验
3. 平台支持测试功能, 提供测试入口与结果数据输出功能
同时也会做OLAP的技术选型与落地使用,另外还会做更多的场景覆盖,例如CEP的落地使用等。
—END—