上一篇《Flink接收端反压机制》说到因为Flink每个Task的接收端和发送端是共享一个bufferPool的,形成了天然的反压机制,当Task接收数据的时候,接收端会根据积压的数据量以及可用的buffer数量(可用的memorySegment数)来决定是否向上游发送Credit(简而言之就是当我还有空间的时候,我向上游也就是上一个Task的发送端发送一个ack消息,表明我还有空间你可以发送数据过来,如果下游没有给你Credit就证明下游已经堵了,没有空间了也就不能继续往下游发送了)
现在从源码来看一下Task的数据发送端,也就是Netty的Server端的实现
先看Task初始化的时候TaskManagerRunner.java中startTaskManager()方法中
这个connectionManager其实分为两种,Netty,local一看就知道netty这种肯定是对应需要通过网络传输,本地模式这里就不讲了。
这个地方看到Flink的client和server都初始化了,需要注意的是其实这个地方client端只是初始化了一些配置,并没有调用bind()方法启动起来,这里看过上一篇文章的同学就会知道,client只有当第一次需要拉取上游subpatition数据的时候才会启动起来也就是bind(),
而server端在这里也就是task启动的时候就启动起来了,继续看server端如何启动的server.init()方法。
init方法中,这里可以看到,这是Flink1.6以前只有基于netty的tcp网络层反压,这里是通过bootstrap的两个参数
ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK 大小为两倍的memorySegment大小
ChannelOption.WRITE_BUFFER_LOW_WATER_MARK 大小为memorySegment 1
接着
1处2处常规的Netty定长编解码器,没有什么好说的
看看3处,4这里先不讲后面会提到
看到3是一个inboundHandler,反压机制时他的用处是用来接收来自下游响应的Credit,来看他的ChannelRead0方法
当接收到的消息是一个Credit信任的时候
先是
增加了这个reader的可用的Credit可用数
然后
其实了解了接收端的反压,发送端接收到了下游的credit,那发送数据的时候肯定有一个地方会先判断是否有可用的Credit才决定是否往下发数据
其实就是这个带星号的地方判断,然后下面就是常规的从queue中拉取reader往netty下游writeAndFlash()数据了,没什么好讲的
来看一下他判断Credit是否满足的地方
可以看到只有当
有数据且可用的Credit数量大于0
或者有数据且数据是一个事件而不是record的时候,才返回true往下游发送
可以看到这个 enqueueAvailableReader()方法比较重要,里面包含了判断credit以及往后下游发送数据的逻辑
那这个enqueueAvailableReader()方法除了会在接收到下游的Credit的时候触发一次,还有哪会被触发呢
既然是往下游发送数据那我们task处理完数据以后应该也会调用这个方法
于是来看一下Task发送数据,以前的文章讲过,这里就不赘述了,直接看到RecordWriterOutput的emit()
会先将record写入到这个Serializer里面去
然后copyFromSerializerToTargetChannel()方法中
先去localBufferPool中请求buffer,这里就是反压了
请求到buffer了以后
这个调用链有点长不全列出来了
最后
这个requestQueue其实是前面Netty初始化时具体逻辑中的4,是一个ChannelInboundHandlerAdapter
这个Inbound一开始我也很疑惑,这个Inbound没有重写他的channelRead()方法,那这个不就直接转发数据了吗,那他的作用是干嘛的呢
继续往下看
原来发送数据的时候会触发这个inbound的eventTrigger
看下userEventTriggered()具体触发了什么
这个方法就很眼熟了,就是前面到接收到下游发送过来的Credit时会触发一次的方法,用来判断是否有Credit以及通过netty往下游发送数据
这里在发送数据的时候果然又触发了,后面就是判断是否有Credit满足往下游发送数据的条件,然后往下游发送数据
也就是说:
当接收到下游返回的Credit的时候会触发一次,是否能往下游写数据的判断并拉queue数据写数据。
每次Task处理完数据以后emit,也会触发一次判断并拉queue数据写数据。