1、单独KafkaConsumer实例and多worker线程。 将获取的消息和消息的处理解耦,将消息的处理放入单独的工作者线程中,即工作线程中,同时维护一个或者若各干consumer实例执行消息获取任务。 本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,之后worker线程完成处理后上报位移状态,由全局consumer提交位移。
代码语言:javascript复制 1 package com.bie.kafka.kafkaWorker;
2
3 import java.time.Duration;
4 import java.util.Arrays;
5 import java.util.Collection;
6 import java.util.Collections;
7 import java.util.HashMap;
8 import java.util.Map;
9 import java.util.Properties;
10 import java.util.concurrent.ArrayBlockingQueue;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.ThreadPoolExecutor;
13 import java.util.concurrent.TimeUnit;
14
15 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
16 import org.apache.kafka.clients.consumer.ConsumerRecords;
17 import org.apache.kafka.clients.consumer.KafkaConsumer;
18 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
19 import org.apache.kafka.common.TopicPartition;
20 import org.apache.kafka.common.errors.WakeupException;
21
22 /**
23 *
24 * @Description TODO
25 * @author biehl
26 * @Date 2019年6月1日 下午3:28:53
27 *
28 * @param <K>
29 * @param <V>
30 *
31 * 1、consumer多线程管理类,用于创建线程池以及为每个线程分配消息集合。 另外consumer位移提交也在该类中完成。
32 *
33 */
34 public class ConsumerThreadHandler<K, V> {
35
36 // KafkaConsumer实例
37 private final KafkaConsumer<K, V> consumer;
38 // ExecutorService实例
39 private ExecutorService executors;
40 // 位移信息offsets
41 private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
42
43 /**
44 *
45 * @param brokerList
46 * kafka列表
47 * @param groupId
48 * 消费组groupId
49 * @param topic
50 * 主题topic
51 */
52 public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
53 Properties props = new Properties();
54 // broker列表
55 props.put("bootstrap.servers", brokerList);
56 // 消费者组编号Id
57 props.put("group.id", groupId);
58 // 非自动提交位移信息
59 props.put("enable.auto.commit", "false");
60 // 从最早的位移处开始消费消息
61 props.put("auto.offset.reset", "earliest");
62 // key反序列化
63 props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
64 // value反序列化
65 props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
66 // 将配置信息装配到消费者实例里面
67 consumer = new KafkaConsumer<>(props);
68 // 消费者订阅消息,并实现重平衡rebalance
69 // rebalance监听器,创建一个匿名内部类。使用rebalance监听器前提是使用消费者组(consumer group)。
70 // 监听器最常见用法就是手动提交位移到第三方存储以及在rebalance前后执行一些必要的审计操作。
71 consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
72
73 /**
74 * 在coordinator开启新一轮rebalance前onPartitionsRevoked方法会被调用。
75 */
76 @Override
77 public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
78 // 提交位移
79 consumer.commitSync(offsets);
80 }
81
82 /**
83 * rebalance完成后会调用onPartitionsAssigned方法。
84 */
85 @Override
86 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
87 // 清除位移信息
88 offsets.clear();
89 }
90 });
91 }
92
93 /**
94 * 消费主方法
95 *
96 * @param threadNumber
97 * 线程池中的线程数
98 */
99 public void consume(int threadNumber) {
100 executors = new ThreadPoolExecutor(
101 threadNumber,
102 threadNumber,
103 0L,
104 TimeUnit.MILLISECONDS,
105 new ArrayBlockingQueue<Runnable>(1000),
106 new ThreadPoolExecutor.CallerRunsPolicy());
107 try {
108 // 消费者一直处于等待状态,等待消息消费
109 while (true) {
110 // 从主题中获取消息
111 ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1000L));
112 // 如果获取到的消息不为空
113 if (!records.isEmpty()) {
114 // 将消息信息、位移信息封装到ConsumerWorker中进行提交
115 executors.submit(new ConsumerWorker<>(records, offsets));
116 }
117 // 调用提交位移信息、尽量降低synchronized块对offsets锁定的时间
118 this.commitOffsets();
119 }
120 } catch (WakeupException e) {
121 // 此处忽略此异常的处理.WakeupException异常是从poll方法中抛出来的异常
122 //如果不忽略异常信息,此处会打印错误哦,亲
123 //e.printStackTrace();
124 } finally {
125 // 调用提交位移信息、尽量降低synchronized块对offsets锁定的时间
126 this.commitOffsets();
127 // 关闭consumer
128 consumer.close();
129 }
130 }
131
132 /**
133 * 尽量降低synchronized块对offsets锁定的时间
134 */
135 private void commitOffsets() {
136 // 尽量降低synchronized块对offsets锁定的时间
137 Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
138 // 保证线程安全、同步锁,锁住offsets
139 synchronized (offsets) {
140 // 判断如果offsets位移信息为空,直接返回,节省同步锁对offsets的锁定的时间
141 if (offsets.isEmpty()) {
142 return;
143 }
144 // 如果offsets位移信息不为空,将位移信息offsets放到集合中,方便同步
145 unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
146 // 清除位移信息offsets
147 offsets.clear();
148 }
149 // 将封装好的位移信息unmodfiedMap集合进行同步提交
150 // 手动提交位移信息
151 consumer.commitSync(unmodfiedMap);
152 }
153
154 /**
155 * 关闭消费者
156 */
157 public void close() {
158 // 在另一个线程中调用consumer.wakeup();方法来触发consume的关闭。
159 // KafkaConsumer不是线程安全的,但是另外一个例外,用户可以安全的在另一个线程中调用consume.wakeup()。
160 // wakeup()方法是特例,其他KafkaConsumer方法都不能同时在多线程中使用
161 consumer.wakeup();
162 // 关闭ExecutorService实例
163 executors.shutdown();
164 }
165
166 }
代码语言:javascript复制 1 package com.bie.kafka.kafkaWorker;
2
3 import java.util.List;
4 import java.util.Map;
5
6 import org.apache.kafka.clients.consumer.ConsumerRecord;
7 import org.apache.kafka.clients.consumer.ConsumerRecords;
8 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
9 import org.apache.kafka.common.TopicPartition;
10
11 /**
12 *
13 * @Description TODO
14 * @author biehl
15 * @Date 2019年6月1日 下午3:45:38
16 *
17 * @param <K>
18 * @param <V>
19 *
20 * 1、本质上是一个Runnable,执行真正的消费逻辑并且上报位移信息给ConsumerThreadHandler。
21 *
22 */
23 public class ConsumerWorker<K, V> implements Runnable {
24
25 // 获取到的消息
26 private final ConsumerRecords<K, V> records;
27 // 位移信息
28 private final Map<TopicPartition, OffsetAndMetadata> offsets;
29
30 /**
31 * ConsumerWorker有参构造方法
32 *
33 * @param records
34 * 获取到的消息
35 * @param offsets
36 * 位移信息
37 */
38 public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
39 this.records = records;
40 this.offsets = offsets;
41 }
42
43 /**
44 *
45 */
46 @Override
47 public void run() {
48 // 获取到分区的信息
49 for (TopicPartition partition : records.partitions()) {
50 // 获取到分区的消息记录
51 List<ConsumerRecord<K, V>> partConsumerRecords = records.records(partition);
52 // 遍历获取到的消息记录
53 for (ConsumerRecord<K, V> record : partConsumerRecords) {
54 // 打印消息
55 System.out.println("topic: " record.topic() ",partition: " record.partition() ",offset: "
56 record.offset()
57 ",消息记录: " record.value());
58 }
59 // 上报位移信息。获取到最后的位移消息,由于位移消息从0开始,所以最后位移减一获取到位移位置
60 long lastOffset = partConsumerRecords.get(partConsumerRecords.size() - 1).offset();
61 // 同步锁,锁住offsets位移
62 synchronized (offsets) {
63 // 如果offsets位移不包含partition这个key信息
64 if (!offsets.containsKey(partition)) {
65 // 就将位移信息设置到map集合里面
66 offsets.put(partition, new OffsetAndMetadata(lastOffset 1));
67 } else {
68 // 否则,offsets位移包含partition这个key信息
69 // 获取到offsets的位置信息
70 long curr = offsets.get(partition).offset();
71 // 如果获取到的位置信息小于等于上一次位移信息大小
72 if (curr <= lastOffset 1) {
73 // 将这个partition的位置信息设置到map集合中。并保存到broker中。
74 offsets.put(partition, new OffsetAndMetadata(lastOffset 1));
75 }
76 }
77 }
78 }
79 }
80
81 }
代码语言:javascript复制 1 package com.bie.kafka.kafkaWorker;
2
3 /**
4 *
5 * @Description TODO
6 * @author biehl
7 * @Date 2019年6月1日 下午4:13:25
8 *
9 * 1、单独KafkaConsumer实例和多worker线程。
10 * 2、将获取的消息和消息的处理解耦,将消息的处理放入单独的工作者线程中,即工作线程中,
11 * 同时维护一个或者若各干consumer实例执行消息获取任务。
12 * 3、本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,
13 * 之后worker线程完成处理后上报位移状态,由全局consumer提交位移。
14 *
15 *
16 */
17
18 public class ConsumerMain {
19
20 public static void main(String[] args) {
21 // broker列表
22 String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
23 // 主题信息topic
24 String topic = "topic1";
25 // 消费者组信息group
26 String groupId = "group2";
27 // 根据ConsumerThreadHandler构造方法构造出消费者
28 final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupId, topic);
29 final int cpuCount = Runtime.getRuntime().availableProcessors();
30 System.out.println("cpuCount : " cpuCount);
31 // 创建线程的匿名内部类
32 Runnable runnable = new Runnable() {
33
34 @Override
35 public void run() {
36 // 执行consume,在此线程中执行消费者消费消息。
37 handler.consume(cpuCount);
38 }
39 };
40 // 直接调用runnable此线程,并运行
41 new Thread(runnable).start();
42
43 try {
44 // 此线程休眠20000
45 Thread.sleep(20000L);
46 } catch (InterruptedException e) {
47 e.printStackTrace();
48 }
49 System.out.println("Starting to close the consumer...");
50 // 关闭消费者
51 handler.close();
52 }
53
54 }
待续......