Kafka

2022-10-25 15:59:55 浏览数 (2)

Kafka

  • 一、概述
  • 二、集群部署
  • 三、工作流程分析
    • 1. 生产过程
      • (1)写入方式(push)
      • (2)Partition
      • (3)Replication(副本)
      • (4)写入流程
    • 2. 存储过程
  • 四、API使用
    • 1. Producer
    • 2. Consumer

一、概述

  1. 消息队列 Kafka采用点对点模式,必须有监控队列轮询的进程在(耗资源),可以随时任意速度获取数据。 发布订阅模式:速度由消息队列推送决定,不用进程监控。

优点: (1)解耦 (2)冗余(备份) (3)扩展性 (4)灵活性、峰值处理能力 (5)可恢复性(冗余) (6)顺序保证(队列) (7)缓冲(冗余) (8)异步通信(宕机)

  1. Kafka 分布式消息队列,由LinkidIn公司开发,底层是Scala,先由Apache维护,kafka_
{scala-version}-

{kafka-version}.tgz。 Kafka对消息保存时根据Topic(保存)进行分类,发送消息者称为Producer(入口),消息接收者称为Consumer(出口),此外Kafka集群有多个Kafka实例组成,每个实例(Server)称为Broker。一个分区(Partition)维护一个偏移量 (offset)。 Kafka集群和Consumer都依赖zookeeper集群保存一些meta信息。

  1. 架构

Leader为主,Follower为备,Kafka中的Follower不处理任何请求。 消费者组的不同消费者不能同时消费同一个分区的数据。

二、集群部署

代码语言:javascript复制
# 在解压的kafka目录下(推荐),创建一个存放日志(也存放数据)的信息
mkdir logs

# 修改server.properties
# 每一个实例的唯一辨识(int)
broker.id=0
# 是否可以删除topic
delete.topic.enable=true
# 设置日志打印的位置为创建的日志目录
log.dirs=/opt/kafka/logs
# 缓存数据的时间为7天、大小为1G
log.retention.hours=168
log.segment.bytes=1073741824
# 修改zk集群
zookeeper.connect=${id}:${port},${id}:${port},${id}:${port}

# 集群分发,修改broker.id
xsync kafka/
  1. 命令
代码语言:javascript复制
# bin/下的命令
kafka-server-start.sh
kafka-server-stop.sh
# 对topic的操作
kafka-topics.sh
# 测试时,控制台的消费
kafka-console-consumer.sh
kafka-console-producer.sh
代码语言:javascript复制
--topic  # 定义topic名
--replication-factor  # 定义副本数
--partitions  # 定义分区数
--daemon  # 后台启动
代码语言:javascript复制
# kafka依赖zk,先启动zk,并确定zk状态为leader/foller
zkstart.sh
zk/bin/zkServer.sh status

# 使用kafka自带的zk,-daemon表示后台启动
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

# 指定配置文件启动kafka,阻塞进程
bin/kafka-server-start.sh config/server.properties

# 查看进程
jps -l

# 创建分区数为x(kafka/logs下的一个目录),副本数为x(不能超过集群的节点数),名称为x的topic
bin/kafka-topics.sh --create --zookeeper ${id}:${port} --partitions ${number} --replication-factor ${number} --topic ${name}
# 查看topic数量,显示为topic name
bin/kafka-topics.sh --list --zookeeper ${id}:${port}
# 删除topic,能创建同名的即删除成功
bin/kafka-topics.sh --zookeeper ${id}:${port} --delete --topic ${name}

# 发送消息,连接kafka集群(kafka默认9092),写进topic
bin/kafka-console-producer.sh --broker-list ${id}:${port} --topic ${name}
# 消费消息,连接zk集群,读取topic,不加--from-beginning表示只获取最新的
bin/kafka-console-consumer.sh --zookeeper ${id}:${port} --from-beginning --topic ${name}

# 新版本的kafka中consumer的offset存储在本地,提升效率,不交由zk保存,会报警告!bootstrap(附属于)本地kafka集群,名为__consumer_offset的topic中。
bin/kafka-console-consumer.sh --bootstrap-server ${id}:${port} --from-beginning --topic ${name}

# 查看topic的详情,Isr为选举(其中某个与宕机的Leader节点数据最相近,作为新的Leader),ReplicationFactor为副本,值为broker.id
bin/kafka-topics.sh --zookeeper ${id}:${port} --describe-topic ${name}

三、工作流程分析

三大流程:生产、存储、消费

Kafka Cluster:Broker1、Broker2、Broker3 Producer创建Topic,指定分区数和副本数

代码语言:javascript复制
# 分区的好处:不同分区放在不同节点,实现了负载;消费者组只能消费不同分区数据,提高了并发度(topic的分区数应与一个消费者组中的消费者个数相同)
Broker1:Topic A-Partition 0-Leader、Topic A-Partition 1-Follower
                         Replication A/0                    Replication A/1
Broker2:Topic A-Partition 0-Follower、Topic A-Partition 1-Leader

Consumer Group:Consumer A、Consumer B

1. 生产过程

(1)写入方式(push)

Producer采用push模式将信息发布到Broker,每条消息都被append到patition中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐量)。

(2)Partition

消息都被发送到一个topic,本质就是一个目录,而topic又由一些partition logs(分区日志,offset都从0开始,有序唯一,并不断追加)组成。

代码语言:javascript复制
分区的原因:
(1)方便在集群中扩展,每个partition可以通过调整以适应它所在的及其,而每个topic又可以由多个partition组成,因此整个集群就可以适应任意大小的数据;
(2)可以提高并发,同一个消费者组不能读取同一个分区的数据,因此可以以partition为单位读写。

分区的原则:
(1)指定了partition,则直接使用;
(2)未指定partition但指定key,通过对key的value进行hash出一个partition;
(3)partition和key都未指定,使用轮询选出一个partition。

(3)Replication(副本)

同一个partition可能有多个replication(server.properties配置中default.replication.factor=N),没有replication,一旦broker宕机,其上所有的数据都不可被消费,同时producer也不能将数据存于其上的partition。 引入replication,在需要时在其中选举出一个leader,producer和consumer只与这个leader交互,其他的作为follower从leader中复制数据。

(4)写入流程

ACK机制:0/1/all,1表示leader,all表示leader和follower均写入信息再继续接收。图为ACK为all的机制,防止数据丢失。

2. 存储过程

(1)存储方式 物理上把topic分为一个或多个partition,每个partition物理上对应一个文件夹,存储该partition的所有消息和索引文件。 实际存储数据的文件为logs/xxxx.log文件,存在序列化。

(2)存储策略 无论消息是否被消费,kafka都会保留所有的消息,删除方式有两种: 基于时间(log.retention.hours=168)和大小(log.retention.bytes=1073741824)。

代码语言:javascript复制
kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,删除过期文件与提高kafka性能无关。

(3)zk存储机制

Consumer存储的是偏移量(低版本kafka),Producer不在zk注册,Brokers也存储在zk。

## 3. 消费过程 Kafka提供了两套consumer API:高级Comsumer API和低级Consumer API。

(1)高级API 不能管理offset,书写简单,系统通过zk自行管理; 不能管理分区、副本等,系统自动管理(默认1分钟更新zk中保存的offset )。 可以使用group来区分对同一个topic的不同程序访问分离开俩。

(2)低级API 能够开发者控制offset,随机读取; 书写复杂,需要自行控制offset,连接分区,找到leader等。

(3)消费者组 consumer.properties中group.id=group0,设置消费者组名。启动消费者时,需要添加命令--consumer.config config/consumer.properties

四、API使用

1. Producer

Maven依赖:

代码语言:javascript复制
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>${kafka.version}</version>
</dependency>

ProducerConfig类下包含所有的配置参数,以及doc参考文档。

代码语言:javascript复制
1.高级API:带或不带回调函数的生产者
public class Producer {
    public static void main(String[] args) {
        // 设置配置文件
        Properties props = new Properties();
        // Kafka集群
        props.put("bootstrap.servers", "localhost:9092");
        // 应答级别,all可以写出-1
        props.put("acks", "all");
        // 重试次数
        props.put("retries", 0);
        // 批量大小和提交延迟,决定发送时刻
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        // 缓存
        props.put("buffer.memory", 33554432);
        // KV序列化类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 发送数据,第二个参数可以加上回调函数,重写onCompletion(RecordMetadata, exception)函数
        producer.send(new ProducerRecord<>("first", String.valueOf(123)), (metadata, exception) -> {
            if (exception == null) {
                System.out.println(metadata.partition()   "-"   metadata.offset());
            } else {
                System.out.println("ERROR:"   exception);
            }
        });

        producer.close();
    }
}


// 2.低级API:自定义分区的生产者
// 实现Partition类,重写方法partition、close、configure,配置文件需要匹配生产者
props.put("partitioner.class", "${全类名}");

注意:分区中所有偏移数据消费掉,再消费下一个分区,可能会出现消费数据的顺序和生产的顺序不同。

2. Consumer

代码语言:javascript复制
// 1.高级API
public class Consumer {
    public static void main(String[] args) {
        // 配置信息
        Properties props = new Properties();
        // kafka集群
        props.put("bootstrap.servers", "localhost:9092");
        // 消费者组id
        props.put("group.id", "test");
        // 设置自动提交offset
        props.put("enable.auto.commit", "true");
        // 提交延迟
        props.put("auto.commit.interval.ms", "1000");
        // KV的反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 指定Topic
        consumer.subscribe(Arrays.asList("first", "second"));

        // 获取数据结束,JVM自动退出
        while (true) {
        
            // 获取数据,参数为获取延迟
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);

            // 打印数据
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println(record.topic()   "-"   record.partition()   "-"   record.value());
            }
        }
    }
}


// 2.低级API:读取指定topic、partition(找leader)、offset的数据
/** 
 * 主要步骤:
 * (1*)findLeader(),根据指定的分区从主题元数据中找到主副本;
 * (2)getLastOffset(),获取分区最新的消费进度;
 * (3*)run(),从主副本拉取分区的消息;
 * (4)findNewLeader(),识别主副本的变化,重试。
 **/
 // 找分区leader(元数据信息)
private BrokerEndPoint findLeader(List<String> brokers, int port, String topic, int partition) {
    for (String broker : brokers) {
        // 创建获取分区Leader的消费者对象,链接到具体某一个节点
        SimpleConsumer getLeader = new SimpleConsumer(broker, port, 1000,
                1024*4, "getLeader");

        // 创建一个主题元数据的信息请求
        TopicMetadataRequest topicMetadataRequest =
                new TopicMetadataRequest(Collections.singletonList(topic));

        // 获取返回值
        TopicMetadataResponse topicMetadataResponse = getLeader.send(topicMetadataRequest);

        // 解析元数据返回值
        List<TopicMetadata> topicsMetadata = topicMetadataResponse.topicsMetadata();
        for (TopicMetadata topicMetadata : topicsMetadata) {
            List<PartitionMetadata> partitionsMetadata = topicMetadata.partitionsMetadata();
            for (PartitionMetadata partitionMetadata : partitionsMetadata) {
                if (partition == partitionMetadata.partitionId()) {
                    return partitionMetadata.leader();
                }
            }
        }
    }
    return null;
}

// 获取数据
private void run(List<String> brokers, int port, String topic, int partition, long offset) {
    // 获取分区Leader
    BrokerEndPoint leader = findLeader(brokers, port, topic, partition);
    if (leader == null) {
        return;
    }
    String leaderHost = leader.host();

    // 创建获取数据的消费者
    SimpleConsumer simpleConsumer = new SimpleConsumer(leaderHost, port, 1000,
            1024 * 4, "getData");

    // 创建获取数据的对象(可以获取多个数据.addFetch())
    FetchRequest fetchRequest =
            new FetchRequestBuilder().addFetch(topic, partition, offset, 1024 * 4).build();

    // 获取返回值
    FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);

    // 解析返回值,创建获取数据的对象时可以多次.addFetch
    ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topic, partition);
    for (MessageAndOffset messageAndOffset : messageAndOffsets) {
        // offset可以自行保存
        long offset1 = messageAndOffset.offset();
        ByteBuffer payload = messageAndOffset.message().payload();

        // 自行反序列化
        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);

        System.out.println(offset1   "-"   new String(bytes));
    }
}

0 人点赞