【Flink】第四篇:【迷思】对update语义拆解D-、I 后造成update原子性丢失
【Flink】第五篇:checkpoint【1】
【Flink】第五篇:checkpoint【2】
【Flink】第六篇:记一次Flink状态(State Size)增大不收敛,最终引起OOM问题排查
【Flink】第八篇:Flink 内存管理
【Flink】第九篇:Flink SQL 性能优化实战
【Flink】第十篇:join 之 regular join
【Flink】第十三篇:JVM思维导图
【Flink】第十四篇:LSM-Tree一般性总结
【Flink】第十五篇:Redis Connector 数据保序思考
【Flink】第十六篇:源码角度分析 sink 端的数据一致性
昨天,分析修复了一个connector的问题。下面开始陈述整个过程,依旧按照之前的陈述思路进行:
- 问题表象
- 分析,得出阶段性结论反馈下一步验证
- 验证,得出阶段性表象反馈下一步分析
- 第二步和第三步循环迭代,得出最终结论
- 改进措施
问题表象
1. 业务场景
将业务库的从库上一个高表用CDC方式接入,写Flink SQL的方式,转换成宽表写入数仓的kudu表。在和凯哥讨论对比了高转宽表的传统方案和新方案后,我们决定用多insert into的高转宽方案。这种方案可扩展性强且无需groupagg消耗大状态。
2. 异常信息
代码语言:javascript复制java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
at java.nio.Bits.reserveMemory(Bits.java:694)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.SocketSendBufferPool$Preallocation.<init>(SocketSendBufferPool.java:156)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.SocketSendBufferPool.<init>(SocketSendBufferPool.java:42)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:45)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.NioWorkerPool.newWorker(NioWorkerPool.java:44)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.NioWorkerPool.newWorker(NioWorkerPool.java:28)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:80)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:33)
at org.apache.kudu.client.AsyncKuduClient$AsyncKuduClientBuilder.createChannelFactory(AsyncKuduClient.java:2740)
at org.apache.kudu.client.AsyncKuduClient$AsyncKuduClientBuilder.access$000(AsyncKuduClient.java:2589)
at org.apache.kudu.client.AsyncKuduClient.<init>(AsyncKuduClient.java:367)
at org.apache.kudu.client.AsyncKuduClient.<init>(AsyncKuduClient.java:261)
at org.apache.kudu.client.AsyncKuduClient$AsyncKuduClientBuilder.build(AsyncKuduClient.java:2762)
at org.apache.kudu.client.KuduClient$KuduClientBuilder.build(KuduClient.java:543)
at org.haiercash.flink.kudu.connector.internal.writer.KuduWriter.obtainClient(KuduWriter.java:76)
at org.haiercash.flink.kudu.connector.internal.writer.KuduWriter.<init>(KuduWriter.java:66)
at org.haiercash.flink.kudu.connector.table.sink.KuduSink.open(KuduSink.java:87)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
分析及验证迭代
从异常堆栈信息初步分析:
- kudu connector报出的异常
- 异常给出的直接原因是task off-heap memory不足
- kudu connector调用了kudu Client
- kudu Client 调用了 netty 进行网络间的数据IO
- 异常直接现场应该是netty
- 这个task off-heap应该是netty 申请的JVM direct memory 用于网络IO的
那么按照异常给出的直接原因进行验证,将task off-heap memory调大,
代码语言:javascript复制--如果运行模式为:flink run的yarn cluster,可以通过加以下参数
-yD taskmanager.memory.task.off-heap.size=512m
调大后作业依然运行1分钟左右失败,但是异常信息会出现yarn containner的heartbeat time out,这种异常一般是由于已经发生了一些程序异常导致containner的心跳发送/回应不及时导致,所以继续抓那个前置的异常。得到如下这个异常
代码语言:javascript复制java.lang.OutOfMemoryError:unable to create new native thread.
分析这个异常的两个可能原因:
- 本地内存不足,难以创建新线程
- 超过了linux限制
我们知道java线程是操作系统级别的线程,而不是只存在于用户态的假线程,它和操作系统的内核线程是1 : 1的关系。而创建内核线程占用的native内存不足,是否是因为扩展了task off-heap memory,将overhead memory压小了?
带着这个猜想验证flink UI中的overhead memory大小并没有改变,而压缩变小的是JVM中的task heap memory。
这里同时也得出一个结论:适当调大task off-heap memory,而与此平衡的是task heap memory。
我们再用异常提供的直接原因,给出解决方案,将线程的栈内存大小调小,在进行验证
代码语言:javascript复制--同样在启动命令中加入参数
-yD env.java.opts="-Xss256k"
降低线程栈内存大小重启作业程序后发现程序运行到失败的时间变长了,但仍然报之前的异常,
代码语言:javascript复制java.lang.OutOfMemoryError:unable to create new native thread.
这个异常剩下的一种可能就是常见的线程数太多,所以我们需要统计这个TaskManager启动的线程数,方法如下,
找到TaskManager进程ID,方法有两种,
- 先通过flink UI找到TM的所在节点及监听的端口号,用netstat命令找到PID
- 先记下YARN上这个作业的application id,再通过jps -lvm | grep appid找到这个作业所在节点上的PID
再用ps -T命令统计PID所有的线程数,
代码语言:javascript复制ps -T | wc -l
发现作业一个TM的线程数就有9000!而且程序还在启动中,并没有创建完毕。于是再观察其他一般的小作业的TM进程的线程数,将近100多。
虽然,这个高转宽的作业包含了大量的insert into sql(100个左右)被塞到一个statement里去了。但是运行参数也只用了1个并行度,不至于产生这么多线程的啊,按照flink的框架,这100个sql产生的执行计划应该是被分配到这1个并行度subtask线程里进行执行才对。
所以,不能认为 100个sql * 100线程 约等于 9000个线程这种结论,就进一步认为100个sql起这么多线程是合理的!(其实虽然一个正常TM会启那么多线程,很多都是TM的服务线程,并不是执行线程,即subtask)
接下来,我开始对这些线程都是什么线程产生了好奇,于是在Flink UI中,用Thread dump将这个作业的线程堆栈拷贝了出来进行分析。发现居然大多数都是一种线程:
代码语言:javascript复制"New I/O worker #1680" Id=1791 RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked sun.nio.ch.Util$3@3e207647
- locked java.util.Collections$UnmodifiableSet@103d14b1
- locked sun.nio.ch.EPollSelectorImpl@5f0207f5
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
...
Number of locked synchronizers = 1
- java.util.concurrent.ThreadPoolExecutor$Worker@7c29e509
分析得出如下结论,
- 这是一个NIO线程
- 是由netty创建的
- 这个netty在应该是kudu封装了的netty
- 这个线程是一个线程池线程
其实到这里就基本有了结论,思考整个过程,清晰的逻辑链条展现在脑海中:
- 最开始的direct memory oom异常就是由于创建了太多的netty nio线程造成超出了flink作业启动配置的task off-heap内存。
- 调大task off-heap、调小线程栈内存后,由于创建的线程数太多,又超出了linux线程数限制
- 这个线程池应该是kudu Client封装的Netty用于socket 的 boss/worker的线程池
顺着这个思路分析Kudu connector源码,还是从SPI文件开始找起直到kudu的SinkRuntimeProvider,
这里就是kudu connector和kudu client的边界,继续向下追溯,就到了kudu Client的建造者,
继续往里找,在下面这两个方法我们找到了解决问题的关键,这两个参数就是用kudu Client创建netty socket nio服务的线程数控制方法,boss线程默认是1,而worker线程数默认是 2 * the number of available processors,这在线上服务器运行是相当恐怖的服务器的available processors一般都能达到几十甚至几百,又经过几百个insert into的sql的addsink(kudusink)这种底层flink sql的执行计划的放大效果,如果服务器没控制到进程线程数量,后果将很严重!
修改措施
为了进一步验证我的整个过程的猜想的正确性,在IDEA中进行了本地的源码调试,结果如下,
看一看到这里的线程池中的线程的name即为Thread dump中的频繁创建的线程:New I/O worker #1680
代码语言:javascript复制"New I/O worker #1680" Id=1791 RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked sun.nio.ch.Util$3@3e207647
- locked java.util.Collections$UnmodifiableSet@103d14b1
- locked sun.nio.ch.EPollSelectorImpl@5f0207f5
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.kudu.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
...
Number of locked synchronizers = 1
- java.util.concurrent.ThreadPoolExecutor$Worker@7c29e509
文中需要一些前置知识,
1. Flink Connector 保序和数据一致性请参考:
- 【Flink】第十五篇:Redis Connector 数据保序思考
- 【Flink】第十六篇:源码角度分析 sink 端的数据一致性
2. Flink 内存模型请参考:
- 【Flink】第八篇:Flink 内存管理
3. JVM请参考:
- 【Flink】第十三篇:JVM思维导图