一、消息的发送和接收
⽣产者主要的对象有:KafkaProducer
,ProducerRecord
。
其中KafkaProducer
是⽤于发送消息的类,ProducerRecord
类⽤于封装 Kafka 的消息。
KafkaProducer
的创建需要指定的参数和含义:
参数 | 说明 |
---|---|
bootstrap.servers | 配置⽣产者如何与broker建⽴连接。该参数设置的是初始化参数。如果⽣产者需要连接的是Kafka集群,则这⾥配置集群中⼏个broker的地址,⽽不是全部,当⽣产者连接上此处指定的broker之后,在通过该连接发现集群中的其他节点。 |
key.serializer | 要发送信息的key数据的序列化类。设置的时候可以写类名,也可以使⽤该类的Class对象。 |
value.serializer | 要发送消息的alue数据的序列化类。设置的时候可以写类名,也可以使⽤该类的Class对象。 |
acks | 默认值:all。acks=0:⽣产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。该情形不能保证broker是否真的收到了消息,retries配置也不会⽣效。发送的消息的返回的消息偏移量永远是-1。acks=1表示消息只需要写到主分区即可,然后就响应客户端,⽽不等待副本分区的确认。在该情形下,如果主分区收到消息确认之后就宕机了,⽽副本分区还没来得及同步该消息,则该消息丢失。acks=all⾸领分区会等待所有的ISR副本分区确认记录。该处理保证了只要有⼀个ISR副本分区存活,消息就不会丢失。这是Kafka最强的可靠性保证,等效于acks=-1 |
retries | retries重试次数当消息发送出现错误的时候,系统会重发消息。跟客户端收到错误时重发⼀样。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了 |
其他参数可以从org.apache.kafka.clients.producer.ProducerConfig
中找到。我们后⾯的内容会介绍到。
消费者⽣产消息后,需要broker端的确认,可以同步确认,也可以异步确认。
同步确认效率低,异步确认效率⾼,但是需要设置回调对象。
添加Maven依赖:
代码语言:javascript复制
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!--高版本兼容低版本-->
<version>1.0.2</version>
</dependency>
生产者
这里我使用本地虚拟机,我本地虚拟机的IP是192.168.0.102
同步等待消息确认:
代码语言:javascript复制
public class MyProducer1 {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
Map<String, Object> configs = new HashMap<>();
// 设置连接Kafka的初始连接⽤到的服务器地址
// 如果是集群,则可以通过此初始连接发现集群中的其他broker
configs.put("bootstrap.servers", "192.168.0.102:9092");
// 设置key的序列化器
configs.put("key.serializer", IntegerSerializer.class);
// 设置value的序列化器
configs.put("value.serializer", StringSerializer.class);
configs.put("acks", "1");
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
// ⽤于封装Producer的消息
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
"topic_1", // 主题名称
0, // 分区编号,现在只有⼀个分区,所以是0
0, // 数字作为key
"message 0" // 字符串作为value
);
// 发送消息,同步等待消息的确认
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get(3000, TimeUnit.MILLISECONDS);
System.out.println("主题:" metadata.topic()
"n分区:" metadata.partition()
"n偏移量:" metadata.offset()
"n序列化的key字节:" metadata.serializedKeySize()
"n序列化的value字节:" metadata.serializedValueSize()
"n时间戳:" metadata.timestamp());
// 关闭⽣产者
producer.close();
}
}
异步等待消息确认:
代码语言:javascript复制
public class MyProducer2 {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "192.168.0.102:9092");
configs.put("key.serializer", IntegerSerializer.class);
configs.put("value.serializer", StringSerializer.class);
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
"topic_1", 0, 1, "message 2");
// 使⽤回调异步等待消息的确认
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("主题:" metadata.topic()
"n分区:" metadata.partition()
"n偏移量:" metadata.offset()
"n序列化的key字节:" metadata.serializedKeySize()
"n序列化的value字节:" metadata.serializedValueSize()
"n时间戳:" metadata.timestamp());
} else {
System.out.println("有异常:" exception.getMessage());
}
}
});
// 关闭连接
producer.close();
}
}
消费者:
代码语言:javascript复制
public class MyConsumer1 {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
// 指定bootstrap.servers属性作为初始化连接Kafka的服务器。
// 如果是集群,则会基于此初始化连接发现集群中的其他服务器。
configs.put("bootstrap.servers", "192.168.0.102:9092");
// key的反序列化器
configs.put("key.deserializer", IntegerDeserializer.class);
// value的反序列化器
configs.put("value.deserializer", StringDeserializer.class);
// 设置消费组
configs.put("group.id", "consumer.demo");
// 创建消费者对象
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
// 可以使用正则表达式批量订阅主题
// final Pattern pattern = Pattern.compile("topic_\d")
final Pattern pattern = Pattern.compile("topic_[0-9]");
final List<String> topics = Arrays.asList("topic_1");
// 消费者订阅主题或分区
// consumer.subscribe(pattern);
// consumer.subscribe(pattern, new ConsumerRebalanceListener() {
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> {
System.out.println("剥夺的分区:" tp.partition());
});
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> {
System.out.println(tp.partition());
});
}
});
// 拉取订阅主题的消息
final ConsumerRecords<Integer, String> records = consumer.poll(3_000);
// 获取topic_1主题的消息
final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");
// 遍历topic_1主题的消息
topic1Iterable.forEach(record -> {
System.out.println("========================================");
System.out.println("消息头字段:" Arrays.toString(record.headers().toArray()));
System.out.println("消息的key:" record.key());
System.out.println("消息的偏移量:" record.offset());
System.out.println("消息的分区号:" record.partition());
System.out.println("消息的序列化key字节数:" record.serializedKeySize());
System.out.println("消息的序列化value字节数:" record.serializedValueSize());
System.out.println("消息的时间戳:" record.timestamp());
System.out.println("消息的时间戳类型:" record.timestampType());
System.out.println("消息的主题:" record.topic());
System.out.println("消息的值:" record.value());
});
// 关闭消费者
consumer.close();
}
}
二、Spring Boot Kafka
pom.xml 依赖
代码语言:javascript复制
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
application.properties
代码语言:javascript复制
spring.application.name=demo-02-producer-consumer
server.port=8080
# ⽤于建⽴初始连接的broker地址
spring.kafka.bootstrap-servers=192.168.0.102:9092
# producer⽤到的key和value的序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 默认的批处理记录数
spring.kafka.producer.batch-size=16384
# 32MB的总发送缓存
spring.kafka.producer.buffer-memory=33554432
# consumer⽤到的key和value的反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# consumer的消费组id
spring.kafka.consumer.group-id=spring-kafka-02-consumer
# 是否⾃动提交消费者偏移量
spring.kafka.consumer.enable-auto-commit=true
# 每隔100ms向broker提交⼀次偏移量
spring.kafka.consumer.auto-commit-interval=100
# 如果该消费者的偏移量不存在,则⾃动设置为最早的偏移量
spring.kafka.consumer.auto-offset-reset=earliest
Application.java 启动类
代码语言:javascript复制
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
KafkaConfig.java 配置类,可以在应用启动时创建Topic,这里可以不用
代码语言:javascript复制
@Configuration
public class KafkaConfig {
@Bean
public NewTopic topic1() {
return new NewTopic("ntp-01", 5, (short) 1);
}
@Bean
public NewTopic topic2() {
return new NewTopic("ntp-02", 3, (short) 1);
}
}
生产者 KafkaSyncProducerController.java
代码语言:javascript复制
@RestController
public class KafkaSyncProducerController {
@Autowired
private KafkaTemplate template;
// 同步等待消息发送
@GetMapping("/sendSync/{message}")
public String sendSync(@PathVariable String message) throws ExecutionException, InterruptedException {
ProducerRecord<Integer, String> record = new ProducerRecord<>(
"spring-topic-01", 0, 1, message
);
ListenableFuture future = template.send(record);
// 同步等待broker的响应
Object o = future.get();
SendResult<Integer, String> result = (SendResult<Integer, String>) o;
System.out.println(result.getRecordMetadata().topic()
result.getRecordMetadata().partition()
result.getRecordMetadata().offset());
return "success";
}
// 异步等待消息确认
@GetMapping("/sendAsync/{message}")
public String sendAsync(@PathVariable String message) throws ExecutionException, InterruptedException {
ProducerRecord<Integer, String> record = new ProducerRecord<>(
"spring-topic-01", 0, 1, message
);
ListenableFuture<SendResult<Integer, String>> future = template.send(record);
// 异步等待broker的响应
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送失败: " throwable.getMessage());
}
@Override
public void onSuccess(SendResult<Integer, String> result) {
System.out.println("发送成功:" result.getRecordMetadata().topic() "t"
result.getRecordMetadata().partition() "t"
result.getRecordMetadata().offset());
}
});
return "success";
}
}
消费者MyConsumer.java
代码语言:javascript复制
@Component
public class MyConsumer {
@KafkaListener(topics = "spring-topic-01")
public void onMessage(ConsumerRecord<Integer, String> record) {
Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);
if (optional.isPresent()) {
System.out.println(record.topic() "t"
record.partition() "t"
record.offset() "t"
record.key() "t"
record.value());
}
}
}