【spring-kafka】属性concurrency的作用及如何配置(RoundRobinAssignor 、RangeAssignor)

2021-07-14 10:31:52 浏览数 (1)

目录

    • concurrency属性作用
    • 什么情况下设置concurrency,以及设置多少
    • RoundRobinAssignor 和 RangeAssignor 作用
    • 不同配置的实验分析
      • 分区数3|concurrency = 1|启动一个客户端(单机)
      • 分区数3|concurrency = 1|启动2个客户端(分布式模式)
      • 分区数3|concurrency = 3|启动一个客户端
      • 分区数3|concurrency = 3|启动2个客户端(分布式模式)
    • 批量消费

concurrency属性作用

concurrency默认是1;

container.setConcurrency(3)表示创建三个KafkaMessageListenerContainer实例。 一个KafkaMessageListenerContainer实例分配一个分区进行消费; 如果设置为1的情况下, 这一个实例消费Topic的所有分区; 如果设置多个,那么会平均分配所有分区; 如果实例>分区数; 那么空出来的实例会浪费掉; 如果实例<=分区数 那么会有一部分实例消费多个实例,但也是均衡分配的 如果在分布式情况下, 那么总的KafkaMessageListenerContainer实例数= 服务器机器数量*concurrency ;

什么情况下设置concurrency,以及设置多少

这个得看我们给Topic设置的分区数量; 总的来说就是 机器数量*concurrency <= 分区数

例如分区=3; 而且同时有3台机器 ,那么concurrency=1就行了; 设置多了就会浪费资源;、

例如分区=9; 只有3台机器;那么可以concurrency=3 ; 每台机器3个消费者连接3个分区; 那么你可能会问我们concurrency=1不也可以吗; 反正都是一台机器消费3个分区; 话是没有错; 但是他们的差别在 一个线程消费3个分区和 3个线程消费3个分区 , 单线程和多线程你选哪个

RoundRobinAssignor 和 RangeAssignor 作用

默认情况下 spring.kafka.consumer.properties.partition.assignment.strategy= org.apache.kafka.clients.consumer.RangeAssignor

假如如下情况,同时监听了2个Topic; 并且每个topic的分区都是3; concurrency设置为6;

代码语言:javascript复制
    @KafkaListener(id = "consumer-id6", topics = {"SHI_TOPIC3","SHI_TOPIC4"}, containerFactory = "concurrencyFactory"
            , clientIdPrefix = "myClientId6")
    public void consumer6(List<?> list) {
        StringBuffer sb = new StringBuffer();

        list.forEach((l)->{
            sb.append("|msg:").append(l);
        });
        log.info("线程:{} consumer-id6 消费->{}",Thread.currentThread(),sb);

    }

那么你期望的是不是 2*3=6 刚好6个线程;一个线程分配一个分区; 那么我们运行看看结果

看上图中,我们发现并没有按照我们的预期去做; 有三个消费者其实是闲置状态的; 只有另外的3个消费者负责了2个Topic的总共6个分区; 因为默认的分配策略是 spring.kafka.consumer.properties.partition.assignment.strategy= org.apache.kafka.clients.consumer.RangeAssignor ;

如果想达到我们的预期;那你可以修改策略; spring.kafka.consumer.properties.partition.assignment.strategy= org.apache.kafka.clients.consumer.RoundRobinAssignor

修改之后

每个线程分配一个分区

不同配置的实验分析

分区数3|concurrency = 1|启动一个客户端(单机)

创建了名为 SHI_TOPIC3并且分区数为3的Topic

代码启动,设置concurrency = 1, 只启动一个客户端;

启动日志

代码语言:javascript复制
2020-11-18 17:14:42 o.a.k.c.c.i.ConsumerCoordinator 611 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] 
Finished assignment for group at generation 6: {myClientId5-0-a273480d-2370-49e5-9187-ed10fe6dcf51=
Assignment(partitions=[SHI_TOPIC3-0, SHI_TOPIC3-1, SHI_TOPIC3-2])}

 2020-11-18 17:14:42 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: 
 partitions assigned: [SHI_TOPIC3-2, SHI_TOPIC3-1, SHI_TOPIC3-0]

可以看到这个客户端myClientId5-0-a273480d-2370-49e5-9187-ed10fe6dcf51 被分配了3个分区SHI_TOPIC3-0, SHI_TOPIC3-1, SHI_TOPIC3-2;

消费日志

代码语言:javascript复制
2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 0, CreateTime = 1605690882681, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882615, value = 我是data0),value:我是data0,partition:2,offset:0
 2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 1, CreateTime = 1605690882705, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882705, value = 我是data4),value:我是data4,partition:2,offset:1
 2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 2, CreateTime = 1605690882705, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882705, value = 我是data5),value:我是data5,partition:2,offset:2
 2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 3, CreateTime = 1605690882706, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882705, value = 我是data6),value:我是data6,partition:2,offset:3
 2020-11-18 17:14:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 4, CreateTime = 1605690882706, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605690882706, value = 我是data7),value:我是data7,partition:2,offset:4
.....

可以看到线程都是同一个 Thread[consumer-id5-0-C-1,5,main] ; 说明的问题就是 在消费的时候是单线程消费的,并且还是一个线程去消费 3个分区的数据; 又涉及到切换消费分区的问题;

查询这个消费组的消费情况;

也证实只有一个消费者myClientId5-0-a273480d-2370-49e5-9187-ed10fe6dcf51在消费3个分区的数据;

分区数3|concurrency = 1|启动2个客户端(分布式模式)

第一个客户端不动,继续运行, 然后启动第二个客户端 第一个客户端发生的变化

代码语言:javascript复制
 2020-11-18 17:34:24 o.a.k.c.c.i.ConsumerCoordinator 611 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Finished assignment for group at generation 9: {myClientId5-0-66a81e88-d924-4890-8b8e-2c6960ed0704=Assignment(partitions=[SHI_TOPIC3-2]), myClientId5-0-31c9a99f-5735-4a1d-b537-95bc5ab4533f=Assignment(partitions=[SHI_TOPIC3-0, SHI_TOPIC3-1])}

第一个客户端进行了 再平衡 ; 因为多了第二个可以分担压力进行消费; 可以看到把SHI_TOPIC3-2平衡出去了

第二个客户端的日志

代码语言:javascript复制
 2020-11-18 17:34:24 o.a.k.c.Metadata 277 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Cluster ID: O304VSOeSEyporzbs5AITA
 2020-11-18 17:34:24 o.a.k.c.c.i.AbstractCoordinator 797 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Discovered group coordinator xxxxxx:9092 (id: 2147483645 rack: null)
 2020-11-18 17:34:24 o.a.k.c.c.i.AbstractCoordinator 552 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] (Re-)joining group
 
 2020-11-18 17:34:25 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-2]

查询客户端消费情况

可以看到第二个客户端分配到了SHI_TOPIC3--2的分区进行消费; 并且是单线程消费;

分区数3|concurrency = 3|启动一个客户端

客户端日志

代码语言:javascript复制
2020-11-18 17:50:42 o.a.k.c.c.i.ConsumerCoordinator 273 [INFO] [Consumer clientId=myClientId5-1, groupId=consumer-id5] Adding newly assigned partitions: SHI_TOPIC3-1
 2020-11-18 17:50:42 o.a.k.c.c.i.ConsumerCoordinator 273 [INFO] [Consumer clientId=myClientId5-0, groupId=consumer-id5] Adding newly assigned partitions: SHI_TOPIC3-0
 2020-11-18 17:50:42 o.a.k.c.c.i.ConsumerCoordinator 273 [INFO] [Consumer clientId=myClientId5-2, groupId=consumer-id5] Adding newly assigned partitions: SHI_TOPIC3-2
 2020-11-18 17:50:42 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-2]
 2020-11-18 17:50:42 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-0]
 2020-11-18 17:50:42 o.s.k.l.KafkaMessageListenerContainer 292 [INFO] consumer-id5: partitions assigned: [SHI_TOPIC3-1]

上面日志显示 创建了3个消费者,他们都属于同一个消费组groupId=consumer-id5,3个分区刚好3个消费者一人一个分区平均分配;

客户端日志

代码语言:javascript复制
 2020-11-18 17:50:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-0-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1605693042720, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605693042432, value = 我是data0),value:我是data0,partition:0,offset:11

 2020-11-18 17:50:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-2-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 2, leaderEpoch = 0, offset = 12, CreateTime = 1605693042751, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605693042750, value = 我是data1),value:我是data1,partition:2,offset:12

 2020-11-18 17:50:45 c.d.b.k.KafkaListeners 109 [INFO] 线程:Thread[consumer-id5-1-C-1,5,main] consumer-id5 消费->record:ConsumerRecord(topic = SHI_TOPIC3, partition = 1, leaderEpoch = 0, offset = 17, CreateTime = 1605693042757, serialized key size = 13, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1605693042757, value = 我是data7),value:我是data7,partition:1,offset:17

每个消费者都是单线程,一个线程消费一个分区

分区数3|concurrency = 3|启动2个客户端(分布式模式)

启动第一个客户端

启动第二个客户端

启动第二个客户端之后就发生了 再分配rebalance; 可以看到,总共就有6个消费者, 但是其中的3个都是处于空闲状态; 因为一个分区最多只能有一个分区来进行消费;

批量消费

代码语言:javascript复制
  /**
     * 监听器工厂 批量消费
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(1);
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }

配置文件设置 批量的最大条数

代码语言:javascript复制
kafka.consumer.max-poll-records = 20

消费

代码语言:javascript复制
    @KafkaListener(id = "consumer-id6", topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory"
            , clientIdPrefix = "myClientId6")
    public void consumer6(List<?> list) {
        StringBuffer sb = new StringBuffer();
        list.forEach((l)->{
            sb.append("|msg:").append(l);
        });
        log.info("线程:{} consumer-id6 消费->{}",Thread.currentThread(),sb);

    }

0 人点赞