Kafka集群搭建
本文通过实操Kafka的API来理解topic、partition等相关概念,我将通过搭建一个Kafka集群来实现它。
Kafka集群依赖于ZooKeeper对其Broker进行协调管理,所以我们也需要考虑搭建一个ZooKeeper集群。
主机规划
主机名称 | 角色 | IP 地址 | 基础软件 |
---|---|---|---|
node01 | kafka 集群节点 | 192.168.242.118 | JDK1.8 |
node02 | kafka 集群节点、ZooKeeper 集群节点 | 192.168.242.117 | JDK1.8 |
node03 | kafka 集群节点、ZooKeeper 集群节点 | 192.168.242.116 | JDK1.8 |
node04 | ZooKeeper 集群节点 | 192.168.242.115 | JDK1.8 |
其中:
- IP 地址与主机名之间的映射关系配置好。编辑
/etc/hosts
,添加如下内容:
192.168.242.118 node01
192.168.242.117 node02
192.168.242.116 node03
192.168.242.115 node04
- node01、node02、node03 为 Kafka 集群节点,node02、node03、node04 为 ZooKeeper 集群节点。
- ZooKeeper 集群已搭建完毕。
附 ZooKeeper 集群搭建过程: 在node02:
- tar -zxf apache-zookeeper-3.5.8-bin.tar.gz
- mv apache-zookeeper-3.5.8-bin /usr/local/zookeeper
- cd /usr/local/zookeeper/conf
- cp zoo_example.cfg zoo.cfg
- vi zoo.cfg
设置 dataDir=/var/zookeeper 末尾添加:server.1=node02:2888:3888 server.2=node03:2888:3888 server.3=node04:2888:3888
- mkdir -p /var/zookeeper
- echo 1 > /var/zookeeper/myid
- vi /etc/profile
export JAVA_HOME=/usr/local/java export ZK_HOME=/usr/local/zookeeper export PATH=
JAVA_HOME/bin:$ZK_HOME/bin
- source /etc/profile
- scp分发zk相关配置到node03、node04
scp -r /usr/local/zookeeper/ root@node03:/usr/local/ scp /usr/local/zookeeper/conf/zoo.cfg root@node03:/usr/local/zookeeper/conf/ scp /etc/profile root@node03:/etc scp -r /usr/local/zookeeper/ root@node04:/usr/local/ scp /usr/local/zookeeper/conf/zoo.cfg root@node04:/usr/local/zookeeper/conf/ scp /etc/profile root@node04:/etc
- 在node03
mkdir -p /var/zookeeper echo 2 > /var/zookeeper/myid source /etc/profile
- 在node04
mkdir -p /var/zookeeper echo 3 > /var/zookeeper/myid source /etc/profile
- 启动zk集群,在node02/node03/node04三个节点均执行
zkServer.sh start
ZK集群搭建完成后,可用zkServer.sh status
查看ZK集群状态:
node02,follower:
node03,leader:
node04,leader:
Kafka 集群
解压完Kafka安装文件后,修改配置文件config/server.properties
:
broker.id=0
listeners=PLAINTEXT://node01:9092
log.dirs=/var/kafka-logs
zookeeper.connect=node02:2181,node03:2181,node04:2181/kafka
与ZK集群搭建一样,使用SCP分发,注意修改 broker.id
和 listeners
。
这里值得注意的是ZK连接配置项要带上/kafka。
凡是使用 ZooKeeper 的技术,一般按照项目部门之类的加一个节点路径,不要在 ZK 根节点创建自己的东西,防止难以维护。
配置Kafka环境变量,方便使用Kafka命令,编辑文件/etc/profile
:
export JAVA_HOME=/usr/local/java
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin
启动 Kafka 集群
每台Kafka集群节点执行命令kafka-server-start.sh
# 前台启动
kafka-server-start.sh $KAFKA_HOME/config/server.properties
# 后台启动
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# 查看topic
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --list
启动Kafka之后,再来看一下 ZK 节点:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 7] ls /
[kafka, zookeeper]
[zk: localhost:2181(CONNECTED) 5] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
多了个 kafka 节点,这是可以想到为什么之前的配置文件zookeeper.connect=node02:2181,node03:2181,node04:2181/kafka
这里最后要加个/kafka
了,就是 kafka 启动之后生成了很多内容,如果都放到 zk 根节点将很难维护。
Kafka 生成的一些内容:
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 8] ls /kafka/cluster
[id]
[zk: localhost:2181(CONNECTED) 9] get /kafka/cluster/id
{"version":"1","id":"7V2aCgVnQhuPdkdryBXt4w"}
[zk: localhost:2181(CONNECTED) 10] ls /kafka/con
config consumers controller controller_epoch
[zk: localhost:2181(CONNECTED) 10] ls /kafka/controller
controller controller_epoch
[zk: localhost:2181(CONNECTED) 10] ls /kafka/controller
[]
[zk: localhost:2181(CONNECTED) 11] get /kafka/controller
controller controller_epoch
[zk: localhost:2181(CONNECTED) 11] get /kafka/controller
{"version":1,"brokerid":0,"timestamp":"1608979966643"}
API
本文代码仓库:https://gitee.com/xblzer/kafka-api/tree/master/code/kafka-api
Topic的管理相关和Producer生产消息的API非常简单,这里不做特别说明了,代码中有注释,下面从Consumer相关的API开始展开说明。
Consumer
sub 订阅模式
订阅模式,必须设置消费者组,去掉消费者组
代码语言:javascript复制注释掉
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
执行报错
代码语言:javascript复制org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
也就是说,订阅模式可以用到消费者组的管理机制,在配置消费者的时候必须提供有效的group.id
。
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//消费者组,如果不设置消费者组会报错
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅topic名称是“topic”开头的topic
consumer.subscribe(Pattern.compile("^topic.*"));
现在我们只开启一个Consumer客户端,可以看到该消费者对产生的消息全部消费了:
单消费线程
再使用线程池,构造三个消费者线程,模拟不同的消费者客户端(属于同一消费组)。
也可以在kafka服务器开几个命令终端,命令如下 kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic topic01 --group group01 --property print.key=true --property print.value=true --property key.separator=,
这个命令的参数可以用 kafka-console-consumer.sh --help
查看。
线程池模拟多个消费者客户端:
代码语言:javascript复制/**
* 多个线程,不同的消费者(属于同一消费组)
*/
@Test
@SneakyThrows
public void testKafkaConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//消费者组,如果不设置消费者组会报错
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
//开启三个线程,跑三个consumer客户端,他们属于同一消费组“group01”
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
3,
16,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("[消费者‐%d]").build(),
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 0; i < 3; i ) {
threadPoolExecutor.execute(() -> {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅topic名称是“topic”开头的topic
consumer.subscribe(Pattern.compile("^topic.*"));
//订阅topic01
// consumer.subscribe(Arrays.asList("topic01"));
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
if (!consumerRecords.isEmpty()) {
Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator();
while (consumerRecordIterator.hasNext()) {
ConsumerRecord<String, String> consumerRecord = consumerRecordIterator.next();
String key = consumerRecord.key();
String value = consumerRecord.value();
//消息所在分区
int partition = consumerRecord.partition();
//消息在所在分区的偏移量
long offset = consumerRecord.offset();
System.out.println("线程" Thread.currentThread().getName() "key:" key ",value:" value ",partition:" partition ",offset:" offset);
}
}
}
});
}
threadPoolExecutor.shutdown();
threadPoolExecutor.awaitTermination(1, TimeUnit.HOURS);
}
运行测试类,
Kafka Consumer协调器分配消费分区
可以看到Consumer协调器(ConsumerCoordinator
)分配消费分区情况:
线程名称 | 消费者 | 分区 |
---|---|---|
消费者-0 | consumer-1 | topic01的分区0,topic02的分区0 |
消费者-1 | consumer-2 | topic01的分区1,topic02的分区1 |
消费者-2 | consumer-3 | topic01的分区2 |
重新生产消息,查看消息消费情况:
10 条 record 被同一个消费组的三个消费者消费,这个是消费者组的特性之一,组内平分消费分区的消费进行消费,有一个负载均衡的理念在里面。
当消息中不带key(key=null)时,将按照轮询的方式对partition中的消息进行消费:
客户端宕机
再启动一个消费者客户端测试,
重新分配消费分区
重新分配消费分区
控制台有新的日志输出,可以看到ConsumerCoordinator
重新分配了消费分区:
线程名称 | 消费者 | 分区 |
---|---|---|
消费者-0 | consumer-1 | topic01的分区1,topic02的分区1 |
消费者-1 | consumer-2 | topic01的分区2 |
消费者-2 | consumer-3 | 没有分配消费分区 |
新开的线程 | 新开的消费者 | topic01的分区0,topic02的分区0 |
执行一下生产者,看下消费情况:
消费者消费分配给自己的分区内的消息!
这个时候把新开的那个Consumer断开,模拟消费者宕机,看Kafka的重新分配:
rebalancing:有一个Consumer宕机重新分配
Kafka消费者组内分区消费负载均衡。
消费者 assign 手动指定分区模式
上面演示的是consumer主动订阅,主动订阅的情况下,消费者协调器会协调消费者进行分区消费,有一个负载均衡的理念在里面。
手动指定分区进行消费的话,就会失去组的特性,assign 方法:
代码语言:javascript复制//从开始位置消费
List<TopicPartition> partitions = Arrays.asList(new TopicPartition("topic01", 0));
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
Kafka的分区
在 探究Kafka高性能之道 一文中,我已提到了Kafka是如何决定发送消息到topic的哪个分区的:
kafka架构
Kafka默认的分区策略在DefaultPartitioner
中也有定义:
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
public class DefaultPartitioner implements Partitioner {
//...
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
//...
}
这里说明了:
- 如果发送消息指定了分区,那么消息全发送到指定的分区中
- 如果消息没有指定分区但是设置了key,那么按照消息的key进行hash然后和分区数进行取模,得到一个值x,Kafka就往分区x中发送消息
- 如果分区和key都没有指定,则默认采用轮询的方式。
上面已经使用API得到了验证。
一般情况下,这种默认的分区策略就满足生产需求了,但是如果有特殊的业务需求,还可以自定义分区策略,
代码语言:javascript复制public void testProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//定义Partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
//TODO
}
先看一下,ProducerConfig
源码中关于分区配置的说明:
public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>org.apache.kafka.clients.producer.Partitioner</code> interface.";
自定义的Partitioner必须实现org.apache.kafka.clients.producer.Partitioner
接口,这里自定义一个Partitioner,分区策略也按照DefaultPartitioner
的策略来,只是其实现略有不同:
public class MyPartitioner implements Partitioner {
private AtomicInteger counter = new AtomicInteger(0);
/**
* 返回分区号
* @param topic topic
* @param key key
* @param keyBytes key的字节数
* @param value value
* @param valueBytes value的字节数
* @param cluster 集群信息
* @return 分区号
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//先获取集群的分区数
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int increment = counter.getAndIncrement();
//拿这个值模上分区数
// increment & Integer.MAX_VALUE 保证是个正数
return (increment & Integer.MAX_VALUE) % numPartitions;
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
@Override
public void close() {
System.out.println("MyPartitioner#close.");
}
@Override
public void configure(Map<String, ?> map) {
System.out.println("MyPartitioner#configure.");
}
}
Producer这里:
代码语言:javascript复制//定义Partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
//创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
运行生产这实例,这里生产topic02的消息:
可以看到分区策略走的是我们自定义的分区策略,消费者:
前面API创建topic02的时候只设置了两个分区,所以这里是两个分区的轮询。同理可以验证消息带key的分区消费策略。
序列化
前面API演示的时候,生产者和消费者有两个重要的配置,
ProducerConfig
- KEY_SERIALIZER_CLASS_CONFIG
- VALUE_SERIALIZER_CLASS_CONFIG
ConsumerConfig
- KEY_DESERIALIZER_CLASS_CONFIG
- VALUE_DESERIALIZER_CLASS_CONFIG
这个是生产者生产消息是需要对key和value进行序列化,消费者消费消息需要对其进行反序列化,前面序列化和反序列化类是StringSerializer
和StringDeserializer
,跟一下源码,可以看到他们都实现了规定好的接口(Serializer<String>
和Deserializer<String>
):
StringSerializer
StringDeserializer
生产环境中,我们发送的消息有时是对象,此时我们可以自定义对象序列化类,这样可以完成对象消息的传输,自定义序列化实现Serializer
和Deserializer
接口即可。
这里借助于commons-lang3
包下的SerializationUtils
来进行序列化和反序列化:
//序列化
@Override
public byte[] serialize(String topic, Object data) {
// return new byte[0];
return SerializationUtils.serialize((Serializable) data);
}
代码语言:javascript复制//反序列化
@Override
public Object deserialize(String topic, byte[] data) {
System.out.println("自定义反序列化 topic:" topic);
return SerializationUtils.deserialize(data);
}
生产消息,key是String类型,value是Order对象:
代码语言:javascript复制props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());
//创建生产者
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
消费消息:
代码语言:javascript复制props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
启动生产者:
自定义的序列化生效,再启动消费者,控制台打印:
成功将Order信息打印出来,自定义反序列化也生效了。
拦截器
发送数据的时候,可以通过拦截器拿到数据的一些消息,然后可以任意摆布这些数据了(对数据做一些装饰),比如发送失败了,我们可以通过拦截器把错误信息拿到进行分析。
只要在ProducerConfig
中配置INTERCEPTOR_CLASSES_CONFIG
这个配置项就可以设置拦截器了,和前面的Partitioner
、Serializer
同理,看一下这个配置项的源码描述:
/** <code>interceptor.classes</code> */
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
"Implementing the <code>org.apache.kafka.clients.producer.ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records "
"received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
这里说明了默认是没有拦截器的,自定义拦截器需要实现ProducerInterceptor
接口。
public class MyProducerInterceptor implements ProducerInterceptor {
@Override
public ProducerRecord onSend(ProducerRecord record) {
return new ProducerRecord(record.topic(), record.key(), record.value() " --- 拦截了。");
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("metadata:" metadata ",exception:" exception);
}
@Override
public void close() {
System.out.println("MyProducerInterceptor#close");
}
@Override
public void configure(Map<String, ?> configs) {
System.out.println("MyProducerInterceptor#configure");
}
}
运行生产者消费者即可观察到消息成功拦截。
小结
- Kafka集群需要ZooKeeper对其Broker进行协调管理,搭建Kafka集群前需要搭建ZK集群,搭建ZK集群需要注意配置每台节点的
myid
。 - Kafka集群的每个节点的配置文件中,需要注意的配置项(
KAFKA_HOME/config/server.properties文件
)broker.id
、listeners
、log.dirs
和zookeeper.connect
。 - Kafka基础API对topic进行管理,实现Producer生产消息,Consumer消费消息,并通过运行情况理解topic的分区,以及消费者组内消费消息的负载均衡。
- 利用Kafka相关API实现自定义的分区策略、自定义序列化、以及自定义Producer拦截器。