深入理解Redis的Pub/Sub模式

2023-11-07 17:02:56 浏览数 (2)

  • 什么是pub/sub?

  • Redis的pub/sub指令
  • Redis pub/sub的适用场景
  • Redis pub/sub指令的注意事项及缺点
  • 基于spring-boot-starter-data-redis实现pub/sub
  • 小结

什么是pub/sub?

Pub/Sub(发布/订阅)是一种消息传递模式,它允许一个或多个订阅者监听一个特定的主题(频道),当有新的消息发布到该主题时,所有订阅者都会收到通知。

这种模式在分布式系统中非常常见,因为它可以解耦生产者和消费者之间的关系,使得系统更加灵活和可扩展。 RocketMQ、RabbitMQ也支持Pub/Sub的消息传递模式。

以RocketMQ为例,Pub/Sub的结构如下:

RocketMQ 中消息的生命周期主要分为消息生产、消息存储、消息消费这三部分。 生产者生产消息并发送至RocketMQ 服务端,消息被存储在服务端的主题[Topic]中,消费者通过订阅主题[Topic]消费消息。 Redis场景也类似,不同的是消息发送到了Redis服务器。 JackieTang,公众号:的数字化之路RocketMQ系列 | 如何让消息“丢失”?

Redis的pub/sub指令

Redis实现的“发布/订阅”模式可以实现进程间的消息传递,其原理是这样的: “发布/订阅”模式中包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或若干个频道(channel),而发布者可以向指定的频道发送消息,所有订阅此频道的订阅者都会收到此消息。

Redis消息队列不支持消息的多播机制。 消息多播允许 生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由相应的消费组进行消费。支持了消息多播,不同消费组的逻辑就可以放到不同的子系统中。 为了支持多播,Redis不再依赖那5种基本类型了,它单独使用了一个模块来支持消息多播,这个模块的名字叫做PubSub,也就是PublisherSubscriber(发布者/订阅者模式)。

Redis提供了一组命令可以让开发者实现“发布/订阅”(publish/subscribe)模式,包括以下几个指令:

  1. PUBLISH:用于发布消息到指定的频道。用法是PUBLISH channel message
  2. SUBSCRIBE:用于订阅一个或多个频道。用法是SUBSCRIBE channel [channel...]
  3. UNSUBSCRIBE:用于取消订阅一个或多个频道。
  4. PSUBSCRIBE:用于订阅一个或多个频道,但不会立即开始接收消息,而是等待客户端执行SUBSCRIBE命令后才开始接收。
  5. PUNSUBSCRIBE:用于取消订阅一个或多个频道,但不会立即停止接收消息,而是等待客户端执行UNSUBSCRIBE命令后才会停止。

Redis pub/sub的适用场景

Redis的Pub/Sub模式适用于以下场景:

  1. 实时消息推送:如新闻更新、股票价格变动等。
  2. 事件驱动系统:如用户注册、订单创建等事件的通知。
  3. 分布式系统中的数据同步:如数据库的主从复制、分布式缓存等。

Redis pub/sub指令的注意事项及缺点

在使用Redis的Pub/Sub模式时,需要注意以下几点:

  1. 频道名必须是字符串类型。
  2. 发布的消息必须是字符串类型。
  3. 订阅和取消订阅频道的操作是异步的,不会阻塞客户端的其他操作。
  4. 如果客户端断开了与Redis服务器的连接,那么它订阅的所有频道都会被自动取消订阅。

在写demo之前,咱们再来多看一眼Redis PubSub模块的缺点: 1、没有消息存储。 Redis只会把消息投递给当前正在的订阅的Subscriber。 如果没有消费者,此条消息就丢弃。这与RocketMQ、RabbitMQ不同。 PubSub的生产者传递过来一条消息,Redis会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息会被直接丢弃。如果开始有三个消费者,一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息,但是当挂掉的消费者重新连接上的时候,在断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了。

2、Redis宕掉,期间所有的消息都丢失。

如果Redis停机重启,PubSub的消息是不会持久化的,毕竟Redis的宕机就相当于一个Subscriber都没有,所有的消息会被直接丢弃。

为弥补这个不足,2018.6,Redis5.0新增了Stream数据结构,这个功能给Redis带来了持久化消息队列。有兴趣的同学可以了解下。

基于spring-boot-starter-data-redis实现pub/sub

代码语言:javascript复制
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <version>2.7.16</version>
      </dependency>

定义Subscriber,这个类需要实现MessageListener接口:

代码语言:javascript复制

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

/**
 * @Auther: cheng.tang
 * @Date: 2023/10/17
 * @Description: redis-subject
 */
@Slf4j
public class CustomMessageListener implements MessageListener {

    public static final String CUSTOM_CHANNEL_PREFIX = "event:customer:channel:";

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        String body = new String(message.getBody());
        log.info(" channel {} body {} pattern {} ", channel, body, new String(pattern));
    }


}

绑定订阅关系(Subscription),注册Subscriber:

代码语言:javascript复制

import com.tangcheng.redislistener.expire.listener.CustomMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

import static com.tangcheng.redislistener.expire.listener.CustomMessageListener.CUSTOM_CHANNEL_PREFIX;

/**
 * @Auther: cheng.tang
 * @Date: 2023/10/17
 * @Description: redis-subject
 */
@Configuration
@Slf4j
public class MessageListenerContainerConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(RedisSerializer.string());
        redisTemplate.setValueSerializer(RedisSerializer.json());
        redisTemplate.setHashKeySerializer(RedisSerializer.string());
        redisTemplate.setHashValueSerializer(RedisSerializer.json());
        return redisTemplate;
    }

    /**
     * 绑定订阅关系。subscriber和channel或topic的关系
     *
     * @param connectionFactory
     * @param threadPoolTaskExecutor
     * @return
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
                                                                       TaskExecutor threadPoolTaskExecutor) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 设置Redis的连接工厂
        container.setConnectionFactory(connectionFactory);
        // 设置监听使用的线程池
        container.setTaskExecutor(threadPoolTaskExecutor);
        container.addMessageListener(new CustomMessageListener(), new PatternTopic(CUSTOM_CHANNEL_PREFIX   "*"));
        return container;
    }

    @Bean
    public TaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(60);
        executor.setQueueCapacity(1000);
        executor.setKeepAliveSeconds(3600);
        executor.setThreadNamePrefix("redis-listener-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }


}

定义Producer类,由Producer pub消息:

代码语言:javascript复制

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;

import static com.tangcheng.redislistener.expire.listener.CustomMessageListener.CUSTOM_CHANNEL_PREFIX;

@Service
@Slf4j
public class RedisPubJob {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Value("${server.port:8080}")
    private int serverPort;

    @Scheduled(cron = "0 */1 * * * ?")
    public void pubMsg() {
        LocalDateTime now = LocalDateTime.now();
        log.info("开始发送消息");
        int value = ThreadLocalRandom.current().nextInt();
        redisTemplate.convertAndSend(CUSTOM_CHANNEL_PREFIX   value, serverPort);
        log.info("消息发送完成: "   Duration.between(now, LocalDateTime.now()));
    }

}

搭建一个Producer 三个Subscriber的场景,期望这样的效果: 一个topic有三个subscriber场景,Producer往指定的topic pub一条消息后,订阅这个topic的三个subscirber都会消费到。 同一台JVM进程中,Redis PubSub的生产者和消费者在不同的线程中支持,也就是使用了不同的连接。因为Redis不允许连接在subscribe等待消息时还需要进行其它操作。

run起来,看下pub/sub的效果: step1: build一个jar;

step2: 启动三个jvm进程:

代码语言:javascript复制
 java -jar redis-listener-0.0.1-SNAPSHOT.jar ---server.port=8080

 java -jar redis-listener-0.0.1-SNAPSHOT.jar ---server.port=8090

 java -jar redis-listener-0.0.1-SNAPSHOT.jar ---server.port=8099

订阅情况:

源码详见文末。

小结

总的来说,Redis的Pub/Sub模式是一种非常轻量级的消息传递模型,它可以在一些低频、低数据量的场景帮助我们实现多播的实时消息推送、事件驱动系统和分布式系统中的数据同步等功能。而在Spring Boot应用中,我们可以通过Spring Boot Starter Data Redis来轻松地实现Redis的Pub/Sub模式。

参考

show the code : https://gitee.com/baidumap/redis-subject

《Redis深度历险 核心原理与应用初中》 《Redis入门指南 》(第2版)

0 人点赞