redis是一款高性能key-value存储系统,不仅能做缓存,还能用于消息队列
这里利用Spring Data Redis 来实现消息的发布订阅机制 Demo地址:GitHub - jujunchen/redis-queue-demo: redis 实现的消息 发布/订阅机制
一共3个应用,1个发布者应用,2个订阅者应用
发布者应用
RedisConfig redis序列化配置
Person 示例传输的POJO对象
Publisher 发布服务
代码语言:javascript复制@Component
public class Publisher {
@Autowired
@Qualifier(value = "customRedisTemplate")
private RedisTemplate redisTemplate;
/**
* 向指定频道发布消息
* @param channel 频道名称
* @param object 消息
*/
public void publish(String channel,Object object){
redisTemplate.convertAndSend(channel,object);
}
}
一个发布测试类
代码语言:javascript复制public class DemoApplicationTests {
@Autowired
private Publisher publisher;
@Test
public void startPublisher() {
System.out.println("发布消息");
//Person person = new Person("redis","10","x");
publisher.publish("testChannel","渠道1消息");
publisher.publish("testChannel2","渠道2消息");
}
}
订阅者应用
MessageConfig 接收消息配置
RedisConfig redis序列化配置,与发布服务相同
Subscriber 订阅服务
MessageConfig接收消息配置
代码语言:javascript复制@Configuration
public class MessageConfig {
@Autowired
private Subscriber subscriber;
@Autowired
@Qualifier(value = "customRedisTemplate")
private RedisTemplate redisTemplate;
/**
* RedisMessageListenerContainer充当消息侦听器容器。
* 它用于从Redis通道接收消息并驱动注入其中的MessageListener实例。
* 侦听器容器负责消息接收的所有线程并将其分派到侦听器进行处理。
* 消息监听器容器是MDP和消息传递提供者之间的中介,并负责注册以接收消息,资源获取和释放,异常转换等。
*
* 此外,为了最小化应用程序占用空间,RedisMessageListenerContainer允许多个侦听器共享一个连接和一个线程,即使它们不共享订阅。
* 因此,无论应用程序跟踪多少个侦听器或通道,运行时成本在其整个生命周期内保持不变。
* 此外,容器允许更改运行时配置,以便您可以在应用程序运行时添加或删除侦听器,而无需重新启动。
* 此外,容器使用延迟订阅方法,仅在需要时使用RedisConnection。
* 如果所有侦听器都已取消订阅,则会自动执行清理,并释放该线程。
* 为了帮助消息的异步性,容器需要一个java.util.concurrent.Executor(或Spring的TaskExecutor)来分派消息。
* 根据负载,侦听器数量或运行时环境,您应该更改或调整执行程序以更好地满足您的需求。 强烈建议选择适当的TaskExecutor来利用其运行时。
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisTemplate.getConnectionFactory());
List<Topic> topicList = new ArrayList<>();
topicList.add(new PatternTopic("testChannel"));
container.addMessageListener(listenerAdapter, topicList);
return container;
}
/**
* 消息侦听器适配器,能将消息委托给目标侦听器方法
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter() {
return new MessageListenerAdapter(subscriber);
}
}
Subscriber 订阅服务
代码语言:javascript复制@Component
public class Subscriber implements MessageListener{
@Autowired
@Qualifier(value = "customRedisTemplate")
private RedisTemplate redisTemplate;
/**
* 每次新消息到达时,都会调用回调
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
RedisSerializer<?> keySerializer = redisTemplate.getKeySerializer();
RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
Object channel = keySerializer.deserialize(message.getChannel());
Object body = valueSerializer.deserialize(message.getBody());
System.out.println("渠道: " channel);
System.out.println("消息内容: " String.valueOf(body));
}
}
当我跑下发布服务测试用例的时候,两个订阅者分别会收到来自订阅渠道的消息