【Flink】第十五篇:Redis Connector 数据保序思考

2022-03-31 11:08:44 浏览数 (1)

【Flink】第五篇:checkpoint【1】

【Flink】第六篇:记一次Flink状态(State Size)增大不收敛,最终引起OOM问题排查

【Flink】第八篇:Flink 内存管理

【Flink】第九篇:Flink SQL 性能优化实战

【Flink】第十篇:join 之 regular join

【Flink】第十三篇:JVM思维导图

【Flink】第十四篇:LSM-Tree一般性总结

近期,我们基于Flink SQL的实时数仓逐渐过渡到DWS层,接到一些对于最终计算指标需要入HBase和Redis等存储引擎以便于下游的各类数据服务可以获取。所以需要一个Redis Connector。

于是,基于Apache Bahir的Redis Connector我做了一些定制化开发,最终使得其支持Flink SQL将带有撤回语义的Upsert流入Redis。但是,我始终对于这个Redis Connector充满了担心,各种大大的问号充满了脑子,例如,exactly-once的ck机制如何保证,如何对数据进行保序(线程安全是否可以保证)?

接下来,重点讨论Sink端的Redis Cluster模式下的connector保序问题。

乱序场景

1. 场景描述:

如上图,客户端初始化了一个JedisCluster实例,这个实例指向了右侧的Redis Cluster。

这个Redis Cluster包含三个节点:node1、node2、node3。每个节点包含两个redis集群角色:一个master、一个slave。

2. Redis Cluster模式回顾

Redis Cluster的设计考虑到了去中心化、去中间件(如果用zookeeper这种中间件,网络I/O就会成为Redis的瓶颈了,毕竟网络和内存的访问效率差很多个数量级)。集群中的每个节点都是对等关系,每个节点都保存各自的数据和整个集群状态。每个节点都和其他所有节点连接,这样只需要连接集群中的任意一个节点,就可以获取到其他节点的数据。

Redis集群采用哈希槽(hash slot)的方式来分配。Redis Cluster默认分配16384(2的14次方)个slot,当我们set一个key时,会用CRC16算法来取模得到所属的slot,然后将这个key分到该哈希槽所在节点上:

key > CRC16(key)384 > slot > host

Redis集群会把数据存在一个master节点(读写),然后在这个master和其对应的slave之间进行数据同步(异步的,不保证强一致性,此乃内存数据库为了性能的必然选择)。当读取数据时,也根据哈希槽算法到对应的master(读写)/slave(只读)获取数据。

3. JedisCluster初始化集群slot映射关系

回到上面的场景来,16384个slot被分到了三个node上,那问题来了,Redis是何时直到集群的slot分配情况的?

在源码中顺着JedisCluster的初始化方法一路向上找到了以下代码

这个方法就是在初始化JedisCluster时顺便初始化了一个重要的cache,即集群的slot分布情况:

可以看到,这里就是用参数配置的集群节点,去顺序连接任意一个节点,构造Jedis实例,只要连接上就可以初始化slot和cluster的整个集群的映射关系,然后break。

这里,其实也看到了Jedis的本质:持有和一个node的物理连接,并进行发送redis命令的句柄,不信,我们可以继续溯源而上,在Jedis里持有了一个Client实例,而Client的类图如下,

在Connection中发现了Socket的连接、释放、sendCommand等操作,

4. 连接池缓存

那么,JedisCluster中的cache是如何缓存连接的呢?到JedisClusterInfoCache类中一探究竟,

两个重要的数据结构:

代码语言:javascript复制
private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
  • 第一个map是缓存node到JedisPool的映射
  • 第二个map是缓存slot到JedisPool的映射

那么,我们继续深入JedisPool,发现JedisPool继承了Pool<Jedis>,而Pool<Jedis>本质上就是对对象池(ObjectPool)进行了代理和封装,

而池化的正是Jedis!

5. 结论

现在,迷雾已经渐渐揭开,结论如下:

JedisCluster在初始化方法中会去根据传参的redis集群的节点ip:port连接任意一个,从而获取进行一次slotdiscover,并将slot和节点的映射关系缓存在JedisClusterInfoCache类的map中。

初始化完后,又会根据传入的配置参数,构造到各个node的JedisPool,这个JedisPool本质上就是一个到各个node的Jedis实例池(利用了apache的common.pool2的GenericObjectPool技术)

而Jedis又会持有真正的到各个node的Connection(即物理Socket)。

数据不一致的极限情况


回到这个图,设想这样一个场景,

JedisCluster被先后调用了两次发送命令:

代码语言:javascript复制
第一次:set keyA B
第二次:set keyA C

这个时候,由于key值都是一样的keyA,所以一定是到map中同一个JedisPool中找Jedis句柄,假设第一次调用获取了Jedis1(socket1),第二次调用获取了Jedis(socket2),如上图,两条set命令分别延迟两条socket向node1进行传递,这显然无法保证到达node1的顺序是按照在JedisCluster端的语义顺序。

原本应该被设置为C的keyA,可能由于极限的网络问题被设置为B。

解决

知道了上面问题的真相后,其实解决方式也水到渠成了,

无论调用JedisCluster的是否是同一个线程(即taskslot),只需保证他们得到的是同一个socket句柄,那么tcp会保证消息的顺序性!

而且,由于我们的上游是按照key做hash的keyby算子处理,所以,同一个key会被同一个taskslot线程进行sink到redis中,那么,顺序性自然也就没问题了!

所以我们只需要将对象池中的对象个数设置为1即可!(以下是在flink sql的redis ddl中的参数设置方式)

代码语言:javascript复制
'connection.max-total' = '1'

既然说JedisCluster使用了apache的common.pool2对Jedis进行池化,那么不妨来了解下这项了不起的池化技术。

池化技术

为什么会有对象池?

在实际的应用工程当中,存在一些被频繁使用的、创建或者销毁比较耗时、持有的资源也比较昂贵的一些对象。比如:数据库连接对象、线程对象。所以如果能够通过一种方式,把这类对象统一管理,让这类对象可以被循环利用的话,就可以减少很多系统开销(内存、CPU、IO等),极大的提升应用性能。

Apache Commons Pool

Apache Commons Pool就是一个对象池的框架,他提供了一整套用于实现对象池化的API,以及若干种各具特色的对象池实现。Apache Commons Pool是很多连接池实现的基础,比如DBCP连接池、Jedis连接池等。

在commons-pool2中,对象池的核心接口叫做ObjectPool,他定义了对象池的应该实现的行为。

  • addObject方法:往池中添加一个对象。池子里的所有对象都是通过这个方法进来的。
  • borrowObject方法:从池中借走到一个对象。借走不等于删除。对象一直都属于池子,只是状态的变化。
  • returnObject方法:把对象归还给对象池。归还不等于添加。对象一直都属于池子,只是状态的变化。
  • invalidateObject:销毁一个对象。这个方法才会将对象从池子中删除,当然这其中最重要的就是释放对象本身持有的各种资源。
  • getNumIdle:返回对象池中有多少对象是空闲的,也就是能够被借走的对象的数量。
  • getNumActive:返回对象池中有对象对象是活跃的,也就是已经被借走的,在使用中的对象的数量。
  • clear:清理对象池。注意是清理不是清空,改方法要求的是,清理所有空闲对象,释放相关资源。
  • close:关闭对象池。这个方法可以达到清空的效果,清理所有对象以及相关资源。

在commons-pool2中,ObjectPool的核心实现类是GenericObjectPool。

GenericObjectPool常用参数

代码语言:javascript复制
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.pool2.impl;

/**
 * A simple "struct" encapsulating the configuration for a
 * {@link GenericObjectPool}.
 *
 * <p>
 * This class is not thread-safe; it is only intended to be used to provide
 * attributes used when creating a pool.
 * </p>
 *
 * @param <T> Type of element pooled.
 * @since 2.0
 */
public class GenericObjectPoolConfig<T> extends BaseObjectPoolConfig<T> {

/**
     * The default value for the {@code maxTotal} configuration attribute.
     * @see GenericObjectPool#getMaxTotal()
     */
public static final int DEFAULT_MAX_TOTAL = 8;

/**
     * The default value for the {@code maxIdle} configuration attribute.
     * @see GenericObjectPool#getMaxIdle()
     */
public static final int DEFAULT_MAX_IDLE = 8;

/**
     * The default value for the {@code minIdle} configuration attribute.
     * @see GenericObjectPool#getMinIdle()
     */
public static final int DEFAULT_MIN_IDLE = 0;


private int maxTotal = DEFAULT_MAX_TOTAL;

private int maxIdle = DEFAULT_MAX_IDLE;

private int minIdle = DEFAULT_MIN_IDLE;

/**
     * Get the value for the {@code maxTotal} configuration attribute
     * for pools created with this configuration instance.
     *
     * @return  The current setting of {@code maxTotal} for this
     *          configuration instance
     *
     * @see GenericObjectPool#getMaxTotal()
     */
public int getMaxTotal() {
return maxTotal;
    }

/**
     * Set the value for the {@code maxTotal} configuration attribute for
     * pools created with this configuration instance.
     *
     * @param maxTotal The new setting of {@code maxTotal}
     *        for this configuration instance
     *
     * @see GenericObjectPool#setMaxTotal(int)
     */
public void setMaxTotal(final int maxTotal) {
this.maxTotal = maxTotal;
    }


/**
     * Get the value for the {@code maxIdle} configuration attribute
     * for pools created with this configuration instance.
     *
     * @return  The current setting of {@code maxIdle} for this
     *          configuration instance
     *
     * @see GenericObjectPool#getMaxIdle()
     */
public int getMaxIdle() {
return maxIdle;
    }

/**
     * Set the value for the {@code maxIdle} configuration attribute for
     * pools created with this configuration instance.
     *
     * @param maxIdle The new setting of {@code maxIdle}
     *        for this configuration instance
     *
     * @see GenericObjectPool#setMaxIdle(int)
     */
public void setMaxIdle(final int maxIdle) {
this.maxIdle = maxIdle;
    }


/**
     * Get the value for the {@code minIdle} configuration attribute
     * for pools created with this configuration instance.
     *
     * @return  The current setting of {@code minIdle} for this
     *          configuration instance
     *
     * @see GenericObjectPool#getMinIdle()
     */
public int getMinIdle() {
return minIdle;
    }

/**
     * Set the value for the {@code minIdle} configuration attribute for
     * pools created with this configuration instance.
     *
     * @param minIdle The new setting of {@code minIdle}
     *        for this configuration instance
     *
     * @see GenericObjectPool#setMinIdle(int)
     */
public void setMinIdle(final int minIdle) {
this.minIdle = minIdle;
    }

@SuppressWarnings("unchecked")
@Override
public GenericObjectPoolConfig<T> clone() {
try {
return (GenericObjectPoolConfig<T>) super.clone();
        } catch (final CloneNotSupportedException e) {
throw new AssertionError(); // Can't happen
        }
    }

@Override
protected void toStringAppendFields(final StringBuilder builder) {
super.toStringAppendFields(builder);
        builder.append(", maxTotal=");
        builder.append(maxTotal);
        builder.append(", maxIdle=");
        builder.append(maxIdle);
        builder.append(", minIdle=");
        builder.append(minIdle);
    }
}

其实,了解线程池的同学很容易找到同感,毕竟使用的技术是一样的,思想也是一样的。

0 人点赞