通用的消息队列(redis,kafka,rabbitmq)--消费者篇

2020-06-19 16:57:56 浏览数 (1)

上篇我写了一个通用的消息队列(redis,kafka,rabbitmq)--生产者篇,这次写一个消费者篇.

1.消费者的通用调用类:

代码语言:javascript复制
/**
 * 消息队列处理的handle
 * @author starmark
 * @date 2020/5/1  上午10:56
 */
public interface IMessageQueueConsumerService {


    /**
     * 处理消息队列的消息
     * @param message 消息
     */
    void receiveMessage(String message);

    /**
     * 返回监听的topic
     * @return 主题
     */
    String topic();

    /**
     *
     * @param consumerType 消费者类型
     * @return 是否支持该消费者类者
     */
    boolean support(String consumerType);
}

只要实现该类的接口就可以实现监听,

redis的消费端,有两个类,如下:

代码语言:javascript复制
/**
 * @author starmark
 * @date 2020/5/2  下午3:05
 */
public class MessageQueueRedisConsumerListener implements MessageListener {

    private IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueRedisConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(Message message, byte[] pattern) {
        messageQueueConsumerService.receiveMessage(message.toString());
    }
}

/**
 * 消息队列服务端的监听
 *
 * @author starmark
 * @date 2020/5/1  上午10:55
 */
@Service
public class MessageQueueRedisConsumerServiceFactory {


    private List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueRedisConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("redis")).collect(Collectors.toList());
    }

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
                    new MessageQueueRedisConsumerListener(messageQueueConsumerService));
            messageListenerAdapter.afterPropertiesSet();
            container.addMessageListener(messageListenerAdapter, new PatternTopic(messageQueueConsumerService.topic()));

        });

        return container;
    }


}

kafka消费者也有两个类,如下:

代码语言:javascript复制
/**
 * @author starmark
 * @date 2020/5/2  下午3:05
 */
public class MessageQueueKafkaConsumerListener implements MessageListener<String,String> {

    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueKafkaConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        messageQueueConsumerService.receiveMessage(data.value());
    }
}

/**
 * 消息队列服务端的监听
 *
 * @author starmark
 * @date 2020/5/1  上午10:55
 */
@Component
public class MessageQueueKafkaConsumerServiceFactory  implements InitializingBean {

    @Autowired
    KafkaProperties kafkaProperties;

    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueKafkaConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("kafka")).collect(Collectors.toList());
    }




    private KafkaMessageListenerContainer<Integer, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        DefaultKafkaConsumerFactory<Integer, String> cf =
                new DefaultKafkaConsumerFactory<>(props);
        return new KafkaMessageListenerContainer<>(cf, containerProps);
    }


    @Override
    public void afterPropertiesSet() {
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            ContainerProperties containerProps = new ContainerProperties(messageQueueConsumerService.topic());

            containerProps.setMessageListener(new MessageQueueKafkaConsumerListener(messageQueueConsumerService)
            );
            KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
            container.setBeanName(messageQueueConsumerService.topic()   "kafkaListener");

            container.start();

        });

    }
}

这些类都是实现动态监听某个主题.

rabbitmq就有点复杂,因为他要求建了queue才能实现监听,我现在这个代码,如果生产者没有创建队列,会自动帮生产者创建该主题的队列。其实这是不对的,但不这么做,无法实现监听.

代码语言:javascript复制
/**
 * @author starmark
 * @date 2020/5/2  下午3:05
 */
public class MessageQueueRabbitmqConsumerListener implements MessageListener  {

    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueRabbitmqConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(Message message) {

        messageQueueConsumerService.receiveMessage(new String(message.getBody()));
    }

}

@Component
public class MessageQueueRabbitmqConsumerServiceFactory implements InitializingBean {

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final ConfigurableApplicationContext applicationContext;
    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
    private final ConnectionFactory connectionFactory;

    @Autowired
    public MessageQueueRabbitmqConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList, ConfigurableApplicationContext applicationContext, ConnectionFactory connectionFactory) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("rabbitmq")).collect(Collectors.toList());
        this.applicationContext = applicationContext;
        this.connectionFactory = connectionFactory;

    }


    @Override
    public void afterPropertiesSet() {
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {

            this.registerBean(messageQueueConsumerService.topic(), messageQueueConsumerService.topic());
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setConsumerStartTimeout(6000L);
        ;
            //设置监听的队列名,
            String[] types = {messageQueueConsumerService.topic()};
            container.setQueueNames(types);
            container.setMessageListener(new MessageQueueRabbitmqConsumerListener(messageQueueConsumerService));
            container.start();
        });

    }


    private void registerBean(String name, Object... args) {
        if (applicationContext.containsBean(name)) {
            return;
        }
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(Queue.class);
        if (args.length > 0) {
            for (Object arg : args) {
                beanDefinitionBuilder.addConstructorArgValue(arg);
            }
        }
        BeanDefinition beanDefinition = beanDefinitionBuilder.getRawBeanDefinition();

        BeanDefinitionRegistry beanFactory = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
        beanFactory.registerBeanDefinition(name, beanDefinition);

    }
}

至此,通用的消息队列已完成,这个只能满足一般情况的使用 .

如果要更高端的使用,直接使用其原生的api会更好.

0 人点赞