Kafka: guaranteeing message order within a partition

2022-04-19 17:28:24

Producer

Sending messages with Kafka
KafkaTemplate

// With the default partitioner, records with the same non-null key go to the same partition
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
   ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
   return doSend(producerRecord);
}

// Send to an explicitly specified partition of the default topic
@Override
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data) {
    return send(this.defaultTopic, partition, key, data);
}

Sending messages with Kafka: default partitioner implementation
Default partitioner
org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        // No key: fall back to the sticky partition cache, so records are not grouped by key
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // Hash the key bytes and take the result modulo the partition count to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
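
To make the key-to-partition mapping concrete, here is a minimal standalone sketch of the same computation. The class name `KeyPartitionSketch`, the sample keys, and the partition count of 6 are hypothetical. The `murmur2` method is a re-implementation of the hash written for illustration, not the Kafka client's own code, and the keys are assumed to be serialized as UTF-8 bytes. It only aims to show that identical key bytes always yield the same partition index while the partition count stays the same.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of the Kafka client.
class KeyPartitionSketch {

    // Re-implementation of a 32-bit murmur2 hash, for illustration only.
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        // Mix the input four bytes at a time (little-endian).
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix the remaining 1 to 3 bytes, if any.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
                // fall through
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
                // fall through
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
                break;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clear the sign bit so the modulo result is never negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Same shape as the keyed branch of the partitioner shown above.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 6; // assumed partition count
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Because the result depends on `numPartitions`, adding partitions to a topic can move a key to a different partition, so per-key ordering only holds while the partition count is unchanged.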

Consumer

Listening for messages with Kafka
@KafkaListener(topics = "topic1", groupId = "fooGroup")
public void listen(String kms) {
   logger.info("Received: " + kms);
   if (kms.startsWith("fail")) {
      throw new RuntimeException("failed");
   }
}

// Consumes records that the error handler published to the dead-letter topic
@KafkaListener(topics = "topic1.DLT", groupId = "fooGroup")
public void dltListen(String kms) {
   logger.info("Received from DLT: " + kms);
}

How the listener container obtains messages
Obtaining the assigned partitions
org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#seekPartitions

private void seekPartitions(Collection<TopicPartition> partitions, boolean idle) {
   this.consumerSeekAwareListener.registerSeekCallback(this);
   Map<TopicPartition, Long> current = new HashMap<>();
   // Record the current consumer position for each assigned partition
   for (TopicPartition topicPartition : partitions) {
      current.put(topicPartition, ListenerConsumer.this.consumer.position(topicPartition));
   }
   if (idle) {
      this.consumerSeekAwareListener.onIdleContainer(current, this.seekCallback);
   }
   else {
      this.consumerSeekAwareListener.onPartitionsAssigned(current, this.seekCallback);
   }
}

Listening on the dead-letter queue with Kafka
Dead-letter queue configuration
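import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // FixedBackOff(0L, 2L): no delay, 2 retries, so a record is dead-lettered after 3 failed attempts
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2L)));
        return factory;
    }
}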
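
The "3 tries" in the comment follows from the back-off settings: `FixedBackOff(0L, 2L)` means an interval of 0 ms and at most 2 retries, so a record gets 1 initial delivery plus 2 retries before the recoverer is called. The plain-Java sketch below imitates only that counting. `RetrySketch`, `deliver`, and the two callbacks are hypothetical names, and the sketch does not reproduce how Spring Kafka actually seeks back and re-polls the failed record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of "retry up to N times, then hand over to a recoverer".
class RetrySketch {

    // Returns the number of delivery attempts made for the record.
    static int deliver(String record, long maxRetries,
                       Consumer<String> listener, Consumer<String> recoverer) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(record);
                return attempts; // processed successfully
            } catch (RuntimeException e) {
                if (attempts > maxRetries) {
                    // Retries exhausted: hand the record to the dead-letter recoverer.
                    recoverer.accept(record);
                    return attempts;
                }
                // Otherwise loop and redeliver immediately (interval of 0 ms).
            }
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        // Mirrors the listener above: values starting with "fail" always throw.
        Consumer<String> listener = value -> {
            if (value.startsWith("fail")) {
                throw new RuntimeException("failed");
            }
        };
        int attempts = deliver("fail-1", 2L, listener, deadLetters::add);
        System.out.println("attempts=" + attempts + ", deadLetters=" + deadLetters);
        // prints "attempts=3, deadLetters=[fail-1]"
    }
}
```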

Default dead-letter topic
DeadLetterPublishingRecoverer

// Default dead-letter destination: "<source topic>.DLT", same partition as the failed record
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
   DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
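
The resolver keeps the partition number of the failed record, which is why the `topic1.DLT` listener earlier receives dead letters from `topic1`. It also means the dead-letter topic needs at least as many partitions as the source topic. The following plain-Java sketch applies the same naming rule without any Kafka dependency. `DltResolverSketch` and `Destination` are hypothetical stand-ins for the real `ConsumerRecord` and `TopicPartition` types.

```java
import java.util.function.BiFunction;

// Hypothetical stand-in types; the real resolver works on ConsumerRecord and TopicPartition.
class DltResolverSketch {

    static final class Destination {
        final String topic;
        final int partition;

        Destination(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Same rule as the default resolver: "<source topic>.DLT", same partition number.
    static final BiFunction<String, Integer, Destination> DEFAULT_RESOLVER =
            (topic, partition) -> new Destination(topic + ".DLT", partition);

    public static void main(String[] args) {
        Destination destination = DEFAULT_RESOLVER.apply("topic1", 2);
        System.out.println(destination); // prints "topic1.DLT-2"
    }
}
```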
