Redis+Lua 实现消息和接口幂等性

2023-09-18 08:57:55 浏览数 (2)

简介

为了防止消息重复消费导致业务处理异常,消息队列RocketMQ版的消费者在接收到消息后,有必要根据业务上的唯一Key对消息做幂等处理。本文介绍消息幂等的概念、适用场景以及处理方法。

什么是消息幂等

当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费者的处理过程就是幂等的。

例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为100美元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费100美元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。

适用场景

在互联网应用中,尤其在网络不稳定的情况下,消息队列RocketMQ版的消息有可能会出现重复。如果消息重复会影响您的业务处理,请对消息做幂等处理。

消息重复的场景如下

发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同但Message ID不同的消息。

投递时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。

负载均衡时消息重复

包括但不限于网络抖动、Broker重启以及消费者应用重启

当消息队列RocketMQ版的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到少量重复消息。

处理伪方案

因为不同的Message ID对应的消息内容可能相同,有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置。

以支付场景为例,可以将消息的Key设置为订单号,作为幂等处理的依据。具体代码示例如下:

代码语言:javascript复制
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);           

消费者收到消息时可以根据消息的Key,即订单号来实现消息幂等:

代码语言:javascript复制
consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey()
        // 根据业务唯一标识的Key做幂等处理。
    }
});           

实战案例

方案一:使用Redis的set命令

代码语言:javascript复制
/**
 * @desc 第一种处理方式
 * @param string $orderSn
 * @return bool
 * @author Tinywan(ShaoBo Wan)
 */
protected static function saveDeliverSetCommand(string $orderSn): bool
{
    return self::_redis()->set($orderSn, 'true', ['nx', 'ex' => 120]);
}

说明:上述代码和前面描述的原理一致,但实际上存在问题,在高并发场景下依然会有幂等性问题,这是因为没有充分利用redis的原子性。

方案二:使用Redis原子性

使用Redis的原子性操作,比如SETNXEXPIRE来实现更可靠的幂等性控制。

代码语言:javascript复制
/**
 * @desc 第二种处理方式
 * @param string $orderSn
 * @return bool
 * @author Tinywan(ShaoBo Wan)
 */
protected static function saveDeliverSetnxCommand(string $orderSn): bool
{
    $result = self::_redis()->setnx($orderSn, 'true');
    if ($result) {
        self::_redis()->expire($orderSn, 120);
    }
    return $result;
}

使用SETNX命令尝试将业务唯一标识保存到Redis中,如果返回1表示设置成功,说明是第一次提交;否则返回0,表示重复提交。

方案三:使用Redis Lua脚本

伪代码

代码语言:javascript复制
/**
 * @desc 第三种处理方式
 * @param string $orderSn
 * @return bool
 * @author Tinywan(ShaoBo Wan)
 */
protected static function saveDeliverLuaScript(string $orderSn): bool
{
    // TODO 使用Lua脚本执行原子性操作
    $script = <<<tinywan
        if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then
            redis.call('EXPIRE', KEYS[1], ARGV[2])
            return true
        else
            return false
        end    
tinywan;
    // TODO 将业务唯一标识保存到Redis中,并设置过期时间(120秒)
    return (bool)self::_redis()->eval($script, [$orderSn, 'true', 120], 1);
}

Redis中使用Lua的好处

  • 减少网络开销。可以将多个请求通过脚本的形式一次发送,减少网络时延
  • 原子操作。redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。因此在编写脚本的过程中无需担心会出现竞态条件,无需使用事务。
  • 复用。客户端发送的脚步会永久存在redis中,这样,其他客户端可以复用这一脚本而不需要使用代码完成相同的逻辑。

Redis Lua脚本与事务

从定义上来说, Redis 中的脚本本身就是一种事务, 所以任何在事务里可以完成的事, 在脚本里面也能完成。 并且一般来说, 使用脚本要来得更简单,并且速度更快。

Lua 脚本命令参数

首先定义了一个字符串变量 $script,用于存储Lua脚本的内容。

if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then: 使用 Redis 的 SETNX 命令,在键 KEYS[1] 中设置值为 ARGV[1]ARGV 是一个参数数组)。如果 SETNX 返回值为 1(表示设置成功),则执行以下代码块。

redis.call('EXPIRE', KEYS[1], ARGV[2]): 使用 Redis 的 EXPIRE 命令,在键 KEYS[1] 设置过期时间为 ARGV[2] 秒。

return true:返回布尔值 true 给调用方,表示设置和过期时间设置都成功。

else:如果 SETNX 返回值不为 1,则执行以下代码块。

return false:返回布尔值 false 给调用方,表示设置失败。

使用 evalSha命令方法替换 eval命令方法

代码语言:javascript复制
/**
 * @desc 第三种方式 Plus
 * @param string $orderSn
 * @return bool
 * @author Tinywan(ShaoBo Wan)
 */
protected static function saveDeliverLuaScriptPlus(string $orderSn): bool
{
    $redis = self::_redis();
    $scriptShaKey = 'REDIS:SCRIPT:SHA';
    $scriptSha = $redis->get($scriptShaKey);
    if (!$scriptSha) {
        // TODO 使用Lua脚本执行原子性操作
        // TODO 使用SETNX命令尝试将$orderSn保存到Redis中,如果返回1表示设置成功,说明是第一次提交;否则返回0,表示重复提交
        $script = <<<tinywan
        if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then
            redis.call('EXPIRE', KEYS[1], ARGV[2])
            return true
        else
            return false
        end    
tinywan;
        $scriptSha = $redis->script('load', $script);
        $redis->set($scriptShaKey, $scriptSha);
    }
    // TODO 将orderSn保存到Redis中,并设置过期时间(120秒)
    return (bool)self::_redis()->evalSha($scriptSha, [$orderSn, 'true', 12000], 1);
}

evalShaeval两者的差异

  • eval直接执行上传的脚本不从缓存拿
  • evalsha直接从缓存中取sha中脚本信息执行

生产环境中,推荐使用EVALSHA,相较于EVAL的每次发送脚本主体、浪费带宽,会更高效。

核心代码如下

代码语言:javascript复制
<?php
/**
 * @desc MQDeliver
 * @author Tinywan(ShaoBo Wan)
 * @date 2023/9/17 13:02
 */
declare(strict_types=1);

class MQDeliver
{
    /**
     * @desc: 获取Redis实例
     * @return Redis
     * @author Tinywan(ShaoBo Wan)
     */
    protected static function _redis()
    {
        return server_redis();
    }

    /**
     * @desc 第一种处理方式
     * @param string $orderSn
     * @return bool
     * @author Tinywan(ShaoBo Wan)
     */
    protected static function saveDeliverSetCommand(string $orderSn): bool
    {
        return self::_redis()->set($orderSn, 'true', ['nx', 'ex' => 120]);
    }

    /**
     * @desc 第二种处理方式
     * @param string $orderSn
     * @return bool
     * @author Tinywan(ShaoBo Wan)
     */
    protected static function saveDeliverSetnxCommand(string $orderSn): bool
    {
        $result = self::_redis()->setnx($orderSn, 'true');
        if ($result) {
            self::_redis()->expire($orderSn, 120);
        }
        return $result;
    }

    /**
     * @desc 第三种处理方式
     * @param string $orderSn
     * @return bool
     * @author Tinywan(ShaoBo Wan)
     */
    protected static function saveDeliverLuaScript(string $orderSn): bool
    {
        // TODO 使用Lua脚本执行原子性操作
        $script = <<<tinywan
            if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then
                redis.call('EXPIRE', KEYS[1], ARGV[2])
                return true
            else
                return false
            end    
tinywan;
        // TODO 将业务唯一标识保存到Redis中,并设置过期时间(120秒)
        return (bool)self::_redis()->eval($script, [$orderSn, 'true', 12000], 1);
    }

    /**
     * @desc 第三种方式 Plus
     * @param string $orderSn
     * @return bool
     * @author Tinywan(ShaoBo Wan)
     */
    protected static function saveDeliverLuaScriptPlus(string $orderSn): bool
    {
        $redis = self::_redis();
        $scriptShaKey = 'REDIS:SCRIPT:SHA';
        $scriptSha = $redis->get($scriptShaKey);
        if (!$scriptSha) {
            // TODO 使用Lua脚本执行原子性操作
            // TODO 使用SETNX命令尝试将$orderSn保存到Redis中,如果返回1表示设置成功,说明是第一次提交;否则返回0,表示重复提交
            $script = <<<tinywan
        if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then
            redis.call('EXPIRE', KEYS[1], ARGV[2])
            return true
        else
            return false
        end    
tinywan;
            $scriptSha = $redis->script('load', $script);
            $redis->set($scriptShaKey, $scriptSha);
        }

        // TODO 将orderSn保存到Redis中,并设置过期时间(120秒)
        return (bool)self::_redis()->evalSha($scriptSha, [$orderSn, 'true', 12000], 1);
    }

    /**
     * @desc: 全局消息ID投递
     * @param string $messageId
     * @return bool
     * @author Tinywan(ShaoBo Wan)
     */
    public static function messageIdPost(string $messageId): bool
    {
        return self::_redis()->setnx($messageId, $messageId);
    }

    /**
     * @desc: 业务处理
     * @param string $orderSn
     * @return string
     * @author Tinywan(ShaoBo Wan)
     */
    public static function businessHandle(string $orderSn): string
    {
        // 检查是否重复投递消息
        if (self::isRepeatedDeliver($orderSn)) {
            return '消息幂等投注失败';
        }

        // TODO 投递消息ID(全局唯一),业务唯一标识作为幂等处理的关键依据,如订单号、交易号、流水号等
        // $execResult = self::saveDeliverSetCommand($orderSn);
        // $execResult = self::saveDeliverSetnxCommand($orderSn);
        // $execResult = self::saveDeliverLuaScript($orderSn);
        $execResult = self::saveDeliverLuaScriptPlus($orderSn);
        if (false === $execResult) {
            // TODO 重复投递消息,直接返回
            return '消息幂等投注失败';
        }

        try {
            // TODO 根据业务唯一标识的Key做幂等处理
            sleep(1);
        } catch (Exception $e) {
            return '处理失败 ' . $e->getMessage();
        } finally {
            // TODO 使用完毕后删除
            self::deleteMessageId($orderSn);
        }
        return '处理完成';
    }

    /**
     * @desc 是否重复投递消息
     * @param string $messageId
     * @return bool
     * @author Tinywan(ShaoBo Wan)
     */
    private static function isRepeatedDeliver(string $messageId): bool
    {
        return (bool)self::_redis()->exists($messageId);
    }

    /**
     * @desc: 删除消息
     * @param string $messageId
     * @return int
     * @author Tinywan(ShaoBo Wan)
     */
    public static function deleteMessageId(string $messageId): int
    {
        return self::_redis()->del($messageId);
    }
}

上述业务代码中删除业务全局唯一messageId的操作在finally块中执行,无论是否重复消费处理逻辑成功与否都会确保删除业务全局唯一messageId。

redis lua 脚本相关命令

这一小节的内容是基本命令,可粗略阅读后跳过,等使用的时候再回来查询

redis 自 2.6.0 加入了 lua 脚本相关的命令,EVALEVALSHASCRIPT EXISTSSCRIPT FLUSHSCRIPT KILLSCRIPT LOAD,自 3.2.0 加入了 lua 脚本的调试功能和命令SCRIPT DEBUG。这里对命令做下简单的介绍。

  1. EVAL执行一段lua脚本,每次都需要将完整的lua脚本传递给redis服务器。
  2. SCRIPT LOAD将一段lua脚本缓存到redis中并返回一个tag串,并不会执行。
  3. EVALSHA执行一个脚本,不过传入参数是「2」中返回的tag,节省网络带宽。
  4. SCRIPT EXISTS判断「2」返回的tag串是否存在服务器中。
  5. SCRIPT FLUSH清除服务器上的所有缓存的脚本。
  6. SCRIPT KILL杀死正在运行的脚本。
  7. SCRIPT DEBUG设置调试模式,可设置同步、异步、关闭,同步会阻塞所有请求。

生产环境中,推荐使用EVALSHA,相较于EVAL的每次发送脚本主体、浪费带宽,会更高效。这里要注意SCRIPT KILL,杀死正在运行脚本的时候,如果脚本执行过写操作了,这里会杀死失败,因为这违反了 redis lua 脚本的原子性。调试尽量放在测试环境完成之后再发布到生产环境,在生产环境调试千万不要使用同步模式,原因下文会详细讨论。

Redis 中 lua 脚本的书写和调试

redis lua 脚本是对其现有命令的扩充,单个命令不能完成、需要多个命令,但又要保证原子性的动作可以用脚本来实现。脚本中的逻辑一般比较简单,不要加入太复杂的东西,因为 redis 是单线程的,当脚本执行的时候,其他命令、脚本需要等待直到当前脚本执行完成。

因此,对 lua 的语法也不需完全了解,了解基本的使用就足够了,这里对 lua 语法不做过多介绍,会穿插到脚本示例里面。

0 人点赞