Kafka
- 一、概述
- 二、集群部署
- 三、工作流程分析
- 1. 生产过程
- (1)写入方式(push)
- (2)Partition
- (3)Replication(副本)
- (4)写入流程
- 2. 存储过程
- 1. 生产过程
- 四、API使用
- 1. Producer
- 2. Consumer
一、概述
- 消息队列 Kafka采用点对点模式,必须有监控队列轮询的进程在(耗资源),可以随时任意速度获取数据。 发布订阅模式:速度由消息队列推送决定,不用进程监控。
优点: (1)解耦 (2)冗余(备份) (3)扩展性 (4)灵活性、峰值处理能力 (5)可恢复性(冗余) (6)顺序保证(队列) (7)缓冲(冗余) (8)异步通信(宕机)
- Kafka 分布式消息队列,由LinkidIn公司开发,底层是Scala,先由Apache维护,kafka_
{kafka-version}.tgz。 Kafka对消息保存时根据Topic(保存)进行分类,发送消息者称为Producer(入口),消息接收者称为Consumer(出口),此外Kafka集群有多个Kafka实例组成,每个实例(Server)称为Broker。一个分区(Partition)维护一个偏移量 (offset)。 Kafka集群和Consumer都依赖zookeeper集群保存一些meta信息。
- 架构
Leader为主,Follower为备,Kafka中的Follower不处理任何请求。 消费者组的不同消费者不能同时消费同一个分区的数据。
二、集群部署
代码语言:javascript复制# 在解压的kafka目录下(推荐),创建一个存放日志(也存放数据)的信息
mkdir logs
# 修改server.properties
# 每一个实例的唯一辨识(int)
broker.id=0
# 是否可以删除topic
delete.topic.enable=true
# 设置日志打印的位置为创建的日志目录
log.dirs=/opt/kafka/logs
# 缓存数据的时间为7天、大小为1G
log.retention.hours=168
log.segment.bytes=1073741824
# 修改zk集群
zookeeper.connect=${id}:${port},${id}:${port},${id}:${port}
# 集群分发,修改broker.id
xsync kafka/
- 命令
# bin/下的命令
kafka-server-start.sh
kafka-server-stop.sh
# 对topic的操作
kafka-topics.sh
# 测试时,控制台的消费
kafka-console-consumer.sh
kafka-console-producer.sh
代码语言:javascript复制--topic # 定义topic名
--replication-factor # 定义副本数
--partitions # 定义分区数
--daemon # 后台启动
代码语言:javascript复制# kafka依赖zk,先启动zk,并确定zk状态为leader/foller
zkstart.sh
zk/bin/zkServer.sh status
# 使用kafka自带的zk,-daemon表示后台启动
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
# 指定配置文件启动kafka,阻塞进程
bin/kafka-server-start.sh config/server.properties
# 查看进程
jps -l
# 创建分区数为x(kafka/logs下的一个目录),副本数为x(不能超过集群的节点数),名称为x的topic
bin/kafka-topics.sh --create --zookeeper ${id}:${port} --partitions ${number} --replication-factor ${number} --topic ${name}
# 查看topic数量,显示为topic name
bin/kafka-topics.sh --list --zookeeper ${id}:${port}
# 删除topic,能创建同名的即删除成功
bin/kafka-topics.sh --zookeeper ${id}:${port} --delete --topic ${name}
# 发送消息,连接kafka集群(kafka默认9092),写进topic
bin/kafka-console-producer.sh --broker-list ${id}:${port} --topic ${name}
# 消费消息,连接zk集群,读取topic,不加--from-beginning表示只获取最新的
bin/kafka-console-consumer.sh --zookeeper ${id}:${port} --from-beginning --topic ${name}
# 新版本的kafka中consumer的offset存储在本地,提升效率,不交由zk保存,会报警告!bootstrap(附属于)本地kafka集群,名为__consumer_offset的topic中。
bin/kafka-console-consumer.sh --bootstrap-server ${id}:${port} --from-beginning --topic ${name}
# 查看topic的详情,Isr为选举(其中某个与宕机的Leader节点数据最相近,作为新的Leader),ReplicationFactor为副本,值为broker.id
bin/kafka-topics.sh --zookeeper ${id}:${port} --describe-topic ${name}
三、工作流程分析
三大流程:生产、存储、消费
Kafka Cluster:Broker1、Broker2、Broker3 Producer创建Topic,指定分区数和副本数
代码语言:javascript复制# 分区的好处:不同分区放在不同节点,实现了负载;消费者组只能消费不同分区数据,提高了并发度(topic的分区数应与一个消费者组中的消费者个数相同)
Broker1:Topic A-Partition 0-Leader、Topic A-Partition 1-Follower
Replication A/0 Replication A/1
Broker2:Topic A-Partition 0-Follower、Topic A-Partition 1-Leader
Consumer Group:Consumer A、Consumer B
1. 生产过程
(1)写入方式(push)
Producer采用push模式将信息发布到Broker,每条消息都被append到patition中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐量)。
(2)Partition
消息都被发送到一个topic,本质就是一个目录,而topic又由一些partition logs(分区日志,offset都从0开始,有序唯一,并不断追加)组成。
代码语言:javascript复制分区的原因:
(1)方便在集群中扩展,每个partition可以通过调整以适应它所在的及其,而每个topic又可以由多个partition组成,因此整个集群就可以适应任意大小的数据;
(2)可以提高并发,同一个消费者组不能读取同一个分区的数据,因此可以以partition为单位读写。
分区的原则:
(1)指定了partition,则直接使用;
(2)未指定partition但指定key,通过对key的value进行hash出一个partition;
(3)partition和key都未指定,使用轮询选出一个partition。
(3)Replication(副本)
同一个partition可能有多个replication(server.properties配置中default.replication.factor=N),没有replication,一旦broker宕机,其上所有的数据都不可被消费,同时producer也不能将数据存于其上的partition。 引入replication,在需要时在其中选举出一个leader,producer和consumer只与这个leader交互,其他的作为follower从leader中复制数据。
(4)写入流程
ACK机制:0/1/all,1表示leader,all表示leader和follower均写入信息再继续接收。图为ACK为all的机制,防止数据丢失。
2. 存储过程
(1)存储方式 物理上把topic分为一个或多个partition,每个partition物理上对应一个文件夹,存储该partition的所有消息和索引文件。 实际存储数据的文件为logs/xxxx.log文件,存在序列化。
(2)存储策略 无论消息是否被消费,kafka都会保留所有的消息,删除方式有两种: 基于时间(log.retention.hours=168)和大小(log.retention.bytes=1073741824)。
代码语言:javascript复制kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,删除过期文件与提高kafka性能无关。
(3)zk存储机制
Consumer存储的是偏移量(低版本kafka),Producer不在zk注册,Brokers也存储在zk。
## 3. 消费过程 Kafka提供了两套consumer API:高级Comsumer API和低级Consumer API。
(1)高级API 不能管理offset,书写简单,系统通过zk自行管理; 不能管理分区、副本等,系统自动管理(默认1分钟更新zk中保存的offset )。 可以使用group来区分对同一个topic的不同程序访问分离开俩。
(2)低级API 能够开发者控制offset,随机读取; 书写复杂,需要自行控制offset,连接分区,找到leader等。
(3)消费者组
consumer.properties中group.id=group0
,设置消费者组名。启动消费者时,需要添加命令--consumer.config config/consumer.properties
。
四、API使用
1. Producer
Maven依赖:
代码语言:javascript复制<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
ProducerConfig类下包含所有的配置参数,以及doc参考文档。
代码语言:javascript复制1.高级API:带或不带回调函数的生产者
public class Producer {
public static void main(String[] args) {
// 设置配置文件
Properties props = new Properties();
// Kafka集群
props.put("bootstrap.servers", "localhost:9092");
// 应答级别,all可以写出-1
props.put("acks", "all");
// 重试次数
props.put("retries", 0);
// 批量大小和提交延迟,决定发送时刻
props.put("batch.size", 16384);
props.put("linger.ms", 1);
// 缓存
props.put("buffer.memory", 33554432);
// KV序列化类
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送数据,第二个参数可以加上回调函数,重写onCompletion(RecordMetadata, exception)函数
producer.send(new ProducerRecord<>("first", String.valueOf(123)), (metadata, exception) -> {
if (exception == null) {
System.out.println(metadata.partition() "-" metadata.offset());
} else {
System.out.println("ERROR:" exception);
}
});
producer.close();
}
}
// 2.低级API:自定义分区的生产者
// 实现Partition类,重写方法partition、close、configure,配置文件需要匹配生产者
props.put("partitioner.class", "${全类名}");
注意:分区中所有偏移数据消费掉,再消费下一个分区,可能会出现消费数据的顺序和生产的顺序不同。
2. Consumer
代码语言:javascript复制// 1.高级API
public class Consumer {
public static void main(String[] args) {
// 配置信息
Properties props = new Properties();
// kafka集群
props.put("bootstrap.servers", "localhost:9092");
// 消费者组id
props.put("group.id", "test");
// 设置自动提交offset
props.put("enable.auto.commit", "true");
// 提交延迟
props.put("auto.commit.interval.ms", "1000");
// KV的反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 指定Topic
consumer.subscribe(Arrays.asList("first", "second"));
// 获取数据结束,JVM自动退出
while (true) {
// 获取数据,参数为获取延迟
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
// 打印数据
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println(record.topic() "-" record.partition() "-" record.value());
}
}
}
}
// 2.低级API:读取指定topic、partition(找leader)、offset的数据
/**
* 主要步骤:
* (1*)findLeader(),根据指定的分区从主题元数据中找到主副本;
* (2)getLastOffset(),获取分区最新的消费进度;
* (3*)run(),从主副本拉取分区的消息;
* (4)findNewLeader(),识别主副本的变化,重试。
**/
// 找分区leader(元数据信息)
private BrokerEndPoint findLeader(List<String> brokers, int port, String topic, int partition) {
for (String broker : brokers) {
// 创建获取分区Leader的消费者对象,链接到具体某一个节点
SimpleConsumer getLeader = new SimpleConsumer(broker, port, 1000,
1024*4, "getLeader");
// 创建一个主题元数据的信息请求
TopicMetadataRequest topicMetadataRequest =
new TopicMetadataRequest(Collections.singletonList(topic));
// 获取返回值
TopicMetadataResponse topicMetadataResponse = getLeader.send(topicMetadataRequest);
// 解析元数据返回值
List<TopicMetadata> topicsMetadata = topicMetadataResponse.topicsMetadata();
for (TopicMetadata topicMetadata : topicsMetadata) {
List<PartitionMetadata> partitionsMetadata = topicMetadata.partitionsMetadata();
for (PartitionMetadata partitionMetadata : partitionsMetadata) {
if (partition == partitionMetadata.partitionId()) {
return partitionMetadata.leader();
}
}
}
}
return null;
}
// 获取数据
private void run(List<String> brokers, int port, String topic, int partition, long offset) {
// 获取分区Leader
BrokerEndPoint leader = findLeader(brokers, port, topic, partition);
if (leader == null) {
return;
}
String leaderHost = leader.host();
// 创建获取数据的消费者
SimpleConsumer simpleConsumer = new SimpleConsumer(leaderHost, port, 1000,
1024 * 4, "getData");
// 创建获取数据的对象(可以获取多个数据.addFetch())
FetchRequest fetchRequest =
new FetchRequestBuilder().addFetch(topic, partition, offset, 1024 * 4).build();
// 获取返回值
FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);
// 解析返回值,创建获取数据的对象时可以多次.addFetch
ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topic, partition);
for (MessageAndOffset messageAndOffset : messageAndOffsets) {
// offset可以自行保存
long offset1 = messageAndOffset.offset();
ByteBuffer payload = messageAndOffset.message().payload();
// 自行反序列化
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(offset1 "-" new String(bytes));
}
}