kafka 结合springboot实战
这一章节我们开始进入实战环节,废话不多说,让我们开始吧。
依赖和配置
我们新建一个springboot 项目,在 pom中引入依赖:
代码语言:javascript复制<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
然后启动项添加注解 @EnableScheduling
,@EnableKafka
。第一个注解是用来添加springboot定时任务以方便测试,第二个注解是装配kafka 配置。
接下来我们要在 application 的配置文件:
代码语言:javascript复制# 生产者配置
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费者配置
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#消费监听接口监听的主题不存在时,默认会报错
spring.kafka.listener.missing-topics-fatal=false
注册一个 AdminClient
:
@Bean
public AdminClient init( KafkaProperties kafkaProperties){
return KafkaAdminClient.create(kafkaProperties.buildAdminProperties());
}
这里因为是demo,我就将生产者和消费者写在一个程序里面了。
消息的生产和消费
先测试一个简单的收发消息:
代码语言:javascript复制@RestController
public class TestController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private AdminClient adminClient;
@Scheduled(cron = "*/15 * * * * ?")
public void send() {
kafkaTemplate.send("xxxxx", "test");
}
@KafkaListener(topics = "xxxxx",groupId = "test-consumer-group")
public void listen(ConsumerRecord<?, String> record) throws ExecutionException, InterruptedException {
String value = record.value();
System.out.println(value);
}
}
这里我调用了kafkaTemplate.send
方法发送消息,第一个参数是消息的主题,第二个参数是消息. 这里我并没有先创建主题,直接往主题里面发消息了,框架会给你直接创建一个默认的主题.
主题的创建和查看
我们可以通过 NewTopic
以代码的方式创建一个主题:
@Bean
public NewTopic topic() {
return new NewTopic("topic-test", 1, (short) 1);
}
当然像 rabbitMQ 的api 那样,spring boot 还非常贴心的准备了 topic 建造者类:
代码语言:javascript复制@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
还可以通过 AdminClient 创建主题:
代码语言:javascript复制 @Autowired
private AdminClient adminClient;
public String createTopic(){
Collection<NewTopic> newTopics = new ArrayList<>(1);
newTopics.add(new NewTopic("topic-a",1,(short) 1));
adminClient.createTopics(newTopics);
System.out.println("》》》》》》》》》》》》》》》 创建topic");
ListTopicsResult listTopicsResult = adminClient.listTopics();
System.out.println(">>>>>>>>>>>>>>>>>>>获取列表");
return "success";
}
第一个参数是主题名称,第二个参数是分区数,第三个分区是副本数(包括leader). 我们可以通过 AdminClient
查看 主题信息:
public String getTopic() throws ExecutionException, InterruptedException {
ListTopicsResult listTopicsResult = adminClient.listTopics();
Collection<TopicListing> topicListings = listTopicsResult.listings().get();
System.out.println(">>>>>>>>>>>>>>>>>>>获取列表");
return "success";
}
ListTopicsResult
的方法返回值都是 Future
类型的,这意味着它是异步的,使用的时候需要注意这一点.
和rabbitMQ 类似,kafka 给我们准备了一个默认主题:
代码语言:javascript复制 @Scheduled(cron = "*/15 * * * * ?")
public void sendDefault() {
kafkaTemplate.sendDefault("xxx");
}
这条消息会被发送到名为 topic.quick.default
的主题当中去.
kafkaTemplate.send
方法的使用
我们要注意 kafkaTemplate.send
它的返回值是ListenableFuture
,从名字我们就能知道它实际上是一个异步的方法, 我们可以通过 ListenableFuture.addCallback
方法去指定回调函数:
@Scheduled(cron = "*/15 * * * * ?")
public void send() {
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send("xxxxx", "test");
send.addCallback(new ListenableFutureCallback(){
@Override
public void onSuccess(Object o) {
}
@Override
public void onFailure(Throwable throwable) {
}
});
}
我们也可以通过 ListenableFuture.get
方法让它阻塞:
// @Scheduled(cron = "*/15 * * * * ?")
public void send1() {
try {
kafkaTemplate.send("xxxxx", "test").get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
kafka 相关基本的api就介绍到这里了,源码可以上 https://github.com/muggle0/learn-simple
去找.
未完待续...