一、实时开发常见问题
1、一个实时计算任务该分配多少资源?
建议:一些简单ETL任务,并且源数据流量在一定范围内, tm个数1、全局并行度1、内存1G。
分析:
- 全局并行度为1,对于简单ETL任务会有operator chain,在一个task(线程)中运行、减少线程切换、减少消息序列化/反序列化等,该类问题的瓶颈一般在下游写入端。
- 写入端是瓶颈:一般建议开启批量写入(需要控制批量大小,防止内存溢出)、开启多并行度写入。
- 如果是单台数据库的瓶颈:开启多个并行度就没法提升性能、一般建议按照一定路由规则写入多台数据库、建议使用分布式数据库(如Hbase:提前建立分区、避免数据热点写入等)。
2、为什么写入Kafka结果中有些分区没有数据?
建议:如果现有topic已经存在,并且是多个分区,结果表并行度设置partition数一样。 分析:
- 由于Flink写Kafka默认采用的是FixedPartitioner。如果并行度比partition大,则数据都会发送到partition中,但是如果并行度比partition小,则有部分分区是没有数据的。
- source端,如果并行度小于partition,会取模的方式分给并行度,都会消费到数据。如果并行度大于partition,则会有部分task消费不到数据。
3、为什么和维表关联后任务处理数据的能力变慢?
建议:小数据量不常更新的维表使用ALL模式。大数据量的维表使用使用LRU模式,并且根据数据库不同做相应的处理(比如关系型数据库则建立索引等)。
分析:1.ALL模式启动时候直接将数据全量加载到内存中,每次关联数据不需要查库,没有其他开销。2.异步(async)查询模式
- LRU异步查询数据库,可以并发地处理多个请求。
- 根据SQL中的关联字段顺序建立复合索引。
- 防止关联字段索引失效(关联顺序不对、关联列做计算等)。
- 如果维表字段个数少,考虑将将多余字段都加入到索引中,减少回表(带来的问题是索引变大)。
4、为什么某些任务提高并行度能提升性能,某些不能?
建议:查看是否数据倾斜,如果是将数据打散。
分析:
- 源头是否数据倾斜。
- SQL中是否存在导致倾斜的语句。
- 登陆到Flink web页面查看。
- 通过修改SQL解决或者打散groupby字段。
二、实时任务运维
1、配置反压告警
场景:反压导致cp失败,数据出现延迟或者不产出。
排查方法: 1)借助Flink web-ui 提供的的反压功能查找具体的operatorChain。 2)查询Flink metric 'inPoolUsage、outPoolUsage' 来确定具体的反压算子。
2、配置cp失败告警
场景:cp失败导致数据无法真正落地,任务恢复间隔太长。
排查方法:
1)是否存在反压。 2)检查集群负载、IO、CPU、MEM 是否处于高负荷状态。
3、拆分实时任务日志
场景: Flink实时任务运行时间长之后导致日志占用磁盘大,另外一个大的日志文件不利于排查问题。
解决方法:
配置log4j.log的滚动参数,设置日志按日期或者大小滚动生产,并且限制保留的大小。
4、监控任务运行中tm日志
场景: 任务执行中产生的运行日志没有监控,比如网络抖动导致的链接失败等等。
解决方法:
修改Flink自带的log4j jar包中的代码,将异常日志重定向一份到Kafka或ES中,进行后续分析,找到程序中可能存在的隐藏bug。
5、脏数据管理
场景:由于数据源都是从Kafka过来的数据,可能存在数据类型错误、字段名称错误、字段阈值在Flink中超范围等。落库过程中,由于字段类型不匹配、阈值超范围等等情况。
解决方法:
在数据解析和数据落库等代码中,对catch中的数据进行收集。当异常数据达到一定的量时,告警通知。线下离线修正结果数据。
三、通过Metrics定位问题
1.常用内置Metrics介绍
端到端的延时(最大、平均、百分位):
flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency
输入数据量:
flink_taskmanager_job_task_operator_numRecordsIn
flink_taskmanager_job_task_numBytesIn
输出数据量:
flink_taskmanager_job_task_operator_numRecordsOut
flink_taskmanager_job_task_numBytesOut
反压值:
flink_taskmanager_job_task_isBackPressured
任务buffer:
inPoolUsage、outPoolUsage等其他
2、flinkStreamSql中常用metrics
业务延迟:
flink_taskmanager_job_task_operator_dtEventDelay(单位s)
数据本身的时间和进入flink的当前时间的差值。
各个输入源的脏数据:
flink_taskmanager_job_task_operator_dtDirtyData
从Kafka获取的数据解析失败视为脏数据。
各Source的数据输入TPS:
flink_taskmanager_job_task_operator_dtNumRecordsInRate
Kafka接受的记录数(未解析前)/s。
各Source的数据输入RPS:
flink_taskmanager_job_task_operator_dtNumRecordsInResolveRate
Kafka接受的记录数(未解析前)/s。
各Source的数据输入BPS:
flink_taskmanager_job_task_operator_dtNumBytestInRate
Kafka接受的字节数/s。
Kafka作为输入源的各个分区的延迟数:
flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag
当前Kafka10、Kafka11有采集该指标。
各个输入源RPS:
fink_taskmanager_job_task_operator_dtNumRecordsOutRate
写入的外部记录数/s。
四、FlinkStreamSQL v1.11.1介绍
1.DDL建表语句和FlinkStreamSql v1.10之前版本保持一致。
2.DML语句有两种不同的模式:
dtstack模式:和之前的版本是一致的。
Flink模式:和Flink原生的语法保持一致。
3.主要区别点:和维表join方式不同。
4.如何使用:在提交任务的时候加上 -planner dtstack/flink即可。
本文作者:刘星(花名:吹雪),袋鼠云大数据开发工程师。
本文首发于:数栈研习社
我们在github上有一个flinkx的开源项目,欢迎大家来讨论交流~