高性能/并发的保证-Netty在Redisson的应用

2020-04-14 16:45:05 浏览数 (1)

前言

​ Redisson Github: https://github.com/redisson/redisson

​ Redisson 官网:https://redisson.pro/

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

以下是Redisson的结构:

  • Redisson作为独立节点 可以用于独立执行其他节点发布到分布式执行服务 和 分布式调度任务服务 里的远程任务。

Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6 以上版本。

客户端初始化

createBootstrap

org.redisson.client.RedisClient#createBootstrap

代码语言:javascript复制
private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
        Bootstrap bootstrap = new Bootstrap()
                        .resolver(config.getResolverGroup())
          							//1.指定配置中的IO类型
                        .channel(config.getSocketChannelClass())
          							//2.指定配置中的线程模型
                        .group(config.getGroup());
  			//3.IO处理逻辑
        bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
  			//4. 指定bootstrap配置选项
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
        bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
        bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
        config.getNettyHook().afterBoostrapInitialization(bootstrap);
        return bootstrap;
    }

从上面的代码可以看到,客户端启动的引导类是 Bootstrap,负责启动客户端以及连接服务端,引导类创建完成之后,下面我们描述一下客户端启动的流程。

一. 首先,我们需要给它指定线程模型,驱动着连接的数据读写。然后,redisson默认指定 IO 模型为 NioSocketChannel

二. 接着,给引导类指定一系列处理链路,这里主要就是定义连接的业务处理逻辑,不理解没关系,在后面我们会详细分析

RedisChannelInitializer

org.redisson.client.handler.RedisChannelInitializer

代码语言:javascript复制
 @Override
    protected void initChannel(Channel ch) throws Exception {
      	// 开启SSL终端识别能力
        initSsl(config, ch);
        
        if (type == Type.PLAIN) {
          	//Redis正常连接处理类
            ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
        } else {
          	//Redis订阅发布处理类
            ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
        }
        
        ch.pipeline().addLast(
          	//链路检测狗
            connectionWatchdog,
          	//Redis协议命令编码器
            CommandEncoder.INSTANCE,
          	//Redis协议命令批量编码器
            CommandBatchEncoder.INSTANCE,
          	//Redis命令队列
            new CommandsQueue());
        
        if (pingConnectionHandler != null) {
           //心跳包连接处理类
            ch.pipeline().addLast(pingConnectionHandler);
        }
        
        if (type == Type.PLAIN) {
          	//Redis协议命令解码器
            ch.pipeline().addLast(new CommandDecoder(config.getExecutor(), config.isDecodeInExecutor()));
        } else {
          	//Redis订阅发布解码器
            ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor(), config.isKeepPubSubOrder(), config.isDecodeInExecutor()));
        }

        config.getNettyHook().afterChannelInitialization(ch);
    }

图1 Redisson 链路处理图

Redisson的处理链

Redisson的Pipeline里面的ChannelHandler比较多,我挑选其中CommandEncoderCommandDecoder进行源码剖析。

失败重连

org.redisson.client.handler.ConnectionWatchdog#reconnect 重连机制

代码语言:javascript复制
private void reconnect(final RedisConnection connection, final int attempts){
		//重试时间越来越久
    int timeout = 2 << attempts;
    if (bootstrap.config().group().isShuttingDown()) {
        return;
    }
    
    try {
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                tryReconnect(connection, Math.min(BACKOFF_CAP, attempts   1));
            }
        }, timeout, TimeUnit.MILLISECONDS);
    } catch (IllegalStateException e) {
        // skip
    }
}

netty中的Timer管理,使用了的Hashed time Wheel的模式,Time Wheel翻译为时间轮,是用于实现定时器timer的经典算法。

这个方法的声明是这样的:

代码语言:javascript复制
 /**
     * Schedules the specified {@link TimerTask} for one-time execution after
     * the specified delay.
     *
     * @return a handle which is associated with the specified task
     *
     * @throws IllegalStateException       if this timer has been {@linkplain #stop() stopped} already
     * @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
     *                                    can cause instability in the system.
     */
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

这个方法需要一个TimerTask对象以知道当时间到时要执行什么逻辑,然后需要delay时间数值和TimeUnit时间的单位。

Redis协议命令编码器

​ Redis 的作者认为数据库系统的瓶颈一般不在于网络流量,而是数据库自身内部逻辑处理上。所以即使 Redis 使用了浪费流量的文本协议,依然可以取得极高的访问性能。Redis 将所有数据都放在内存,用一个单线程对外提供服务,单个节点在跑满一个 CPU 核心的情况下可以达到了 10w/s 的超高 QPS。

RESP 是 Redis 序列化协议的简写。它是一种直观的文本协议,优势在于实现异常简单,解析性能极好。

Redis 协议将传输的结构数据分为 5 种最小单元类型,单元结束时统一加上回车换行符号rn

  1. 单行字符串 以 符号开头。
  2. 多行字符串 以 $ 符号开头,后跟字符串长度。
  3. 整数值 以 : 符号开头,后跟整数的字符串形式。
  4. 错误消息 以 - 符号开头。
  5. 数组 以 * 号开头,后跟数组的长度。

单行字符串 hello world

代码语言:javascript复制
 hello worldrn

多行字符串 hello world

代码语言:javascript复制
$11rnhello worldrn

多行字符串当然也可以表示单行字符串。

整数 1024

代码语言:javascript复制
:1024rn

错误 参数类型错误

代码语言:javascript复制
-WRONGTYPE Operation against a key holding the wrong kind of valuern

数组 [1,2,3]

代码语言:javascript复制
*3rn:1rn:2rn:3rn

NULL 用多行字符串表示,不过长度要写成-1。

代码语言:javascript复制
$-1rn

空串 用多行字符串表示,长度填 0。

代码语言:javascript复制
$0rnrn

注意这里有两个rn。为什么是两个?因为两个rn之间,隔的是空串。

org.redisson.client.handler.CommandEncoder#encode()

代码语言:javascript复制
private static final char ARGS_PREFIX = '*';
private static final char BYTES_PREFIX = '$';
private static final byte[] CRLF = "rn".getBytes();


@Override
    protected void encode(ChannelHandlerContext ctx, CommandData<?, ?> msg, ByteBuf out) throws Exception {
        try {
          	//redis命令前缀
            out.writeByte(ARGS_PREFIX);
            int len = 1   msg.getParams().length;
            if (msg.getCommand().getSubName() != null) {
                len  ;
            }
            out.writeCharSequence(Long.toString(len), CharsetUtil.US_ASCII);
            out.writeBytes(CRLF);
            
            writeArgument(out, msg.getCommand().getName().getBytes(CharsetUtil.UTF_8));
            if (msg.getCommand().getSubName() != null) {
                writeArgument(out, msg.getCommand().getSubName().getBytes(CharsetUtil.UTF_8));
            }
          	......
        } catch (Exception e) {
            msg.tryFailure(e);
            throw e;
        }
    }

private void writeArgument(ByteBuf out, ByteBuf arg) {
    out.writeByte(BYTES_PREFIX);
    out.writeCharSequence(Long.toString(arg.readableBytes()), CharsetUtil.US_ASCII);
    out.writeBytes(CRLF);
    out.writeBytes(arg, arg.readerIndex(), arg.readableBytes());
    out.writeBytes(CRLF);
}

Redis协议命令解码器

org.redisson.client.handler.CommandDecoder#readBytes

代码语言:javascript复制
 private static final char CR = 'r';
 private static final char LF = 'n';
 private static final char ZERO = '0';

private ByteBuf readBytes(ByteBuf is) throws IOException {
    long l = readLong(is);
    if (l > Integer.MAX_VALUE) {
        throw new IllegalArgumentException(
                "Java only supports arrays up to "   Integer.MAX_VALUE   " in size");
    }
    int size = (int) l;
    if (size == -1) {
        return null;
    }
    ByteBuf buffer = is.readSlice(size);
    int cr = is.readByte();
    int lf = is.readByte();
  	//判断是否以rn开头
    if (cr != CR || lf != LF) {
        throw new IOException("Improper line ending: "   cr   ", "   lf);
    }
    return buffer;
}

数据序列化

Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。Redisson提供了以下几种的对象编码应用,以供大家选择:

编码类名称

说明

org.redisson.codec.JsonJacksonCodec

Jackson JSON 编码 默认编码

org.redisson.codec.AvroJacksonCodec

Avro 一个二进制的JSON编码

org.redisson.codec.SmileJacksonCodec

Smile 另一个二进制的JSON编码

org.redisson.codec.CborJacksonCodec

CBOR 又一个二进制的JSON编码

org.redisson.codec.MsgPackJacksonCodec

MsgPack 再来一个二进制的JSON编码

org.redisson.codec.IonJacksonCodec

Amazon Ion 亚马逊的Ion编码,格式与JSON类似

org.redisson.codec.KryoCodec

Kryo 二进制对象序列化编码

org.redisson.codec.SerializationCodec

JDK序列化编码

org.redisson.codec.FstCodec

FST 10倍于JDK序列化性能而且100%兼容的编码

org.redisson.codec.LZ4Codec

LZ4 压缩型序列化对象编码

org.redisson.codec.SnappyCodec

Snappy 另一个压缩型序列化对象编码

org.redisson.client.codec.JsonJacksonMapCodec

基于Jackson的映射类使用的编码。可用于避免序列化类的信息,以及用于解决使用byte[]遇到的问题。

org.redisson.client.codec.StringCodec

纯字符串编码(无转换)

org.redisson.client.codec.LongCodec

纯整长型数字编码(无转换)

org.redisson.client.codec.ByteArrayCodec

字节数组编码

org.redisson.codec.CompositeCodec

用来组合多种不同编码在一起

Codec

代码语言:javascript复制
public interface Codec {

  	//返回用于HMAP Redis结构中哈希映射值的对象解码器
    Decoder<Object> getMapValueDecoder();

  	//返回用于HMAP Redis结构中哈希映射值的对象编码器
    Encoder getMapValueEncoder();

  	//返回用于HMAP Redis结构中哈希映射键的对象解码器
    Decoder<Object> getMapKeyDecoder();

  	//返回用于HMAP Redis结构中哈希映射键的对象编码器
    Encoder getMapKeyEncoder();

    //返回用于除HMAP之外的任何存储Redis结构的对象解码器
    Decoder<Object> getValueDecoder();

    //返回用于除HMAP之外的任何存储Redis结构的对象编码器
    Encoder getValueEncoder();

    //返回用于加载解码过程中使用的类的类加载器对象
    ClassLoader getClassLoader();

}

BaseCodec

org.redisson.client.codec.BaseCodec

  1. HashMap的键值对的编解码的处理类使用普通的对象编解码处理类进行分解。 //返回用于除HMAP之外的任何存储Redis结构的对象解码器 Decoder<Object> getValueDecoder(); //返回用于除HMAP之外的任何存储Redis结构的对象编码器 Encoder getValueEncoder();

SerializationCodec

org.redisson.codec.SerializationCodec

Decoder

Encoder

0 人点赞