普通的 Redis 订阅通常使用 RedisMessageListenerContainer 作为容器,通过配置类在 Spring 初始化时一次性加载要订阅的主题。按这种写法,订阅的主题在启动时就已固定,无法在运行时动态添加;程序运行期间再修改配置也不会生效。
@Configuration
public class RedisChannelConfig {

    /**
     * Creates the listener container and registers the adapter on the fixed topics.
     *
     * @param connectionFactory connection factory the container is bound to
     * @param listenerAdapter   adapter registered as the listener for every topic
     * @return the configured container
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        // Subscribe to the topics "messagepush" and "messagepush3"; one container can
        // hold any number of message listeners.
        for (String topicName : new String[] {"messagepush", "messagepush3"}) {
            listenerContainer.addMessageListener(listenerAdapter, new PatternTopic(topicName));
        }
        return listenerContainer;
    }

    /**
     * Wraps the message handler in an adapter.
     *
     * @param receiver handler object that processes incoming messages
     * @return adapter that delegates to the handler's "receiveMessage" method
     */
    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceive receiver) {
        // The adapter invokes the named handler method reflectively. Several constructor
        // overloads exist; per the original author's note, the default handler method
        // name is handleMessage when none is given (see the adapter's source).
        final String handlerMethod = "receiveMessage";
        return new MessageListenerAdapter(receiver, handlerMethod);
    }

    /**
     * Exposes a template for reading and writing Redis data. Nothing in this class
     * uses it; it is unrelated to the message-queue wiring above.
     *
     * @param connectionFactory connection factory backing the template
     * @return a new string template
     */
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory);
        return redisTemplate;
    }
}
//此段代码摘自: https://blog.csdn.net/zhang18024666607/article/details/84392335
如果只是把它当作主题固定的消息队列来做订阅/发布,上面的方式已经足够;但如果需求是根据前端传入的字段动态订阅,这种在启动时把主题写死的配置方式就无法满足了。要实现动态订阅,就不能只依赖这种启动时配置 container 的方式,但可以直接利用 Lettuce 客户端进行订阅。旧版本 spring-data-redis 默认使用的客户端是 Jedis,新版本已换成 Lettuce,并且自带异步方法,不会阻塞调用线程。
package com.miracle.im.service;
import com.miracle.im.pojo.Msg;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @Author Diuut
* @Date 2020/4/13 9:50
*/
@Slf4j
@Service
public class ImService {
@Autowired
private MyRedisPubSubListener myRedisPubSubListener;
public String publish(String consumer, String msg) {
RedisURI redisUri = RedisURI.Builder.redis("xxxxxx")
.withPassword("xxxxxx")
.withDatabase(2)
.withPort(6379)
.build();
// RedisURI redisUri = RedisURI.Builder.redis("127.0.0.1").build();
RedisClient redisClient = RedisClient.create(redisUri);
StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub();
RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Long> publish = async.publish(consumer, msg);
return "执行完毕";
}
public String subscribe(String username) {
RedisURI redisUri = RedisURI.Builder.redis("xxxxxx")
.withPassword("xxxxxx")
.withDatabase(2)
.withPort(6379)
.build();
// RedisURI redisUri = RedisURI.Builder.redis("127.0.0.1").build();
RedisClient client = RedisClient.create(redisUri);
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
connection.addListener(myRedisPubSubListener);
RedisPubSubAsyncCommands<String, String> async = connection.async();
async.subscribe("Topic_" username);
async.subscribe("Topic_server");
// log.info("future: {}",future);
return "执行完毕";
}
}
package com.miracle.im.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import io.lettuce.core.pubsub.RedisPubSubListener;
/**
 * Pub/sub listener that writes every received message and every
 * subscribe/unsubscribe notification to the log at INFO level.
 */
@Slf4j
@Component
public class MyRedisPubSubListener implements RedisPubSubListener<String, String> {

    /**
     * Logs a message received on a channel subscription.
     *
     * @param channel channel the message arrived on
     * @param message message payload
     */
    @Override
    public void message(String channel, String message) {
        log.info("msg1={} on channel {}", message, channel);
    }

    /**
     * Logs a message received through a pattern subscription.
     *
     * @param pattern pattern that matched the channel
     * @param channel channel the message arrived on
     * @param message message payload
     */
    @Override
    public void message(String pattern, String channel, String message) {
        // Include the pattern so this line can be traced back to its subscription.
        log.info("msg2={} in channel={}, pattern={}", message, channel, pattern);
    }

    /**
     * Logs a channel subscription notification.
     *
     * @param channel channel that was subscribed
     * @param count   count value passed in with the notification
     */
    @Override
    public void subscribed(String channel, long count) {
        log.info("sub channel={}, count={}", channel, count);
    }

    /**
     * Logs a pattern subscription notification.
     *
     * @param pattern pattern that was subscribed
     * @param count   count value passed in with the notification
     */
    @Override
    public void psubscribed(String pattern, long count) {
        log.info("psub pattern={}, count={}", pattern, count);
    }

    /**
     * Logs a channel unsubscription notification.
     *
     * @param channel channel that was unsubscribed
     * @param count   count value passed in with the notification
     */
    @Override
    public void unsubscribed(String channel, long count) {
        log.info("unsub channel={}, count={}", channel, count);
    }

    /**
     * Logs a pattern unsubscription notification.
     *
     * @param pattern pattern that was unsubscribed
     * @param count   count value passed in with the notification
     */
    @Override
    public void punsubscribed(String pattern, long count) {
        // The value is a pattern, so label it as one (matches psubscribed).
        log.info("punsub pattern={}, count={}", pattern, count);
    }
}
Post Views: 220