Message Forwarding
A Kafka consumer can forward the messages it consumes to another topic. For example, a message may need to pass through several processing steps before the business flow is complete, with multiple consumers cooperating to finish it. A forwarding example looks like this:
@KafkaListener(topics = "send-a")
@SendTo("send-b")
public String sendTest0(ConsumerRecord<?, String> record){
System.out.println(record.value());
return "转发消息" record.value();
}
@KafkaListener(topics = "send-b")
public void sendTest1(ConsumerRecord<?, String> record){
System.out.println(record.value());
}
@Scheduled(cron = "*/15 * * * * ?")
@Transactional
public void producerTest(){
kafkaTemplate.send("send-a","xxxxxxxxxxxxxx");
}
Producer Obtains the Consumer's Response
By combining the @SendTo annotation with the ReplyingKafkaTemplate class, the producer can obtain the result of the consumer's processing. Because ReplyingKafkaTemplate is a subclass of KafkaTemplate, registering it in the Spring container disables the auto-configuration of KafkaTemplate; a KafkaTemplate is still required, so you have to register both beans manually. Configuration example:
@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic-kl", 1, (short) 1);
    }

    @Bean
    public AdminClient init(KafkaProperties kafkaProperties) {
        return KafkaAdminClient.create(kafkaProperties.buildAdminProperties());
    }
    /**
     * Synchronous send-and-receive requires a ReplyingKafkaTemplate backed by a replies container.
     *
     * @param producerFactory
     * @param repliesContainer
     * @return
     */
    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> producerFactory,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        ReplyingKafkaTemplate<String, String, String> template =
                new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
        // Timeout for the synchronous reply: 10s
        template.setReplyTimeout(10000);
        return template;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory(KafkaProperties properties) {
        DefaultKafkaProducerFactory<String, String> producerFactory =
                new DefaultKafkaProducerFactory<>(properties.buildProducerProperties());
        producerFactory.setTransactionIdPrefix(properties.getProducer().getTransactionIdPrefix());
        return producerFactory;
        // return new DefaultKafkaProducerFactory<>(producerConfigs(properties));
    }
    public Map<String, Object> producerConfigs(KafkaProperties properties) {
        Map<String, Object> props = new HashMap<>();
        // Used to establish the connection to the Kafka cluster. This list only affects the
        // initial hosts used to discover the full set of servers. Format: host1:port1,host2:port2,...
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", properties.getBootstrapServers()));
        // Number of retries
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // The producer batches records bound for the same partition into a single request to
        // reduce the number of requests; this is the batch size. A value of 0 disables batching.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Total memory the producer can use to buffer records (the BufferPool inside RecordAccumulator).
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Maximum size of a single request; the default is 1 MB
        // props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 20971520);
        // Serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    /**
     * Listener container for the topic that the consumer sends its reply to.
     * @return
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("topic-return");
        repliesContainer.setAutoStartup(true);
        return repliesContainer;
    }
    @Bean
    // @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<String, String> kafkaProducerFactory,
            ObjectProvider<RecordMessageConverter> messageConverter, KafkaProperties properties) {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(new LoggingProducerListener<>());
        kafkaTemplate.setDefaultTopic(properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }
}
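The repliesContainer bean above is created from a ConcurrentKafkaListenerContainerFactory, which in turn needs a ConsumerFactory; Spring Boot auto-configures both from application properties. If you prefer to register the consumer side explicitly (mirroring the producerConfigs method), a sketch could look like the following; the group id and bean method name are illustrative assumptions:

    // Sketch of an explicit consumer factory, assuming the same KafkaProperties source.
    // Spring Boot normally auto-configures this bean; "reply-group" is an illustrative value.
    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties properties) {
        Map<String, Object> props = new HashMap<>(properties.buildConsumerProperties());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reply-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }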
The producer receives the consumer's return value (it is best not to run these two in the same application; otherwise the producer easily times out and you will not observe the returned result):
@Scheduled(cron = "*/1 * * * * ?")
@Transactional
public void returnTestProducer(){
ProducerRecord<String, String> record = new ProducerRecord<>("topic-return", "test-return");
RequestReplyFuture<String, String, String> replyFuture = replyingTemplate.sendAndReceive(record);
try {
String value = replyFuture.get().value();
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@KafkaListener(topics = "topic-return")
@SendTo
public String listen(String message) {
return "consumer return:".concat(message);
}
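For reference, a variant with explicit, bounded waits; it is only a sketch that assumes the same injected replyingTemplate field (the method name is illustrative). getSendFuture() confirms that the request itself reached the broker before the code waits on the reply:

// Variant of returnTestProducer() with bounded waits; assumes the same replyingTemplate field.
public void returnTestProducerWithTimeout() throws Exception {
    ProducerRecord<String, String> record = new ProducerRecord<>("topic-return", "test-return");
    RequestReplyFuture<String, String, String> future = replyingTemplate.sendAndReceive(record);
    // First make sure the request itself was written to the broker
    future.getSendFuture().get(5, TimeUnit.SECONDS);
    // Then wait at most 10 seconds for the consumer's reply
    ConsumerRecord<String, String> reply = future.get(10, TimeUnit.SECONDS);
    System.out.println(reply.value());
}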
Conclusion
This wraps up the Kafka series; this is the last installment. The content is fairly rough and does not get into designing for real business scenarios, but as an introductory tutorial it should serve well. Thanks for reading.