Producer
Sending messages with Kafka
KafkaTemplate
// Records with the same key are sent to the same partition
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
    return doSend(producerRecord);
}
// Send to an explicitly specified partition of the default topic
@Override
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data) {
    return send(this.defaultTopic, partition, key, data);
}
Sending messages with Kafka: default partitioner implementation
Default partitioner
org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // No key: let the sticky partition cache choose the partition
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
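The keyed branch above boils down to "hash the key bytes, force the result non-negative, take it modulo the partition count". The following is a minimal, self-contained sketch of that idea. It is not Kafka's code: the class name `KeyPartitionSketch` and the method `partitionFor` are hypothetical, and `Arrays.hashCode` is used as a stand-in for `Utils.murmur2`, so the partition numbers it prints will differ from what a real producer would choose.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-alone sketch; not part of Kafka or Spring Kafka.
class KeyPartitionSketch {

    // Clears the sign bit so the result is never negative. Math.abs would not
    // be safe here because Math.abs(Integer.MIN_VALUE) is still negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Stand-in for hashing the key bytes. Kafka uses murmur2; Arrays.hashCode
    // is an assumption made here only to keep the example short.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        // "order-1" appears twice and is mapped to the same partition both times.
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Because the result depends on `numPartitions`, the same key can land on a different partition once partitions are added to the topic, so "same key, same partition" only holds while the partition count stays fixed.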
Consumer
Receiving messages with a Kafka listener
@KafkaListener(topics = "topic1", groupId = "fooGroup")
public void listen(String kms) {
    logger.info("Received: " + kms);
    // Simulate a processing failure so the record is retried and then dead-lettered
    if (kms.startsWith("fail")) {
        throw new RuntimeException("failed");
    }
}
// Consume records that were published to the dead-letter topic
@KafkaListener(topics = "topic1.DLT", groupId = "fooGroup")
public void dltListen(String kms) {
    logger.info("Received from DLT: " + kms);
}
How the Kafka listener obtains messages
Obtaining the partitions
org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#seekPartitions
private void seekPartitions(Collection<TopicPartition> partitions, boolean idle) {
    this.consumerSeekAwareListener.registerSeekCallback(this);
    Map<TopicPartition, Long> current = new HashMap<>();
    // Record the consumer's current position (offset) for each partition
    for (TopicPartition topicPartition : partitions) {
        current.put(topicPartition, ListenerConsumer.this.consumer.position(topicPartition));
    }
    if (idle) {
        this.consumerSeekAwareListener.onIdleContainer(current, this.seekCallback);
    }
    else {
        this.consumerSeekAwareListener.onPartitionsAssigned(current, this.seekCallback);
    }
}
Listening on the Kafka dead-letter queue
Dead-letter queue configuration
@Configuration
public class KafkaErrorHandlerConfig {
    public KafkaErrorHandlerConfig() {
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // Dead-letter after 3 tries: 1 initial delivery + 2 retries with no delay between them
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2L)));
        return factory;
    }
}
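To make the "dead-letter after 3 tries" comment concrete, here is a small simulation of the retry-then-recover flow. It does not use Spring Kafka at all: `RetryThenDeadLetterSketch`, `deliver`, and the in-memory `deadLetters` list are hypothetical names, and the loop only approximates what an error handler with `FixedBackOff(0L, 2L)` does (zero delay, at most 2 retries after the first failure).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical simulation of "retry, then recover"; it does not use Spring Kafka.
class RetryThenDeadLetterSketch {

    // Calls the listener once, retries up to maxRetries times on failure, and
    // hands the record to the recoverer when every attempt has failed.
    // Returns the number of delivery attempts that were made.
    static int deliver(String record, int maxRetries,
                       Consumer<String> listener, Consumer<String> recoverer) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(record);
                return attempts; // processed successfully
            } catch (RuntimeException e) {
                if (attempts > maxRetries) {
                    recoverer.accept(record); // retries exhausted
                    return attempts;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        // Mirrors the listener in this article: values starting with "fail" always throw.
        Consumer<String> listener = value -> {
            if (value.startsWith("fail")) {
                throw new RuntimeException("failed");
            }
        };
        System.out.println(deliver("fail-1", 2, listener, deadLetters::add)); // 3
        System.out.println(deliver("ok-1", 2, listener, deadLetters::add));   // 1
        System.out.println(deadLetters);                                      // [fail-1]
    }
}
```

With `maxRetries` set to 2, a record that always fails is delivered three times in total before it is handed to the recoverer, which matches the comment in the configuration above.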
Default dead-letter queue
DeadLetterPublishingRecoverer
// Default dead-letter topic name: the original topic plus ".DLT", on the same partition number
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
        DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
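The naming rule in the default resolver can be tried out without the Kafka client on the classpath. The sketch below is only an illustration: `DltResolverSketch` and its nested `Destination` class are hypothetical stand-ins for `ConsumerRecord` and `TopicPartition`, reduced to the two fields the rule needs.

```java
import java.util.function.BiFunction;

// Hypothetical stand-ins for ConsumerRecord and TopicPartition, kept minimal
// so the example runs without the Kafka client on the classpath.
class DltResolverSketch {

    static final class Destination {
        final String topic;
        final int partition;

        Destination(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Same rule as the default resolver: append ".DLT" and keep the partition.
    static final BiFunction<Destination, Exception, Destination> RESOLVER =
            (source, e) -> new Destination(source.topic + ".DLT", source.partition);

    public static void main(String[] args) {
        Destination failedAt = new Destination("topic1", 2);
        // Prints "topic1.DLT-2"
        System.out.println(RESOLVER.apply(failedAt, new RuntimeException("failed")));
    }
}
```

Because the partition number is carried over unchanged, the dead-letter topic needs at least as many partitions as the source topic when the default resolver is used.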