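In the previous post I wrote the producer side of a generic message queue wrapper (redis, kafka, rabbitmq). This post covers the consumer side.
1. The common interface that consumers implement: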
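/**
 * Handler for messages received from the message queue
 * @author starmark
 * @date 2020/5/1 10:56 AM
 */
public interface IMessageQueueConsumerService {
    /**
     * Handle a message received from the message queue
     * @param message the message
     */
    void receiveMessage(String message);

    /**
     * Return the topic to listen on
     * @return the topic
     */
    String topic();

    /**
     * Whether this service handles the given consumer type
     * @param consumerType the consumer type
     * @return whether this consumer type is supported
     */
    boolean support(String consumerType);
}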
Any class that implements this interface can listen on a topic.
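As a minimal sketch of what such an implementation might look like: the class name `OrderCreatedConsumer` and the topic `order.created` are made up for illustration, and the interface is repeated only so the snippet compiles on its own. In a real Spring application the class would presumably also be registered as a bean (for example with `@Component`) so that the factories can receive it by injection.

```java
import java.util.ArrayList;
import java.util.List;

// Repeated from the interface above so that this sketch compiles on its own.
interface IMessageQueueConsumerService {
    void receiveMessage(String message);
    String topic();
    boolean support(String consumerType);
}

// Hypothetical consumer: the class name and the topic are illustrative only.
class OrderCreatedConsumer implements IMessageQueueConsumerService {
    private final List<String> received = new ArrayList<>();

    @Override
    public void receiveMessage(String message) {
        // Real code would parse and process the payload; this sketch only records it.
        received.add(message);
    }

    @Override
    public String topic() {
        return "order.created";
    }

    @Override
    public boolean support(String consumerType) {
        // Only the factory that asks for "redis" will pick this consumer up.
        return "redis".equals(consumerType);
    }

    List<String> received() {
        return received;
    }
}

class OrderCreatedConsumerDemo {
    public static void main(String[] args) {
        OrderCreatedConsumer consumer = new OrderCreatedConsumer();
        consumer.receiveMessage("{\"orderId\": 1}");
        System.out.println(consumer.topic() + " -> " + consumer.received());
    }
}
```

Returning `true` from `support` for more than one type would let the same consumer be attached to several brokers at once.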
The Redis consumer side has two classes:
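import java.nio.charset.StandardCharsets;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

/**
 * @author starmark
 * @date 2020/5/2 3:05 PM
 */
public class MessageQueueRedisConsumerListener implements MessageListener {
    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueRedisConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Decode the raw body with an explicit charset instead of relying on toString()
        messageQueueConsumerService.receiveMessage(new String(message.getBody(), StandardCharsets.UTF_8));
    }
}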
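import java.util.List;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Service;

/**
 * Registers the listeners for the Redis consumer side of the message queue
 *
 * @author starmark
 * @date 2020/5/1 10:55 AM
 */
@Service
public class MessageQueueRedisConsumerServiceFactory {
    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueRedisConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        // Keep only the consumers that declare support for redis
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("redis")).collect(Collectors.toList());
    }

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Subscribe one listener per consumer, using the consumer's topic as the pattern
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
                    new MessageQueueRedisConsumerListener(messageQueueConsumerService));
            messageListenerAdapter.afterPropertiesSet();
            container.addMessageListener(messageListenerAdapter, new PatternTopic(messageQueueConsumerService.topic()));
        });
        return container;
    }
}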
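The selection step that each factory performs in its constructor can be tried out without any broker. The sketch below is only an illustration under stated assumptions: `RecordingConsumer`, `ConsumerSelection` and the topic names are hypothetical, the interface is repeated so the snippet compiles on its own, and `dispatch` is a plain-Java stand-in for the broker delivering a message, not something the original code contains.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Repeated from the article so that this sketch compiles on its own.
interface IMessageQueueConsumerService {
    void receiveMessage(String message);
    String topic();
    boolean support(String consumerType);
}

// Hypothetical consumer bound to one topic and one consumer type.
class RecordingConsumer implements IMessageQueueConsumerService {
    private final String topic;
    private final String type;
    final List<String> received = new ArrayList<>();

    RecordingConsumer(String topic, String type) {
        this.topic = topic;
        this.type = type;
    }

    @Override
    public void receiveMessage(String message) {
        received.add(message);
    }

    @Override
    public String topic() {
        return topic;
    }

    @Override
    public boolean support(String consumerType) {
        return type.equals(consumerType);
    }
}

class ConsumerSelection {
    // The same filter the factories apply to the injected list.
    static List<IMessageQueueConsumerService> select(List<IMessageQueueConsumerService> all, String consumerType) {
        return all.stream()
                .filter(service -> service.support(consumerType))
                .collect(Collectors.toList());
    }

    // Stand-in for the broker: hand the message to every selected consumer on that topic.
    static int dispatch(List<IMessageQueueConsumerService> selected, String topic, String message) {
        int delivered = 0;
        for (IMessageQueueConsumerService service : selected) {
            if (service.topic().equals(topic)) {
                service.receiveMessage(message);
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        RecordingConsumer redisConsumer = new RecordingConsumer("demo.topic", "redis");
        RecordingConsumer kafkaConsumer = new RecordingConsumer("demo.topic", "kafka");
        List<IMessageQueueConsumerService> all = Arrays.asList(redisConsumer, kafkaConsumer);

        List<IMessageQueueConsumerService> redisOnly = select(all, "redis");
        int delivered = dispatch(redisOnly, "demo.topic", "hello");
        System.out.println("delivered to " + delivered + " consumer(s): " + redisConsumer.received);
    }
}
```

Because every factory filters the same injected list with a different type string, a consumer opts in to a broker purely through its `support` method.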
The Kafka consumer also has two classes:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

/**
 * @author starmark
 * @date 2020/5/2 3:05 PM
 */
public class MessageQueueKafkaConsumerListener implements MessageListener<String, String> {
    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueKafkaConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        // Only the record value is passed on; key, partition and offset are ignored
        messageQueueConsumerService.receiveMessage(data.value());
    }
}
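import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.stereotype.Component;

/**
 * Registers the listeners for the Kafka consumer side of the message queue
 *
 * @author starmark
 * @date 2020/5/1 10:55 AM
 */
@Component
public class MessageQueueKafkaConsumerServiceFactory implements InitializingBean {
    @Autowired
    private KafkaProperties kafkaProperties;
    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueKafkaConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        // Keep only the consumers that declare support for kafka
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("kafka")).collect(Collectors.toList());
    }

    // Key and value types match MessageQueueKafkaConsumerListener (String, String)
    private KafkaMessageListenerContainer<String, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        DefaultKafkaConsumerFactory<String, String> cf =
                new DefaultKafkaConsumerFactory<>(props);
        return new KafkaMessageListenerContainer<>(cf, containerProps);
    }

    @Override
    public void afterPropertiesSet() {
        // Start one listener container per consumer, subscribed to that consumer's topic
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            ContainerProperties containerProps = new ContainerProperties(messageQueueConsumerService.topic());
            containerProps.setMessageListener(new MessageQueueKafkaConsumerListener(messageQueueConsumerService));
            KafkaMessageListenerContainer<String, String> container = createContainer(containerProps);
            container.setBeanName(messageQueueConsumerService.topic() + "kafkaListener");
            container.start();
        });
    }
}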
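All of these classes register a listener for a topic dynamically.
RabbitMQ is a bit more involved, because a queue must exist before a listener can be attached to it. With the code as it stands, if the producer has not created the queue, the consumer creates the queue for that topic on the producer's behalf. Strictly speaking the consumer should not be doing this, but without it the listener cannot be attached.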
import java.nio.charset.StandardCharsets;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * @author starmark
 * @date 2020/5/2 3:05 PM
 */
public class MessageQueueRabbitmqConsumerListener implements MessageListener {
    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueRabbitmqConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }

    @Override
    public void onMessage(Message message) {
        // Decode the body with an explicit charset rather than the platform default
        messageQueueConsumerService.receiveMessage(new String(message.getBody(), StandardCharsets.UTF_8));
    }
}
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;

@Component
public class MessageQueueRabbitmqConsumerServiceFactory implements InitializingBean {
    // RabbitTemplate injected by Spring (not used directly in this class)
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private final ConfigurableApplicationContext applicationContext;
    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
    private final ConnectionFactory connectionFactory;

    @Autowired
    public MessageQueueRabbitmqConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList, ConfigurableApplicationContext applicationContext, ConnectionFactory connectionFactory) {
        // Keep only the consumers that declare support for rabbitmq
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("rabbitmq")).collect(Collectors.toList());
        this.applicationContext = applicationContext;
        this.connectionFactory = connectionFactory;
    }

    @Override
    public void afterPropertiesSet() {
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            // Register a Queue bean named after the topic so the queue exists before listening
            this.registerBean(messageQueueConsumerService.topic(), messageQueueConsumerService.topic());
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setConsumerStartTimeout(6000L);
            // Set the name of the queue to listen on
            container.setQueueNames(messageQueueConsumerService.topic());
            container.setMessageListener(new MessageQueueRabbitmqConsumerListener(messageQueueConsumerService));
            container.start();
        });
    }

    // Registers a Queue bean definition under the given name unless one already exists
    private void registerBean(String name, Object... args) {
        if (applicationContext.containsBean(name)) {
            return;
        }
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(Queue.class);
        for (Object arg : args) {
            beanDefinitionBuilder.addConstructorArgValue(arg);
        }
        BeanDefinition beanDefinition = beanDefinitionBuilder.getRawBeanDefinition();
        BeanDefinitionRegistry beanFactory = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
        beanFactory.registerBeanDefinition(name, beanDefinition);
    }
}
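The guard in `registerBean` (skip if a bean with that name already exists, otherwise register it) is what keeps the consumer from defining the same queue twice. The following is a rough sketch of that create-if-absent idea in plain Java. `QueueRegistry` is a hypothetical in-memory stand-in and not a Spring or RabbitMQ API; it only shows why a repeated registration for the same topic is harmless.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the bean registry; not a Spring API.
class QueueRegistry {
    private final Map<String, String> queues = new ConcurrentHashMap<>();

    // Returns true only when this call created the entry, mirroring the
    // containsBean(...) guard followed by registerBeanDefinition(...).
    boolean registerIfAbsent(String queueName) {
        return queues.putIfAbsent(queueName, queueName) == null;
    }

    boolean contains(String queueName) {
        return queues.containsKey(queueName);
    }

    int size() {
        return queues.size();
    }
}

class QueueRegistryDemo {
    public static void main(String[] args) {
        QueueRegistry registry = new QueueRegistry();
        boolean first = registry.registerIfAbsent("demo.topic");
        boolean second = registry.registerIfAbsent("demo.topic");
        System.out.println("first=" + first + ", second=" + second + ", size=" + registry.size());
    }
}
```

Using a single atomic `putIfAbsent` instead of a separate check and insert is a design choice of this sketch; the original code does the check and the registration as two steps during startup.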
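That completes the generic message queue wrapper. It only covers ordinary use cases.
For more advanced needs, using each broker's native API directly is the better choice.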