单独KafkaConsumer实例and多worker线程。

2019-06-03 11:21:20 浏览数 (1)

1、单独KafkaConsumer实例and多worker线程。 将获取的消息和消息的处理解耦,将消息的处理放入单独的工作者线程中,即工作线程中,同时维护一个或者若各干consumer实例执行消息获取任务。 本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,之后worker线程完成处理后上报位移状态,由全局consumer提交位移。

代码语言:javascript复制
  1 package com.bie.kafka.kafkaWorker;
  2 
  3 import java.time.Duration;
  4 import java.util.Arrays;
  5 import java.util.Collection;
  6 import java.util.Collections;
  7 import java.util.HashMap;
  8 import java.util.Map;
  9 import java.util.Properties;
 10 import java.util.concurrent.ArrayBlockingQueue;
 11 import java.util.concurrent.ExecutorService;
 12 import java.util.concurrent.ThreadPoolExecutor;
 13 import java.util.concurrent.TimeUnit;
 14 
 15 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 16 import org.apache.kafka.clients.consumer.ConsumerRecords;
 17 import org.apache.kafka.clients.consumer.KafkaConsumer;
 18 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 19 import org.apache.kafka.common.TopicPartition;
 20 import org.apache.kafka.common.errors.WakeupException;
 21 
 22 /**
 23  * 
 24  * @Description TODO
 25  * @author biehl
 26  * @Date 2019年6月1日 下午3:28:53
 27  * 
 28  * @param <K>
 29  * @param <V>
 30  * 
 31  *            1、consumer多线程管理类,用于创建线程池以及为每个线程分配消息集合。 另外consumer位移提交也在该类中完成。
 32  * 
 33  */
 34 public class ConsumerThreadHandler<K, V> {
 35 
 36     // KafkaConsumer实例
 37     private final KafkaConsumer<K, V> consumer;
 38     // ExecutorService实例
 39     private ExecutorService executors;
 40     // 位移信息offsets
 41     private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
 42 
 43     /**
 44      * 
 45      * @param brokerList
 46      *            kafka列表
 47      * @param groupId
 48      *            消费组groupId
 49      * @param topic
 50      *            主题topic
 51      */
 52     public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
 53         Properties props = new Properties();
 54         // broker列表
 55         props.put("bootstrap.servers", brokerList);
 56         // 消费者组编号Id
 57         props.put("group.id", groupId);
 58         // 非自动提交位移信息
 59         props.put("enable.auto.commit", "false");
 60         // 从最早的位移处开始消费消息
 61         props.put("auto.offset.reset", "earliest");
 62         // key反序列化
 63         props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 64         // value反序列化
 65         props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 66         // 将配置信息装配到消费者实例里面
 67         consumer = new KafkaConsumer<>(props);
 68         // 消费者订阅消息,并实现重平衡rebalance
 69         // rebalance监听器,创建一个匿名内部类。使用rebalance监听器前提是使用消费者组(consumer group)。
 70         // 监听器最常见用法就是手动提交位移到第三方存储以及在rebalance前后执行一些必要的审计操作。
 71         consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
 72 
 73             /**
 74              * 在coordinator开启新一轮rebalance前onPartitionsRevoked方法会被调用。
 75              */
 76             @Override
 77             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 78                 // 提交位移
 79                 consumer.commitSync(offsets);
 80             }
 81 
 82             /**
 83              * rebalance完成后会调用onPartitionsAssigned方法。
 84              */
 85             @Override
 86             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 87                 // 清除位移信息
 88                 offsets.clear();
 89             }
 90         });
 91     }
 92 
 93     /**
 94      * 消费主方法
 95      * 
 96      * @param threadNumber
 97      *            线程池中的线程数
 98      */
 99     public void consume(int threadNumber) {
100         executors = new ThreadPoolExecutor(
101                 threadNumber, 
102                 threadNumber, 
103                 0L, 
104                 TimeUnit.MILLISECONDS,
105                 new ArrayBlockingQueue<Runnable>(1000), 
106                 new ThreadPoolExecutor.CallerRunsPolicy());
107         try {
108             // 消费者一直处于等待状态,等待消息消费
109             while (true) {
110                 // 从主题中获取消息
111                 ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1000L));
112                 // 如果获取到的消息不为空
113                 if (!records.isEmpty()) {
114                     // 将消息信息、位移信息封装到ConsumerWorker中进行提交
115                     executors.submit(new ConsumerWorker<>(records, offsets));
116                 }
117                 // 调用提交位移信息、尽量降低synchronized块对offsets锁定的时间
118                 this.commitOffsets();
119             }
120         } catch (WakeupException e) {
121             // 此处忽略此异常的处理.WakeupException异常是从poll方法中抛出来的异常
122             //如果不忽略异常信息,此处会打印错误哦,亲
123             //e.printStackTrace();
124         } finally {
125             // 调用提交位移信息、尽量降低synchronized块对offsets锁定的时间
126             this.commitOffsets();
127             // 关闭consumer
128             consumer.close();
129         }
130     }
131 
132     /**
133      * 尽量降低synchronized块对offsets锁定的时间
134      */
135     private void commitOffsets() {
136         // 尽量降低synchronized块对offsets锁定的时间
137         Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
138         // 保证线程安全、同步锁,锁住offsets
139         synchronized (offsets) {
140             // 判断如果offsets位移信息为空,直接返回,节省同步锁对offsets的锁定的时间
141             if (offsets.isEmpty()) {
142                 return;
143             }
144             // 如果offsets位移信息不为空,将位移信息offsets放到集合中,方便同步
145             unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
146             // 清除位移信息offsets
147             offsets.clear();
148         }
149         // 将封装好的位移信息unmodfiedMap集合进行同步提交
150         // 手动提交位移信息
151         consumer.commitSync(unmodfiedMap);
152     }
153 
154     /**
155      * 关闭消费者
156      */
157     public void close() {
158         // 在另一个线程中调用consumer.wakeup();方法来触发consume的关闭。
159         // KafkaConsumer不是线程安全的,但是另外一个例外,用户可以安全的在另一个线程中调用consume.wakeup()。
160         // wakeup()方法是特例,其他KafkaConsumer方法都不能同时在多线程中使用
161         consumer.wakeup();
162         // 关闭ExecutorService实例
163         executors.shutdown();
164     }
165 
166 }
代码语言:javascript复制
 1 package com.bie.kafka.kafkaWorker;
 2 
 3 import java.util.List;
 4 import java.util.Map;
 5 
 6 import org.apache.kafka.clients.consumer.ConsumerRecord;
 7 import org.apache.kafka.clients.consumer.ConsumerRecords;
 8 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 9 import org.apache.kafka.common.TopicPartition;
10 
11 /**
12  * 
13  * @Description TODO
14  * @author biehl
15  * @Date 2019年6月1日 下午3:45:38
16  * 
17  * @param <K>
18  * @param <V>
19  * 
20  *            1、本质上是一个Runnable,执行真正的消费逻辑并且上报位移信息给ConsumerThreadHandler。
21  * 
22  */
23 public class ConsumerWorker<K, V> implements Runnable {
24 
25     // 获取到的消息
26     private final ConsumerRecords<K, V> records;
27     // 位移信息
28     private final Map<TopicPartition, OffsetAndMetadata> offsets;
29 
30     /**
31      * ConsumerWorker有参构造方法
32      * 
33      * @param records
34      *            获取到的消息
35      * @param offsets
36      *            位移信息
37      */
38     public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
39         this.records = records;
40         this.offsets = offsets;
41     }
42 
43     /**
44      * 
45      */
46     @Override
47     public void run() {
48         // 获取到分区的信息
49         for (TopicPartition partition : records.partitions()) {
50             // 获取到分区的消息记录
51             List<ConsumerRecord<K, V>> partConsumerRecords = records.records(partition);
52             // 遍历获取到的消息记录
53             for (ConsumerRecord<K, V> record : partConsumerRecords) {
54                 // 打印消息
55                 System.out.println("topic: "   record.topic()   ",partition: "   record.partition()   ",offset: "
56                           record.offset() 
57                           ",消息记录: "   record.value());
58             }
59             // 上报位移信息。获取到最后的位移消息,由于位移消息从0开始,所以最后位移减一获取到位移位置
60             long lastOffset = partConsumerRecords.get(partConsumerRecords.size() - 1).offset();
61             // 同步锁,锁住offsets位移
62             synchronized (offsets) {
63                 // 如果offsets位移不包含partition这个key信息
64                 if (!offsets.containsKey(partition)) {
65                     // 就将位移信息设置到map集合里面
66                     offsets.put(partition, new OffsetAndMetadata(lastOffset   1));
67                 } else {
68                     // 否则,offsets位移包含partition这个key信息
69                     // 获取到offsets的位置信息
70                     long curr = offsets.get(partition).offset();
71                     // 如果获取到的位置信息小于等于上一次位移信息大小
72                     if (curr <= lastOffset   1) {
73                         // 将这个partition的位置信息设置到map集合中。并保存到broker中。
74                         offsets.put(partition, new OffsetAndMetadata(lastOffset   1));
75                     }
76                 }
77             }
78         }
79     }
80 
81 }
代码语言:javascript复制
 1 package com.bie.kafka.kafkaWorker;
 2 
 3 /**
 4  * 
 5  * @Description TODO
 6  * @author biehl
 7  * @Date 2019年6月1日 下午4:13:25
 8  *
 9  *       1、单独KafkaConsumer实例和多worker线程。
10  *       2、将获取的消息和消息的处理解耦,将消息的处理放入单独的工作者线程中,即工作线程中,
11  *       同时维护一个或者若各干consumer实例执行消息获取任务。
12  *       3、本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,
13  *       之后worker线程完成处理后上报位移状态,由全局consumer提交位移。
14  * 
15  * 
16  */
17 
18 public class ConsumerMain {
19 
20     public static void main(String[] args) {
21         // broker列表
22         String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
23         // 主题信息topic
24         String topic = "topic1";
25         // 消费者组信息group
26         String groupId = "group2";
27         // 根据ConsumerThreadHandler构造方法构造出消费者
28         final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupId, topic);
29         final int cpuCount = Runtime.getRuntime().availableProcessors();
30         System.out.println("cpuCount : "   cpuCount);
31         // 创建线程的匿名内部类
32         Runnable runnable = new Runnable() {
33 
34             @Override
35             public void run() {
36                 // 执行consume,在此线程中执行消费者消费消息。
37                 handler.consume(cpuCount);
38             }
39         };
40         // 直接调用runnable此线程,并运行
41         new Thread(runnable).start();
42 
43         try {
44             // 此线程休眠20000
45             Thread.sleep(20000L);
46         } catch (InterruptedException e) {
47             e.printStackTrace();
48         }
49         System.out.println("Starting to close the consumer...");
50         // 关闭消费者
51         handler.close();
52     }
53 
54 }

待续......

0 人点赞