kafka 结合springboot实战--第一节

2022-12-23 20:44:06 浏览数 (1)

kafka 结合springboot实战

这一章节我们开始进入实战环节,废话不多说,让我们开始吧。

依赖和配置

我们新建一个springboot 项目,在 pom中引入依赖:

代码语言:javascript复制
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

然后启动项添加注解 @EnableScheduling@EnableKafka 。第一个注解是用来添加springboot定时任务以方便测试,第二个注解是装配kafka 配置。

接下来我们要在 application 的配置文件:

代码语言:javascript复制
# 生产者配置
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 消费者配置
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#消费监听接口监听的主题不存在时,默认会报错
spring.kafka.listener.missing-topics-fatal=false

注册一个 AdminClient :

代码语言:javascript复制
    @Bean
    public AdminClient init( KafkaProperties kafkaProperties){
        return KafkaAdminClient.create(kafkaProperties.buildAdminProperties());
    }

这里因为是demo,我就将生产者和消费者写在一个程序里面了。

消息的生产和消费

先测试一个简单的收发消息:

代码语言:javascript复制
@RestController
public class TestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private   AdminClient adminClient;

    @Scheduled(cron = "*/15 * * * * ?")
    public void send() {
        kafkaTemplate.send("xxxxx", "test");
    }

    @KafkaListener(topics = "xxxxx",groupId = "test-consumer-group")
    public void listen(ConsumerRecord<?, String> record) throws ExecutionException, InterruptedException {
        String value = record.value();
        System.out.println(value);
    }
}

这里我调用了kafkaTemplate.send 方法发送消息,第一个参数是消息的主题,第二个参数是消息. 这里我并没有先创建主题,直接往主题里面发消息了,框架会给你直接创建一个默认的主题.

主题的创建和查看

我们可以通过 NewTopic 以代码的方式创建一个主题:

代码语言:javascript复制
    @Bean
    public NewTopic topic() {
        return new NewTopic("topic-test", 1, (short) 1);
    }

当然像 rabbitMQ 的api 那样,spring boot 还非常贴心的准备了 topic 建造者类:

代码语言:javascript复制
@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

还可以通过 AdminClient 创建主题:

代码语言:javascript复制
    @Autowired
    private   AdminClient adminClient;

    public String createTopic(){
        Collection<NewTopic> newTopics = new ArrayList<>(1);
        newTopics.add(new NewTopic("topic-a",1,(short) 1));
        adminClient.createTopics(newTopics);
        System.out.println("》》》》》》》》》》》》》》》 创建topic");
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        System.out.println(">>>>>>>>>>>>>>>>>>>获取列表");
        return "success";
    }

第一个参数是主题名称,第二个参数是分区数,第三个分区是副本数(包括leader). 我们可以通过 AdminClient 查看 主题信息:

代码语言:javascript复制
    public String getTopic() throws ExecutionException, InterruptedException {
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Collection<TopicListing> topicListings = listTopicsResult.listings().get();
        System.out.println(">>>>>>>>>>>>>>>>>>>获取列表");
        return "success";
    }

ListTopicsResult 的方法返回值都是 Future 类型的,这意味着它是异步的,使用的时候需要注意这一点.

和rabbitMQ 类似,kafka 给我们准备了一个默认主题:

代码语言:javascript复制
    @Scheduled(cron = "*/15 * * * * ?")
    public void sendDefault() {
        kafkaTemplate.sendDefault("xxx");
    }

这条消息会被发送到名为 topic.quick.default 的主题当中去.

kafkaTemplate.send 方法的使用

我们要注意 kafkaTemplate.send 它的返回值是ListenableFuture,从名字我们就能知道它实际上是一个异步的方法, 我们可以通过 ListenableFuture.addCallback 方法去指定回调函数:

代码语言:javascript复制
   @Scheduled(cron = "*/15 * * * * ?")
    public void send() {
        ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send("xxxxx", "test");
        send.addCallback(new ListenableFutureCallback(){
            @Override
            public void onSuccess(Object o) {

            }
            @Override
            public void onFailure(Throwable throwable) {
                
            }
        });
    }

我们也可以通过 ListenableFuture.get 方法让它阻塞:

代码语言:javascript复制
    //    @Scheduled(cron = "*/15 * * * * ?")
    public void send1() {
        try {
            kafkaTemplate.send("xxxxx", "test").get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

kafka 相关基本的api就介绍到这里了,源码可以上 https://github.com/muggle0/learn-simple 去找.

未完待续...

0 人点赞