前言
Redis发布订阅(Pub/Sub)是Redis提供的一种消息传递机制,它使用“发布者-订阅者”(publisher-subscriber)模式来处理消息传递。在这种模式下,发布者将消息发布到一组订阅者中,而无需关心谁是订阅者,也不需要知道订阅者是否收到了消息。
发布者和订阅者模式允许多个客户端之间建立一个复杂的通信拓扑。在这种模式下,发布者可以发布消息到一个特定的主题,订阅者可以订阅一个或多个主题,并在发布者发布消息时收到消息。由于发布者和订阅者不必直接连接,因此发布者和订阅者可以完全独立地运行,只要它们都连接到Redis实例即可。
Redis发布订阅支持多种消息类型,包括文本、字节数组和数字等。 Redis还支持订阅者识别特定消息,通过模式匹配功能,可以基于主题模式或模式来检索消息。Redis还提供了许多API来帮助您实现发布/订阅模式,因此您可以使用Redis的发布/订阅功能来构建分布式应用程序。
Redis Pub/Sub(发布/订阅) 命令
Redis发布/订阅(Pub/Sub)分为两种
- 第一种基于频道(Channel)的发布/订阅。
- 第二种基于模式(pattern)的发布/订阅。
确实,Redis提供了一系列的Pub/Sub命令来支持基于频道和基于模式的发布/订阅模式。以下是一些常用的Pub/Sub命令:
基于频道的发布/订阅
发布消息到指定频道
代码语言:javascript复制PUBLISH channel message
例如:
代码语言:javascript复制PUBLISH my-channel "Hello, Redis!"
这将向名为my-channel
的频道发布消息"Hello, Redis!"。
订阅一个或多个频道
代码语言:javascript复制SUBSCRIBE channel channel ...
例如:
代码语言:javascript复制SUBSCRIBE my-channel your-channel
这将订阅my-channel
和your-channel
两个频道。
取消订阅一个或多个频道
代码语言:javascript复制UNSUBSCRIBE [channel channel ...]
例如:
代码语言:javascript复制UNSUBSCRIBE my-channel your-channel
这将取消订阅my-channel
和your-channel
两个频道。
基于模式的发布/订阅
订阅一个或多个匹配模式
代码语言:javascript复制PSUBSCRIBE pattern pattern ...
例如:
代码语言:javascript复制PSUBSCRIBE news-*
这将订阅所有以news-
开头的频道。
取消订阅一个或多个匹配模式
代码语言:javascript复制PUNSUBSCRIBE [pattern pattern ...]
例如:
代码语言:javascript复制PUNSUBSCRIBE news-*
这将取消订阅所有以news-
开头的频道。
注意:Pub/Sub命令可以在客户端和服务器之间进行通信,用于实现消息的发布和订阅。这些命令是异步执行的,发送命令后,订阅者将在接收到消息时收到通知。 Pub/Sub是一个强大的工具,用于实现实时消息传递和事件通知。
实战示例
基于MessageListener实现
创建消息接收者
创建一个接收消息的Bean。
代码语言:javascript复制package com.example.demo.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
* 实现MessageListener的监听类
*/
@Slf4j
@Component
public class RedisMessageSubscriber implements MessageListener {
@Autowired
private MessageProcessor messageProcessor;
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(message.getChannel());
String body = new String(message.getBody());
log.info("@@ 当前执行的方法:onMessage");
// 处理消息
messageProcessor.processMessage(channel, body);
}
}
创建消息处理器
创建一个处理接收到的消息的Bean。
代码语言:javascript复制package com.example.demo.redis;
import org.springframework.stereotype.Service;
@Service
public class MessageProcessor {
public void processMessage(String channel, String message) {
System.out.println("Received message: " message " from channel: " channel);
// 在这里进行具体的消息处理逻辑
}
}
创建消息发送者
创建一个发送消息的Bean。
Redis有两种发布/订阅模式:
- 基于频道(Channel)的发布/订阅
- 基于模式(pattern)的发布/订阅
package com.example.demo.redis;// RedisMessagePublisher.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class RedisMessagePublisher {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void publishMessage(String channel, String message) {
// 基于模式(pattern)的发布/订阅
// redisTemplate.convertAndSend("your-pattern-channel-1", "Hello, Redis!");
// 基于频道(Channel)的发布/订阅
redisTemplate.convertAndSend(channel, message);
}
}
使用消息发送者发送消息
在需要发送消息的地方注入RedisMessagePublisher
并使用它来发送消息。
package com.example.demo.redis;// MessageController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/message")
public class MessageController {
@Autowired
private RedisMessagePublisher messagePublisher;
@GetMapping("/send")
public String sendMessage(@RequestParam(value = "channel") String channel , @RequestParam(value = "messgage") String messgage) {
messagePublisher.publishMessage("your-channel", "Hello, Redis!");
messagePublisher.publishMessage(channel, messgage);
return "Message sent successfully!";
}
}
相关原理说明
- 发布/订阅模型: Redis提供了一种发布/订阅(Pub/Sub)模型,其中消息发送者(发布者)将消息发送到一个或多个通道,而消息接收者(订阅者)则监听一个或多个通道以接收消息。
- 消息监听器:
RedisMessageSubscriber
实现了MessageListener
接口,它监听指定通道上的消息。在这里,我们将接收到的消息传递给MessageProcessor
进行处理。 - 消息处理器:
MessageProcessor
是一个简单的服务,用于处理接收到的消息。在实际应用中,你可以在这里添加业务逻辑来处理消息。 - 消息发布者:
RedisMessagePublisher
用于发布消息到指定的通道。在sendMessage
方法中,我们使用convertAndSend
方法将消息发送到名为 "your-channel" 的通道。 - 消息发送端点:
MessageController
是一个简单的REST控制器,用于触发消息发送。在这里,我们通过调用messagePublisher.publishMessage
来发送消息。
总体来说,这个实现充分利用了Redis的发布/订阅功能,通过将消息发送者、消息接收者和消息处理器分离,使系统更加模块化和灵活。
RedisConfig配置
代码语言:javascript复制package com.example.demo.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Slf4j
@Configuration
public class RedisConfig {
/**
* 配置订阅
* 基于MessageListenerAdapter和RedisMessageListenerContainer
* @param redisMessageSubscriber
* @return
*/
@Bean
public MessageListenerAdapter messageListenerAdapter(RedisMessageSubscriber redisMessageSubscriber) {
return new MessageListenerAdapter(redisMessageSubscriber, "handleMessage");
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory,
MessageListenerAdapter messageListenerAdapter
) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 添加消息监听器和监听的频道
// 基于频道的发布/订阅:
container.addMessageListener(messageListenerAdapter, new ChannelTopic("your-channel"));
// 基于模式的发布/订阅:
container.addMessageListener(messageListenerAdapter, new PatternTopic("your-pattern-*"));
return container;
}
/**
* 基于MessageListener的配置
* 直接使用RedisMessageSubscriber
* @param connectionFactory
* @param redisMessageSubscriber
* @return
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory, RedisMessageSubscriber redisMessageSubscriber) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 在这里设置你的动态频道名称,可以从配置文件或其他地方获取
String dynamicChannel = "your-channel";
// 基于频道的发布/订阅:
ChannelTopic channelTopic = new ChannelTopic(dynamicChannel);
// 基于模式的发布/订阅:
// container.addMessageListener(redisMessageSubscriber, new PatternTopic("your-pattern-*"));
// 添加消息监听器和监听的动态频道
container.addMessageListener(redisMessageSubscriber, channelTopic);
return container;
}
// /**
// * 多频道示例
// * @param connectionFactory
// * @param redisMessageSubscriber
// * @return
// */
// @Bean
// public RedisMessageListenerContainer redisMessageListenerContainerMoreTopic(
// RedisConnectionFactory connectionFactory,
// RedisMessageSubscriber redisMessageSubscriber
// ) {
// RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// container.setConnectionFactory(connectionFactory);
//
// // 在这里设置你的动态频道名称列表,可以从配置文件或其他地方获取
// List<String> dynamicChannels = Arrays.asList("channel1", "channel2", "channel3");
//
// // 创建包含所有频道的ChannelTopic列表
// List<ChannelTopic> channelTopics = createChannelTopics(dynamicChannels);
//
// // 添加消息监听器和监听的多个频道
// for (ChannelTopic channelTopic : channelTopics) {
// container.addMessageListener(redisMessageSubscriber, channelTopic);
// }
//
// return container;
// }
//
// private List<ChannelTopic> createChannelTopics(List<String> channelNames) {
// // 使用动态频道名称创建ChannelTopic列表
// List<ChannelTopic> channelTopics = new ArrayList<>();
// for (String channelName : channelNames) {
// channelTopics.add(new ChannelTopic(channelName));
// }
// return channelTopics;
// }
}
这段代码是用于配置并创建 RedisMessageListenerContainer
的方法。RedisMessageListenerContainer
是 Spring Data Redis 提供的一个用于监听 Redis 消息的容器。以下是对代码的详细解释:
方法签名
RedisConnectionFactory connectionFactory
:这是用于创建 Redis 连接的工厂。RedisMessageSubscriber redisMessageSubscriber
:这是一个 Redis 消息订阅者,用于处理接收到的消息。
创建 RedisMessageListenerContainer 实例
代码语言:javascript复制RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
- 创建一个新的
RedisMessageListenerContainer
实例。 - 将
connectionFactory
设置为容器的连接工厂,用于创建连接到 Redis 的连接。
设置动态频道名称
代码语言:javascript复制String dynamicChannel = "your-channel";
- 定义一个动态频道名称,可以从配置文件或其他地方获取。
创建 ChannelTopic 对象
代码语言:javascript复制ChannelTopic channelTopic = new ChannelTopic(dynamicChannel);
- 创建一个
ChannelTopic
对象,表示基于频道的发布/订阅,其中dynamicChannel
是频道名称。
添加消息监听器和频道
代码语言:javascript复制container.addMessageListener(redisMessageSubscriber, channelTopic);
- 将
redisMessageSubscriber
添加为消息监听器,用于处理接收到的消息。 - 指定要监听的频道,这里使用了基于频道的发布/订阅模式。
返回 RedisMessageListenerContainer 实例
代码语言:javascript复制return container;
- 返回配置好的
RedisMessageListenerContainer
实例。
通过以上步骤,这段代码的目的是创建一个配置好的 RedisMessageListenerContainer
,该容器已设置好连接工厂、消息监听器以及要监听的动态频道。当 Redis 中的指定频道发布消息时,redisMessageSubscriber
的 onMessage
方法将被调用来处理消息。这是一种基于频道的发布/订阅模式,允许应用程序实时地接收并处理消息。
自定义监听的回调函数
创建消息接收者
不需要实现MessageListener接口
代码语言:javascript复制package com.example.demo.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 自定义的监听类
*/
@Slf4j
@Component
public class RedisMessageCustomSubscriber {
@Autowired
private MessageProcessor messageProcessor;
/**
* 自定义的回调函数
* @param message
* @param channel
*/
public void handleMessage(String message, String channel) {
log.info("@@ 当前执行的方法:handleMessage");
messageProcessor.processMessage(channel, message);
}
}
RedisConfig配置
代码语言:javascript复制package com.example.demo.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Slf4j
@Configuration
public class RedisConfig {
/**
* 配置订阅
* 基于MessageListenerAdapter和RedisMessageListenerContainer
* @param redisMessageSubscriber
* @return
*/
@Bean
public MessageListenerAdapter messageListenerAdapter(RedisMessageSubscriber redisMessageSubscriber) {
return new MessageListenerAdapter(redisMessageSubscriber, "handleMessage");
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory,
MessageListenerAdapter messageListenerAdapter
) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 添加消息监听器和监听的频道
// 基于频道的发布/订阅:
container.addMessageListener(messageListenerAdapter, new ChannelTopic("your-channel"));
// 基于模式的发布/订阅:
container.addMessageListener(messageListenerAdapter, new PatternTopic("your-pattern-*"));
return container;
}
// /**
// * 基于MessageListener的配置
// * 直接使用RedisMessageSubscriber
// * @param connectionFactory
// * @param redisMessageSubscriber
// * @return
// */
// @Bean
// public RedisMessageListenerContainer redisMessageListenerContainer(
// RedisConnectionFactory connectionFactory, RedisMessageSubscriber redisMessageSubscriber) {
//
// RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// container.setConnectionFactory(connectionFactory);
//
// // 在这里设置你的动态频道名称,可以从配置文件或其他地方获取
// String dynamicChannel = "your-channel";
//
// // 基于频道的发布/订阅:
// ChannelTopic channelTopic = new ChannelTopic(dynamicChannel);
//
// // 基于模式的发布/订阅:
// // container.addMessageListener(redisMessageSubscriber, new PatternTopic("your-pattern-*"));
//
// // 添加消息监听器和监听的动态频道
// container.addMessageListener(redisMessageSubscriber, channelTopic);
//
// return container;
// }
// /**
// * 多频道示例
// * @param connectionFactory
// * @param redisMessageSubscriber
// * @return
// */
// @Bean
// public RedisMessageListenerContainer redisMessageListenerContainerMoreTopic(
// RedisConnectionFactory connectionFactory,
// RedisMessageSubscriber redisMessageSubscriber
// ) {
// RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// container.setConnectionFactory(connectionFactory);
//
// // 在这里设置你的动态频道名称列表,可以从配置文件或其他地方获取
// List<String> dynamicChannels = Arrays.asList("channel1", "channel2", "channel3");
//
// // 创建包含所有频道的ChannelTopic列表
// List<ChannelTopic> channelTopics = createChannelTopics(dynamicChannels);
//
// // 添加消息监听器和监听的多个频道
// for (ChannelTopic channelTopic : channelTopics) {
// container.addMessageListener(redisMessageSubscriber, channelTopic);
// }
//
// return container;
// }
//
// private List<ChannelTopic> createChannelTopics(List<String> channelNames) {
// // 使用动态频道名称创建ChannelTopic列表
// List<ChannelTopic> channelTopics = new ArrayList<>();
// for (String channelName : channelNames) {
// channelTopics.add(new ChannelTopic(channelName));
// }
// return channelTopics;
// }
}
这段代码配置了两个 @Bean
方法,一个用于创建 MessageListenerAdapter
实例,另一个用于创建 RedisMessageListenerContainer
实例。以下是详细解释:
创建 MessageListenerAdapter 实例
代码语言:javascript复制@Bean
public MessageListenerAdapter messageListenerAdapter(RedisMessageSubscriber redisMessageSubscriber) {
return new MessageListenerAdapter(redisMessageSubscriber, "handleMessage");
}
- 通过
@Bean
注解创建一个MessageListenerAdapter
实例。 - 将
RedisMessageSubscriber
对象传递给构造函数,表示这个适配器将调用RedisMessageSubscriber
的方法来处理消息。 - 第二个参数
"handleMessage"
表示要调用的消息处理方法的名称。
创建 RedisMessageListenerContainer 实例
代码语言:javascript复制@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory,
MessageListenerAdapter messageListenerAdapter
) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
- 通过
@Bean
注解创建一个RedisMessageListenerContainer
实例。 - 将
RedisConnectionFactory
传递给构造函数,表示这个容器将使用的连接工厂。
添加消息监听器和频道
代码语言:javascript复制// 添加消息监听器和监听的频道
// 基于频道的发布/订阅:
container.addMessageListener(messageListenerAdapter, new ChannelTopic("your-channel"));
// 基于模式的发布/订阅:
container.addMessageListener(messageListenerAdapter, new PatternTopic("your-pattern-*"));
- 使用
container.addMessageListener
方法添加消息监听器(messageListenerAdapter
)。 - 第一个参数是消息监听器适配器,它会调用
RedisMessageSubscriber
的handleMessage
方法来处理消息。 - 第二个参数是
ChannelTopic
对象,表示基于频道的发布/订阅模式,监听指定的频道。 - 第三个参数是
PatternTopic
对象,表示基于模式的发布/订阅模式,监听指定模式的频道。
返回 RedisMessageListenerContainer 实例
代码语言:javascript复制return container;
- 返回配置好的
RedisMessageListenerContainer
实例。
通过这样的配置,RedisMessageListenerContainer
已经设置好了连接工厂和消息监听器,并分别基于频道和基于模式的发布/订阅模式来监听相应的消息。当 Redis 中的指定频道发布消息时,handleMessage
方法将被调用来处理消息。
区别
MessageListenerAdapter
和 RedisMessageListenerContainer
是 Spring Data Redis 提供的两个重要组件,用于实现 Redis 消息监听的机制。
MessageListenerAdapter
MessageListenerAdapter
是一个适配器,用于将普通的 Java 对象(POJO)转换为 Redis 消息监听器。它通过反射调用目标对象的方法来处理接收到的消息。在你的 POJO 类中,你可以定义一个或多个方法来处理不同类型的消息。
主要特点和用法:
- 将普通的 Java 对象转换为 Redis 的消息监听器。
- 可以指定调用目标对象的特定方法来处理消息。
- 通过设置消息转换器,可以支持多种消息格式,如 JSON、XML 等。
- 提供了一种简化消息处理逻辑的方式,避免了直接实现
MessageListener
接口的繁琐性。
RedisMessageListenerContainer
RedisMessageListenerContainer
是 Spring 提供的容器,用于管理 Redis 消息的监听器。它可以注册一个或多个消息监听器,并在接收到消息时调用相应的处理方法。该容器还负责管理连接到 Redis 的连接工厂,以及监听的频道或模式。
主要特点和用法:
- 管理 Redis 连接工厂,确保连接的创建和关闭。
- 注册消息监听器,并在接收到消息时调用相应的处理方法。
- 支持基于频道和基于模式的发布/订阅模式。
- 提供了灵活的配置选项,如消息转换器、错误处理器等。
总体而言,MessageListenerAdapter
和 RedisMessageListenerContainer
是一对重要的组件,它们使得在 Spring 应用中实现 Redis 消息监听变得更加简单和灵活。
我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!