kafka之springboot集成kafka(四)

2024-01-18 23:22:36 浏览数 (1)

参考

https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

https://juejin.cn/post/7210225864355659835

https://thepracticaldeveloper.com/spring-boot-kafka-config/

https://reflectoring.io/spring-boot-kafka/

一、项目新建

1.1 方式一、spring项目自动生成

https://start.spring.io/

1.2 方式二、手动搭建引入kafka

1、pom引入

代码语言:js复制
     <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2、yaml文件配置

代码语言:js复制
spring:
  kafka:
    producer:
    bootstrap-servers: 127.0.0.1:9092

二、代码编写

2.1 方式一、使用spring

2.1.1 创建主题(create Kafka Topic)

代码语言:java复制
@Slf4j
@RestController
public class TopicCreateController {

    @Autowired
    private KafkaProperties properties;

    @GetMapping("/create/{topicName}")
    public String createTopic(@PathVariable String topicName) {
        AdminClient client = AdminClient.create(properties.buildAdminProperties());
        if (client != null) {
            try {
                Collection<NewTopic> newTopics = new ArrayList<>(1);
                newTopics.add(new NewTopic(topicName, 1, (short) 1));
                client.createTopics(newTopics);
            } catch (Throwable e) {
                e.printStackTrace();
            } finally {
                client.close();
            }
        }
        return topicName;
    }

    @GetMapping("/test")
    public String createTopic() {
        return "success";
    }
}

2.1.2 生产者(Spring Boot Kafka Producer)

Fire-and-forget模式

发送消息后不需要逻辑程序关心是否发送成功。

同步模式

即sender()方法后再调用get()方法会同步地等待结果返回,根据结果可以判断是否发送成功。

代码语言:java复制
@Slf4j
@RestController
public class ProducerController {
    private static final String topic = "hello";
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    /**
     * 同步获取通知结果
     * @param msg
     * @return
     */
    @GetMapping("/produce/{msg}")
    public String produce(@PathVariable String msg) {
        // 发送消息
        try {
            SendResult result = kafkaTemplate.send(topic, msg).get();
            System.out.println(result);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        System.out.println("send"  msg);
        return "send: "  msg;
    }

异步生产者

代码语言:java复制
 @GetMapping("/produceAsync/{msg}")
    public String hello2(@PathVariable String msg) {
        // 同步获取结果
        ListenableFuture<SendResult<Object,Object>> future = kafkaTemplate.send(topic,msg);
        try {
            SendResult<Object,Object> result = future.get();
            System.out.println("success >>> "  result.getRecordMetadata().topic()   ",offset" result.getRecordMetadata().offset()); // success >>> hello2);
        }catch (Throwable e){
            e.printStackTrace();
        }

        System.out.println("async send: "  msg);
        return "async send: "  msg;
    }

2.1.3 消费者(Spring Boot Kafka Consumer)

代码语言:java复制
// 输入代码内容
  @KafkaListener(id = "helloGroup", topics = "hello")
    public void hello(String msg) {
        System.out.println(msg);
    }

2.2 方式二 使用Kafka原生

2.2.1 生产者

代码语言:java复制
private static Properties props = new Properties();
static {
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
public void ProduceMsg(String topic,String msg){
        Producer<String, String> producer = new KafkaProducer<>(props);
        // 发送消息
        ProducerRecord<String,String> record =
                new ProducerRecord<String, String>(topic,msg);

        //producer.send(record);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("消息发送成功");
                } else {
                    System.out.println("消息发送失败");
                }
            }
        });
}

2.2.2 消费者

代码语言:java复制
  private static Properties props = new Properties();
    static {
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id","tpd-loggers");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }
    final KafkaConsumer<String, String> consumer;

    private volatile boolean isRunning = true;

    public AutoCommitConsumer(String topicName) {
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topicName));
    }

    public void printReceiveMsg() {
        try {
            while (isRunning) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1000));
                Thread.sleep(1000);
                if (!consumerRecords.isEmpty()) {
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        System.out.println("TopicName: "   consumerRecord.topic()   " Partition:"  
                                consumerRecord.partition()   " Offset:"   consumerRecord.offset()   ""  
                                " Msg:"   consumerRecord.value());
                    }

                }
            }
        }catch (Exception e){
            System.out.println(e.getMessage());
        }
        finally {
            close();
        }

    }

    public void close() {
        isRunning = false;
        if (consumer != null) {
            consumer.close();
        }
    }

0 人点赞