- 什么是pub/sub?
- Redis的pub/sub指令
- Redis pub/sub的适用场景
- Redis pub/sub指令的注意事项及缺点
- 基于spring-boot-starter-data-redis实现pub/sub
- 小结
什么是pub/sub?
Pub/Sub(发布/订阅)是一种消息传递模式,它允许一个或多个订阅者监听一个特定的主题(频道),当有新的消息发布到该主题时,所有订阅者都会收到通知。
这种模式在分布式系统中非常常见,因为它可以解耦生产者和消费者之间的关系,使得系统更加灵活和可扩展。 RocketMQ、RabbitMQ也支持Pub/Sub的消息传递模式。
以RocketMQ为例,Pub/Sub的结构如下:
RocketMQ 中消息的生命周期主要分为消息生产、消息存储、消息消费这三部分。 生产者生产消息并发送至RocketMQ 服务端,消息被存储在服务端的主题[Topic]中,消费者通过订阅主题[Topic]消费消息。 Redis场景也类似,不同的是消息发送到了Redis服务器。 JackieTang,公众号:的数字化之路RocketMQ系列 | 如何让消息“丢失”?
Redis的pub/sub指令
Redis实现的“发布/订阅”模式可以实现进程间的消息传递,其原理是这样的: “发布/订阅”模式中包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或若干个频道(channel),而发布者可以向指定的频道发送消息,所有订阅此频道的订阅者都会收到此消息。
Redis消息队列不支持消息的多播机制。 消息多播允许 生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由相应的消费组进行消费。支持了消息多播,不同消费组的逻辑就可以放到不同的子系统中。 为了支持多播,Redis不再依赖那5种基本类型了,它单独使用了一个模块来支持消息多播,这个模块的名字叫做PubSub,也就是PublisherSubscriber(发布者/订阅者模式)。
Redis提供了一组命令可以让开发者实现“发布/订阅”(publish/subscribe)模式,包括以下几个指令:
- PUBLISH:用于发布消息到指定的频道。用法是PUBLISH channel message
- SUBSCRIBE:用于订阅一个或多个频道。用法是SUBSCRIBE channel [channel...]
- UNSUBSCRIBE:用于取消订阅一个或多个频道。
- PSUBSCRIBE:用于订阅一个或多个频道,但不会立即开始接收消息,而是等待客户端执行SUBSCRIBE命令后才开始接收。
- PUNSUBSCRIBE:用于取消订阅一个或多个频道,但不会立即停止接收消息,而是等待客户端执行UNSUBSCRIBE命令后才会停止。
Redis pub/sub的适用场景
Redis的Pub/Sub模式适用于以下场景:
- 实时消息推送:如新闻更新、股票价格变动等。
- 事件驱动系统:如用户注册、订单创建等事件的通知。
- 分布式系统中的数据同步:如数据库的主从复制、分布式缓存等。
Redis pub/sub指令的注意事项及缺点
在使用Redis的Pub/Sub模式时,需要注意以下几点:
- 频道名必须是字符串类型。
- 发布的消息必须是字符串类型。
- 订阅和取消订阅频道的操作是异步的,不会阻塞客户端的其他操作。
- 如果客户端断开了与Redis服务器的连接,那么它订阅的所有频道都会被自动取消订阅。
在写demo之前,咱们再来多看一眼Redis PubSub模块的缺点: 1、没有消息存储。 Redis只会把消息投递给当前正在的订阅的Subscriber。 如果没有消费者,此条消息就丢弃。这与RocketMQ、RabbitMQ不同。 PubSub的生产者传递过来一条消息,Redis会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息会被直接丢弃。如果开始有三个消费者,一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息,但是当挂掉的消费者重新连接上的时候,在断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了。
2、Redis宕掉,期间所有的消息都丢失。
如果Redis停机重启,PubSub的消息是不会持久化的,毕竟Redis的宕机就相当于一个Subscriber都没有,所有的消息会被直接丢弃。
为弥补这个不足,2018.6,Redis5.0新增了Stream数据结构,这个功能给Redis带来了持久化消息队列。有兴趣的同学可以了解下。
基于spring-boot-starter-data-redis实现pub/sub
代码语言:javascript复制 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.7.16</version>
</dependency>
定义Subscriber,这个类需要实现MessageListener接口:
代码语言:javascript复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
/**
* @Auther: cheng.tang
* @Date: 2023/10/17
* @Description: redis-subject
*/
@Slf4j
public class CustomMessageListener implements MessageListener {
public static final String CUSTOM_CHANNEL_PREFIX = "event:customer:channel:";
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(message.getChannel());
String body = new String(message.getBody());
log.info(" channel {} body {} pattern {} ", channel, body, new String(pattern));
}
}
绑定订阅关系(Subscription),注册Subscriber:
代码语言:javascript复制
import com.tangcheng.redislistener.expire.listener.CustomMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import static com.tangcheng.redislistener.expire.listener.CustomMessageListener.CUSTOM_CHANNEL_PREFIX;
/**
* @Auther: cheng.tang
* @Date: 2023/10/17
* @Description: redis-subject
*/
@Configuration
@Slf4j
public class MessageListenerContainerConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(RedisSerializer.string());
redisTemplate.setValueSerializer(RedisSerializer.json());
redisTemplate.setHashKeySerializer(RedisSerializer.string());
redisTemplate.setHashValueSerializer(RedisSerializer.json());
return redisTemplate;
}
/**
* 绑定订阅关系。subscriber和channel或topic的关系
*
* @param connectionFactory
* @param threadPoolTaskExecutor
* @return
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
TaskExecutor threadPoolTaskExecutor) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置Redis的连接工厂
container.setConnectionFactory(connectionFactory);
// 设置监听使用的线程池
container.setTaskExecutor(threadPoolTaskExecutor);
container.addMessageListener(new CustomMessageListener(), new PatternTopic(CUSTOM_CHANNEL_PREFIX "*"));
return container;
}
@Bean
public TaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(60);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(3600);
executor.setThreadNamePrefix("redis-listener-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
定义Producer类,由Producer pub消息:
代码语言:javascript复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;
import static com.tangcheng.redislistener.expire.listener.CustomMessageListener.CUSTOM_CHANNEL_PREFIX;
@Service
@Slf4j
public class RedisPubJob {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Value("${server.port:8080}")
private int serverPort;
@Scheduled(cron = "0 */1 * * * ?")
public void pubMsg() {
LocalDateTime now = LocalDateTime.now();
log.info("开始发送消息");
int value = ThreadLocalRandom.current().nextInt();
redisTemplate.convertAndSend(CUSTOM_CHANNEL_PREFIX value, serverPort);
log.info("消息发送完成: " Duration.between(now, LocalDateTime.now()));
}
}
搭建一个Producer 三个Subscriber的场景,期望这样的效果: 一个topic有三个subscriber场景,Producer往指定的topic pub一条消息后,订阅这个topic的三个subscirber都会消费到。 同一台JVM进程中,Redis PubSub的生产者和消费者在不同的线程中支持,也就是使用了不同的连接。因为Redis不允许连接在subscribe等待消息时还需要进行其它操作。
run起来,看下pub/sub的效果: step1: build一个jar;
step2: 启动三个jvm进程:
代码语言:javascript复制 java -jar redis-listener-0.0.1-SNAPSHOT.jar ---server.port=8080
java -jar redis-listener-0.0.1-SNAPSHOT.jar ---server.port=8090
java -jar redis-listener-0.0.1-SNAPSHOT.jar ---server.port=8099
订阅情况:
源码详见文末。
小结
总的来说,Redis的Pub/Sub模式是一种非常轻量级的消息传递模型,它可以在一些低频、低数据量的场景帮助我们实现多播的实时消息推送、事件驱动系统和分布式系统中的数据同步等功能。而在Spring Boot应用中,我们可以通过Spring Boot Starter Data Redis来轻松地实现Redis的Pub/Sub模式。
参考
show the code : https://gitee.com/baidumap/redis-subject
《Redis深度历险 核心原理与应用初中》 《Redis入门指南 》(第2版)