kafka 结合springboot实战--第三节

2022-12-23 20:44:38 浏览数 (1)

消息转发

kafka 消费者可以将消费到的消息转发到指定的主题中去,比如一条消息需要经过多次流转加工才能走完整个业务流程,需要多个consumer来配合完成。转发代码示例如下:

代码语言:javascript复制

    @KafkaListener(topics = "send-a")
    @SendTo("send-b")
    public String sendTest0(ConsumerRecord<?, String> record){
        System.out.println(record.value());
        return "转发消息" record.value();
    }

    @KafkaListener(topics = "send-b")
    public void sendTest1(ConsumerRecord<?, String> record){
        System.out.println(record.value());
    }

    @Scheduled(cron = "*/15 * * * * ?")
    @Transactional
    public void producerTest(){
        kafkaTemplate.send("send-a","xxxxxxxxxxxxxx");
    }

生产者获取消费者响应

结合 @sendTo注解ReplyingKafkaTemplate 类 生产者可以获取消费者消费消息的结果; 因为 ReplyingKafkaTemplate 是kafkaTemplate 的一个子类,当你往spring 容器注册 这个bean, kafkaTemplate 的自动装配就会关闭,但是kafkaTemplate 是必须的,因此你需要把这两个bean 都手动注册上。配置示例:

代码语言:javascript复制

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic-kl", 1, (short) 1);
    }



    @Bean
    public AdminClient init( KafkaProperties kafkaProperties){
        return KafkaAdminClient.create(kafkaProperties.buildAdminProperties());
    }

    /**
     * 同步的kafka需要ReplyingKafkaTemplate,指定repliesContainer
     *
     * @param producerFactory
     * @param repliesContainer
     * @return
     */
    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
        ProducerFactory<String, String> producerFactory,
        ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        ReplyingKafkaTemplate template = new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
        //同步相应超时时间:10s
        template.setReplyTimeout(10000);
        return template;
    }

    @Bean
    public ProducerFactory<String,String> producerFactory(KafkaProperties properties) {
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(properties.buildProducerProperties());
        producerFactory.setTransactionIdPrefix(properties.getProducer().getTransactionIdPrefix());
        return  producerFactory;
//        return new DefaultKafkaProducerFactory<>(properties.producerConfigs(properties));
    }

    public Map<String, Object> producerConfigs(KafkaProperties properties) {
        Map<String, Object> props = new HashMap<>();
        //用于建立与kafka集群的连接,这个list仅仅影响用于初始化的hosts,来发现全部的servers。 格式:host1:port1,host2:port2,…,
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,String.join(",",properties.getBootstrapServers()));
        // 重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Producer可以将发往同一个Partition的数据做成一个Produce Request发送请求以减少请求次数,该值即为每次批处理的大小,若将该值设为0,则不会进行批处理
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Producer可以用来缓存数据的内存大小。该值实际为RecordAccumulator类中的BufferPool,即Producer所管理的最大内存。
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        //发送一次message最大大小,默认是1M
        //props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 20971520);
        // 序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * 指定consumer返回数据到指定的topic
     * @return
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, String>
    repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer =
            containerFactory.createContainer("topic-return");
        repliesContainer.setAutoStartup(true);
        return repliesContainer;
    }

    @Bean
//    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<String, String> kafkaProducerFactory,
                                             ObjectProvider<RecordMessageConverter> messageConverter,KafkaProperties properties) {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener( new LoggingProducerListener<>());
        kafkaTemplate.setDefaultTopic(properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }


}

生产者接收消费者返回值(这俩最好不要开到一个应用中,否则会很容易生产者超时,观察不到返回的结果):

代码语言:javascript复制

    @Scheduled(cron = "*/1 * * * * ?")
    @Transactional
    public void returnTestProducer(){
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-return", "test-return");
        RequestReplyFuture<String, String, String> replyFuture = replyingTemplate.sendAndReceive(record);
        try {
            String value = replyFuture.get().value();
            System.out.println(value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    @KafkaListener(topics = "topic-return")
    @SendTo
    public String listen(String message) {
        return "consumer return:".concat(message);
    }

结语

kafka 的相关知识更新完了,这是最后一节。内容比较粗糙,没有涉及到一些业务场景的设计使用,但是作为入门教程还是很不错的,感谢阅读。

0 人点赞