Flink 实践教程:进阶7-基础运维

2022-01-20 21:07:17 浏览数 (2)

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文首先介绍了几种 Flink 应用最常见、最基础的错误,用户在使用的时候可以尽量规避的问题。接下来介绍了流计算 Oceanus 平台的监控系统,可以帮助用户实时了解作业各个层级的明细及运行状态。然后借助于日志系统帮助诊断作业出现的错误及原因。

视频内容

基本错误

缺少 DML 语句

一个完整的 Oceanus SQL 作业(JAR 作业里使用 SQL 语句)主要包含以下几个模块:Source(DDL)、Sink(DDL)和 INSERT(DML)语句。在实际应用中经常碰见客户在创建完 Source 和 Sink 后不写 INSERT 语句,导致在【语法检查】时报如下错误。

代码语言:javascript复制
语法检查失败:java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.

数据类型映射

数据类型映射错误也是一个比较常见的错误。流计算 Oceanus 官网也对各种数据库字段类型与 Flink 字段类型的对应关系做了陈列,具体可参考 开发指南概述 [1] 里面的【上下游开发指南】,本文就不再重复。

连接超时/失败

上下游地址、库表是每个 DDL 语句的配置参数必填项。在【语法检查】时,平台并不会检查 DDL 配置参数的正确性,这些检查通常在程序运行时检查。下列关键字代表外部系统访问(例如 MySQL、Kafka 等)可能因为网络原因出现了超时。结果中可能会有很多配置相关的内容,请自行甄别是否是报错。

  • Kafka 的 Timeout expired while fetching topic metadata 表示初始化超时;
  • MySQL 的 Communications link failure 表示连接中断(可能是很长时间没有数据流入,造成客户端超时)。

下面例子为 Kafka 作为 Source,MySQL 作为 Sink 的一个连接错误日志演示:

代码语言:javascript复制
// example: kafka source 内网地址填写错误导致报错
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=15, backoffTimeMS=10000)  
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
// example: kafka topic 不存在报错
java.lang.RuntimeException: Could not fetch partitions for oceanus_advanced10_input. Make sure that the topic exists.
代码语言:javascript复制
// example: JDBC(MySQL) Sink 连接地址填写错误导致报错
java.io.IOException: unable to open JDBC writer  
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
// example: JDBC(MySQL) Sink 端用户无写入权限或密码填写错误
Caused by: java.io.IOException: unable to open JDBC writer  
Caused by: java.sql.SQLException: Access denied for user 'joylyu'@'10.0.0.101' (using password: YES)

主键问题

有 update/delete 数据(回撤流数据)的情况下,Sink 端必须定义主键,这种情况无主键定义或者主键定义错误也是语法检查失败常见的错误。Sink 端需定义主键的情况如下:

  • Source 使用 Flink CDC 系列(这种情况下 Source 也需要定义主键)。
  • 数据转换过程中有 update/delete 数据存在。
代码语言:javascript复制
// example:数据转换过程中去重,Sink 端无主键定义报错
语法检查失败:java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

窗口函数聚合问题

Flink 1.13 已经支持 Windowing TVF 函数,这种函数目前需要单独配合聚合函数使用,单独使用的场景暂时还不支持(社区后面会做优化支持)。Windowing TVF 函数相比于之前的窗口函数的写法(例如 GROUP BY TUMBLE(time_stamp,INTERVAL '1' MINUTE))更符合 SQL 写法规范,底层优化逻辑也更好,在使用窗口函数的时候推荐大家优先使用 Windowing TVF 函数。需要注意的是,窗口函数并不能处理更新(update)和删除(delete)数据,当有这类数据进入窗口函数时,报错如下:

代码语言:javascript复制
语法检查失败:org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[FirstRow], key=[order_id], order=[ROWTIME])

下面是几种常见的错误用法:

  1. Source Connector 使用 CDC 系列,之后使用窗口聚合分析。
  2. 先去重,后使用窗口聚合函数,例如下面例子。
代码语言:sql复制
-- 以下为错误演示
-- 将原始数据去重
CREATE VIEW kafka_json_source_view AS
SELECT order_id, num, event_time
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY event_time ASC) AS row_num
  FROM kafka_json_source_table)
WHERE row_num = 1;
-- 之后进行开窗后插入
INSERT INTO jdbc_upsert_sink_table
SELECT
window_start,window_end,SUM(num) AS num
FROM TABLE(
    -- 下面这句 SQL 就是 windowing TVF 的使用方法
    TUMBLE(TABLE kafka_json_source_view,DESCRIPTOR(event_time),INTERVAL '1' MINUTES)
) GROUP BY window_start,window_end;

JAR 包过大

在 Oceanus 控制台,【依赖管理】里面新建依赖上传 JAR 包,JAR 包大小限制为 150M。在实际使用中经常碰见用户打得 JAR 包过大,超过 150M 而不允许上传的情况。实际上 Oceanus 平台已经内置了 Flink 相关的 JAR 包,用户在打包时不用将这些 JAR 打进去,只需要在 POM 里面 scope设置为provided 即可,例如:

代码语言:html复制
<!--example-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.13.2</version>
    <scope>provided</scope>
</dependency>

找不到主类

JAR 包打好后放到 Oceanus 平台运行,首先需要指定运行的【主程序包】及相对应的版本(即为用户上传的业务代码包),并选择【主类】。【作业参数】>【内置 Connector】选择对应的 Connector,如有业务需要也可选择【引用程序包】。在正式运行之前请检查:

  • 类名是否有拼写错误
  • 确定是否将相关的业务代码依赖打进 JAR 包中

基础运维

作业监控

流计算 Oceanus 提供强大的作业监控能力,我们可以通过【监控】项查看作业的各项指标,包括每秒数据流入条数、每秒数据流出条数、算计计算总耗时、目的端 Watermark 延时、作业重启次数,甚至更细化到 CheckPoint、JobManager、TaskManager、Task 内的各项细化指标,具体可以查看 Oceanus 官网文档 查看作业监控信息 [2]。 当然在控制台的作业列表界面,单击右上角的【云监控】,即可进入 云监控控制台 [3],查看更为详细的监控指标。在此还可以配置作业专属的 监控告警策略 [4]。云监控监控指标主要包括七大维度,分别为:

  • 作业运行信息类
  • JobManager 运行信息类
  • TaskManager 运行信息类
  • JobManager GC 类
  • TaskManager GC 类
  • 作业 Checkpoint 信息类
  • ETL 运行信息类

下图为作业运行信息类示例

云监控云监控

此外,流计算 Oceanus 还支持将 Flink 指标上报到 Prometheus,用户可以自行保存、分析和展示作业的各项指标。具体查看 接入 Prometheus 自定义监控 [5]。流计算 Oceanus 平台的监控系统极大的方便用户实时查看作业的运行情况。

作业日志

在不同业务场景下可能出现不同的错误,常见的例如作业失败、OOM、JVM 退出等,具体可以参见 Oceanus 官网 日志诊断指南 [6]。除了这些常见异常,文档也对其他的报错信息进行了指导分析,如快照失败(超时)、超时/失败(上章节已说明)、异常、WARN 和 ERROR 日志、可忽略的报错信息。

  • 作业失败:通过 from RUNNING to FAILED 关键字可以搜索到作业崩溃的直接原因,异常栈中的 Caused by 后即为故障信息。
  • 是否发生过 OOM:如果出现了 java.lang.OutOfMemoryError 关键字,说明很可能出现了 OOM 堆内存溢出。需尝试增加作业的算子并行度(CU)数和优化内存占用,避免内存泄露。
  • JVM 退出等致命错误:进程退出码通常出现在 exit code/shutting down JVM/fatal/kill/killing 关键字后,可以辅助定位 JVM 或 Akka 等发生了致命错误被强制关闭等的错误。

总结

本文首先对出现的最基础的、用户可以自己解决的常见报错做了一些总结,这些错误常常出现在作业启动之前,所以在作业正式启动之前,用户需要自己检查好这些类型的错误,保证作业能够顺利的启动。之后介绍了下作业启动之后的一些基础运维手段,包括实时监控和告警通知,方便用户及时了解作业的具体运行情况。最后介绍了在作业失败之后通过日志关键字定位问题的方法,具体可以查看 日志诊断指南[6]。

基本错误:

基本错误类型

解决办法

缺少 DML 语句

增加 INSERT(DML)语句,INSERT 语句随 SELECT 语句一起使用

数据类型映射

参照 Oceanus 官网文档,注意上下游生态产品数据类型与 Flink 类型的映射

连接超时/失败

正确填写上下游生态产品的连接参数

主键问题

注意主键的正确使用方式,Upsert 类型数据需定义主键

窗口函数聚合问题

配合聚合操作正确、优先使用 Windowing TVF 功能(Flink >= 1.13)

JAR 包过大

POM 里面将 scope 设置为 provided

找不到主类

1、检查 JAR 包主类名是否填写错误。2、检查是否将主类打包进去

常见异常关键字:

常见运行错误类型

关键字

作业失败原因

通过 from RUNNING to FAILED 关键字搜索,Caused by 后即为失败原因

是否发生过 OOM

如果出现了 java.lang.OutOfMemoryError 关键字,说明很可能出现了 OOM 堆内存溢出。需尝试增加作业的算子并行度(CU)数和优化内存占用,避免内存泄露

JVM 退出等致命错误

进程退出码通常出现在以下关键字后,可以辅助定位 JVM 或 Akka 等发生了致命错误被强制关闭等的错误:exit code OR shutting down JVM OR fatal OR kill OR killing

快照失败(超时)

如果出现了下列该关键字,说明快照失败,请根据原因进行进一步的分析。例如,declined 表示由于资源未到位(作业并未处于运行中)、个别算子已进入 FINISHED 状态、快照超时、快照文件不完整等原因,造成了快照的失败:Checkpoint was declined/Checkpoint was canceled/Checkpoint expired/job has failed/Task has failed/Failure to finalize

超时/失败

下列关键字代表外部系统访问(例如 MySQL、Kafka 等)可能因为网络原因出现了超时。结果中可能会有很多配置相关的内容,请自行甄别是否是报错:java.util.concurrent.TimeoutException/timeout/failure/timed out/failed

异常

搜索 Exception 关键字,可以从各级异常栈的 Caused by 中看到具体的异常

WARN 和 ERROR 日志

一般情况下也可以搜索所有含 WARN 和 ERROR 关键字的日志,可能有较多结果,请注意筛选过滤有价值的信息

可忽略的报错信息

参见 日志诊断指南 [6]

参考链接

[1] 开发指南概述:https://cloud.tencent.com/document/product/849/48242

[2] 查看作业监控信息:https://cloud.tencent.com/document/product/849/48294

[3] 云监控控制台:https://console.cloud.tencent.com/monitor/product/oceanus

[4] 监控告警策略:https://cloud.tencent.com/document/product/849/48293

[5] 接入 Prometheus 自定义监控:https://cloud.tencent.com/document/product/849/55239

[6] 日志诊断指南:https://cloud.tencent.com/document/product/849/53959

0 人点赞