参考
https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
https://juejin.cn/post/7210225864355659835
https://thepracticaldeveloper.com/spring-boot-kafka-config/
https://reflectoring.io/spring-boot-kafka/
一、项目新建
1.1 方式一、spring项目自动生成
https://start.spring.io/
1.2 方式二、手动搭建引入kafka
1、pom引入
代码语言:js复制 <dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、yaml文件配置
代码语言:js复制spring:
kafka:
producer:
bootstrap-servers: 127.0.0.1:9092
二、代码编写
2.1 方式一、使用spring
2.1.1 创建主题(create Kafka Topic)
代码语言:java复制@Slf4j
@RestController
public class TopicCreateController {
@Autowired
private KafkaProperties properties;
@GetMapping("/create/{topicName}")
public String createTopic(@PathVariable String topicName) {
AdminClient client = AdminClient.create(properties.buildAdminProperties());
if (client != null) {
try {
Collection<NewTopic> newTopics = new ArrayList<>(1);
newTopics.add(new NewTopic(topicName, 1, (short) 1));
client.createTopics(newTopics);
} catch (Throwable e) {
e.printStackTrace();
} finally {
client.close();
}
}
return topicName;
}
@GetMapping("/test")
public String createTopic() {
return "success";
}
}
2.1.2 生产者(Spring Boot Kafka Producer)
Fire-and-forget模式
发送消息后不需要逻辑程序关心是否发送成功。
同步模式
即sender()方法后再调用get()方法会同步地等待结果返回,根据结果可以判断是否发送成功。
代码语言:java复制@Slf4j
@RestController
public class ProducerController {
private static final String topic = "hello";
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
/**
* 同步获取通知结果
* @param msg
* @return
*/
@GetMapping("/produce/{msg}")
public String produce(@PathVariable String msg) {
// 发送消息
try {
SendResult result = kafkaTemplate.send(topic, msg).get();
System.out.println(result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
System.out.println("send" msg);
return "send: " msg;
}
异步生产者
代码语言:java复制 @GetMapping("/produceAsync/{msg}")
public String hello2(@PathVariable String msg) {
// 同步获取结果
ListenableFuture<SendResult<Object,Object>> future = kafkaTemplate.send(topic,msg);
try {
SendResult<Object,Object> result = future.get();
System.out.println("success >>> " result.getRecordMetadata().topic() ",offset" result.getRecordMetadata().offset()); // success >>> hello2);
}catch (Throwable e){
e.printStackTrace();
}
System.out.println("async send: " msg);
return "async send: " msg;
}
2.1.3 消费者(Spring Boot Kafka Consumer)
代码语言:java复制// 输入代码内容
@KafkaListener(id = "helloGroup", topics = "hello")
public void hello(String msg) {
System.out.println(msg);
}
2.2 方式二 使用Kafka原生
2.2.1 生产者
代码语言:java复制private static Properties props = new Properties();
static {
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
public void ProduceMsg(String topic,String msg){
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息
ProducerRecord<String,String> record =
new ProducerRecord<String, String>(topic,msg);
//producer.send(record);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
}
});
}
2.2.2 消费者
代码语言:java复制 private static Properties props = new Properties();
static {
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id","tpd-loggers");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}
final KafkaConsumer<String, String> consumer;
private volatile boolean isRunning = true;
public AutoCommitConsumer(String topicName) {
consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
}
public void printReceiveMsg() {
try {
while (isRunning) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1000));
Thread.sleep(1000);
if (!consumerRecords.isEmpty()) {
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("TopicName: " consumerRecord.topic() " Partition:"
consumerRecord.partition() " Offset:" consumerRecord.offset() ""
" Msg:" consumerRecord.value());
}
}
}
}catch (Exception e){
System.out.println(e.getMessage());
}
finally {
close();
}
}
public void close() {
isRunning = false;
if (consumer != null) {
consumer.close();
}
}