生产者事务
Spring-kafka自动注册的KafkaTemplate实例是不具有事务消息发送能力的。需要在 application.properties
配置属性:
spring.kafka.producer.acks=-1
spring.kafka.producer.transaction-id-prefix=kafka_tx
当激活事务时 kafkaTemplate 就只能发送事务消息了,发送非事务的消息会报异常。发送事务消息的方法有两种,一种是通过 kafkaTemplate.executeInTransaction 实现,一种是通过 spring的注解 @Transactional
来实现,代码示例:
@Scheduled(cron = "*/15 * * * * ?")
public void sendTrans() {
kafkaTemplate.executeInTransaction(t ->{
t.send("xxxxx","test1");
t.send("xxxxx","test2");
return true;
}
);
}
@Scheduled(cron = "*/15 * * * * ?")
@Transactional(rollbackFor = Exception.class)
public void sendFoo() {
kafkaTemplate.send("topic_input", "test");
}
消费者Ack
消费者消息消息可以自动确认,也可以通过手动确认,开启手动首先需要关闭自动提交,然后设置下consumer的消费模式:
代码语言:javascript复制spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
配置完成之后我们需要对消费者监听器做一点小改动:
代码语言:javascript复制 @KafkaListener( topics = "topic_input")
public void listen(ConsumerRecord<?, String> record, Acknowledgment ack) {
System.out.println(record.value());
ack.acknowledge();
}
如你所见,我们可以通过 Acknowledgment.acknowledge()
来手动的确认消息的消费,不确认就不算消费成功,监听器会再次收到这个消息。对于某些业务场景这个功能还是很必要的,比如消费消息的同时导致写库异常,数据库回滚,那么消息也不应该被ack。
消费者监听器生命周期控制
消费者监听器有三个生命周期:启动、停止、继续;如果我们想控制消费者监听器生命周期,需要修改@KafkaListener
的 autoStartup
属性为false, 并给监听器 id 属性赋值 然后通过KafkaListenerEndpointRegistry
控制id 对应的监听器的启动停止继续:
import org.springframework.stereotype.Service;
@Service
public class test {
@Autowired
KafkaListenerEndpointRegistry listenerRegistry;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Scheduled(cron = "*/15 * * * * ?")
@Transactional
public void testListener(){
if (i==20){
listenerRegistry.getListenerContainer("listener1").start();
}
System.out.println("生产者生产消息" i );
kafkaTemplate.send("test","xxx" i);
}
@KafkaListener( id = "listener1",topics = "test",autoStartup ="false" )
public void testStart(ConsumerRecord<?, String> record){
System.out.println(record.value());
}
}
通过观察窗口输出就能看到,生产者生产了20条数据后消费者监听器才开始启动消费。