【Flink】第五篇:checkpoint【1】
【Flink】第六篇:记一次Flink状态(State Size)增大不收敛,最终引起OOM问题排查
【Flink】第八篇:Flink 内存管理
【Flink】第九篇:Flink SQL 性能优化实战
【Flink】第十篇:join 之 regular join
【Flink】第十三篇:JVM思维导图
【Flink】第十四篇:LSM-Tree一般性总结
近期,我们基于Flink SQL的实时数仓逐渐过渡到DWS层,接到一些对于最终计算指标需要入HBase和Redis等存储引擎以便于下游的各类数据服务可以获取。所以需要一个Redis Connector。
于是,基于Apache Bahir的Redis Connector我做了一些定制化开发,最终使得其支持Flink SQL将带有撤回语义的Upsert流入Redis。但是,我始终对于这个Redis Connector充满了担心,各种大大的问号充满了脑子,例如,exactly-once的ck机制如何保证,如何对数据进行保序(线程安全是否可以保证)?
接下来,重点讨论Sink端的Redis Cluster模式下的connector保序问题。
乱序场景
1. 场景描述:
如上图,客户端初始化了一个JedisCluster实例,这个实例指向了右侧的Redis Cluster。
这个Redis Cluster包含三个节点:node1、node2、node3。每个节点包含两个redis集群角色:一个master、一个slave。
2. Redis Cluster模式回顾
Redis Cluster的设计考虑到了去中心化、去中间件(如果用zookeeper这种中间件,网络I/O就会成为Redis的瓶颈了,毕竟网络和内存的访问效率差很多个数量级)。集群中的每个节点都是对等关系,每个节点都保存各自的数据和整个集群状态。每个节点都和其他所有节点连接,这样只需要连接集群中的任意一个节点,就可以获取到其他节点的数据。
Redis集群采用哈希槽(hash slot)的方式来分配。Redis Cluster默认分配16384(2的14次方)个slot,当我们set一个key时,会用CRC16算法来取模得到所属的slot,然后将这个key分到该哈希槽所在节点上:
key > CRC16(key)384 > slot > host
Redis集群会把数据存在一个master节点(读写),然后在这个master和其对应的slave之间进行数据同步(异步的,不保证强一致性,此乃内存数据库为了性能的必然选择)。当读取数据时,也根据哈希槽算法到对应的master(读写)/slave(只读)获取数据。
3. JedisCluster初始化集群slot映射关系
回到上面的场景来,16384个slot被分到了三个node上,那问题来了,Redis是何时直到集群的slot分配情况的?
在源码中顺着JedisCluster的初始化方法一路向上找到了以下代码
这个方法就是在初始化JedisCluster时顺便初始化了一个重要的cache,即集群的slot分布情况:
可以看到,这里就是用参数配置的集群节点,去顺序连接任意一个节点,构造Jedis实例,只要连接上就可以初始化slot和cluster的整个集群的映射关系,然后break。
这里,其实也看到了Jedis的本质:持有和一个node的物理连接,并进行发送redis命令的句柄,不信,我们可以继续溯源而上,在Jedis里持有了一个Client实例,而Client的类图如下,
在Connection中发现了Socket的连接、释放、sendCommand等操作,
4. 连接池缓存
那么,JedisCluster中的cache是如何缓存连接的呢?到JedisClusterInfoCache类中一探究竟,
两个重要的数据结构:
代码语言:javascript复制private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
- 第一个map是缓存node到JedisPool的映射
- 第二个map是缓存slot到JedisPool的映射
那么,我们继续深入JedisPool,发现JedisPool继承了Pool<Jedis>,而Pool<Jedis>本质上就是对对象池(ObjectPool)进行了代理和封装,
而池化的正是Jedis!
5. 结论
现在,迷雾已经渐渐揭开,结论如下:
JedisCluster在初始化方法中会去根据传参的redis集群的节点ip:port连接任意一个,从而获取进行一次slotdiscover,并将slot和节点的映射关系缓存在JedisClusterInfoCache类的map中。
初始化完后,又会根据传入的配置参数,构造到各个node的JedisPool,这个JedisPool本质上就是一个到各个node的Jedis实例池(利用了apache的common.pool2的GenericObjectPool技术)
而Jedis又会持有真正的到各个node的Connection(即物理Socket)。
数据不一致的极限情况
回到这个图,设想这样一个场景,
JedisCluster被先后调用了两次发送命令:
代码语言:javascript复制第一次:set keyA B
第二次:set keyA C
这个时候,由于key值都是一样的keyA,所以一定是到map中同一个JedisPool中找Jedis句柄,假设第一次调用获取了Jedis1(socket1),第二次调用获取了Jedis(socket2),如上图,两条set命令分别延迟两条socket向node1进行传递,这显然无法保证到达node1的顺序是按照在JedisCluster端的语义顺序。
原本应该被设置为C的keyA,可能由于极限的网络问题被设置为B。
解决
知道了上面问题的真相后,其实解决方式也水到渠成了,
无论调用JedisCluster的是否是同一个线程(即taskslot),只需保证他们得到的是同一个socket句柄,那么tcp会保证消息的顺序性!
而且,由于我们的上游是按照key做hash的keyby算子处理,所以,同一个key会被同一个taskslot线程进行sink到redis中,那么,顺序性自然也就没问题了!
所以我们只需要将对象池中的对象个数设置为1即可!(以下是在flink sql的redis ddl中的参数设置方式)
代码语言:javascript复制'connection.max-total' = '1'
既然说JedisCluster使用了apache的common.pool2对Jedis进行池化,那么不妨来了解下这项了不起的池化技术。
池化技术
为什么会有对象池?
在实际的应用工程当中,存在一些被频繁使用的、创建或者销毁比较耗时、持有的资源也比较昂贵的一些对象。比如:数据库连接对象、线程对象。所以如果能够通过一种方式,把这类对象统一管理,让这类对象可以被循环利用的话,就可以减少很多系统开销(内存、CPU、IO等),极大的提升应用性能。
Apache Commons Pool
Apache Commons Pool就是一个对象池的框架,他提供了一整套用于实现对象池化的API,以及若干种各具特色的对象池实现。Apache Commons Pool是很多连接池实现的基础,比如DBCP连接池、Jedis连接池等。
在commons-pool2中,对象池的核心接口叫做ObjectPool,他定义了对象池的应该实现的行为。
- addObject方法:往池中添加一个对象。池子里的所有对象都是通过这个方法进来的。
- borrowObject方法:从池中借走到一个对象。借走不等于删除。对象一直都属于池子,只是状态的变化。
- returnObject方法:把对象归还给对象池。归还不等于添加。对象一直都属于池子,只是状态的变化。
- invalidateObject:销毁一个对象。这个方法才会将对象从池子中删除,当然这其中最重要的就是释放对象本身持有的各种资源。
- getNumIdle:返回对象池中有多少对象是空闲的,也就是能够被借走的对象的数量。
- getNumActive:返回对象池中有对象对象是活跃的,也就是已经被借走的,在使用中的对象的数量。
- clear:清理对象池。注意是清理不是清空,改方法要求的是,清理所有空闲对象,释放相关资源。
- close:关闭对象池。这个方法可以达到清空的效果,清理所有对象以及相关资源。
在commons-pool2中,ObjectPool的核心实现类是GenericObjectPool。
GenericObjectPool常用参数
代码语言:javascript复制/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.pool2.impl;
/**
* A simple "struct" encapsulating the configuration for a
* {@link GenericObjectPool}.
*
* <p>
* This class is not thread-safe; it is only intended to be used to provide
* attributes used when creating a pool.
* </p>
*
* @param <T> Type of element pooled.
* @since 2.0
*/
public class GenericObjectPoolConfig<T> extends BaseObjectPoolConfig<T> {
/**
* The default value for the {@code maxTotal} configuration attribute.
* @see GenericObjectPool#getMaxTotal()
*/
public static final int DEFAULT_MAX_TOTAL = 8;
/**
* The default value for the {@code maxIdle} configuration attribute.
* @see GenericObjectPool#getMaxIdle()
*/
public static final int DEFAULT_MAX_IDLE = 8;
/**
* The default value for the {@code minIdle} configuration attribute.
* @see GenericObjectPool#getMinIdle()
*/
public static final int DEFAULT_MIN_IDLE = 0;
private int maxTotal = DEFAULT_MAX_TOTAL;
private int maxIdle = DEFAULT_MAX_IDLE;
private int minIdle = DEFAULT_MIN_IDLE;
/**
* Get the value for the {@code maxTotal} configuration attribute
* for pools created with this configuration instance.
*
* @return The current setting of {@code maxTotal} for this
* configuration instance
*
* @see GenericObjectPool#getMaxTotal()
*/
public int getMaxTotal() {
return maxTotal;
}
/**
* Set the value for the {@code maxTotal} configuration attribute for
* pools created with this configuration instance.
*
* @param maxTotal The new setting of {@code maxTotal}
* for this configuration instance
*
* @see GenericObjectPool#setMaxTotal(int)
*/
public void setMaxTotal(final int maxTotal) {
this.maxTotal = maxTotal;
}
/**
* Get the value for the {@code maxIdle} configuration attribute
* for pools created with this configuration instance.
*
* @return The current setting of {@code maxIdle} for this
* configuration instance
*
* @see GenericObjectPool#getMaxIdle()
*/
public int getMaxIdle() {
return maxIdle;
}
/**
* Set the value for the {@code maxIdle} configuration attribute for
* pools created with this configuration instance.
*
* @param maxIdle The new setting of {@code maxIdle}
* for this configuration instance
*
* @see GenericObjectPool#setMaxIdle(int)
*/
public void setMaxIdle(final int maxIdle) {
this.maxIdle = maxIdle;
}
/**
* Get the value for the {@code minIdle} configuration attribute
* for pools created with this configuration instance.
*
* @return The current setting of {@code minIdle} for this
* configuration instance
*
* @see GenericObjectPool#getMinIdle()
*/
public int getMinIdle() {
return minIdle;
}
/**
* Set the value for the {@code minIdle} configuration attribute for
* pools created with this configuration instance.
*
* @param minIdle The new setting of {@code minIdle}
* for this configuration instance
*
* @see GenericObjectPool#setMinIdle(int)
*/
public void setMinIdle(final int minIdle) {
this.minIdle = minIdle;
}
@SuppressWarnings("unchecked")
@Override
public GenericObjectPoolConfig<T> clone() {
try {
return (GenericObjectPoolConfig<T>) super.clone();
} catch (final CloneNotSupportedException e) {
throw new AssertionError(); // Can't happen
}
}
@Override
protected void toStringAppendFields(final StringBuilder builder) {
super.toStringAppendFields(builder);
builder.append(", maxTotal=");
builder.append(maxTotal);
builder.append(", maxIdle=");
builder.append(maxIdle);
builder.append(", minIdle=");
builder.append(minIdle);
}
}
其实,了解线程池的同学很容易找到同感,毕竟使用的技术是一样的,思想也是一样的。