Kafka - 3.x Producer 生产者最佳实践

2023-10-28 12:42:52 浏览数 (1)

生产经验_生产者提高吞吐量

核心参数

Code

代码语言:javascript复制
package com.artisan.pc;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerParameters {

    public static void main(String[] args) throws InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");

        // key,value序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // batch.size:批次大小,默认16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

        // linger.ms:等待时间,默认0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // RecordAccumulator:缓冲区大小,默认32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        // compression.type:压缩,默认none,可配置值gzip、snappy、lz4和zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i  ) {
            kafkaProducer.send(new ProducerRecord<>("artisan", "art-msg-"   i));
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
}

生产经验_数据可靠性

消息的发送流程

回顾下消息的发送流程如下:

ACK应答机制

背景

Kafka提供的解决方案

Leader收到数据,所有Follower开始同步数据,但有一个Follower因故障无法同步,导致Leader一直等待直到同步完成才发送ACK。

- Leader维护了一个动态的In-Sync Replica Set (ISR)和Leader保持同步的Follower集合。- 当ISR中的Follower完成数据同步后,Leader向Producer发送ACK。- 如果某个Follower长时间(replica.lag.time.max.ms)未向Leader同步数据,则该Follower将被移出ISR。- 在Leader发生故障时,将从ISR中选举新的Leader。


ack应答级别

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。 所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置

acks

描述

0

提供最低延迟,Leader副本接收消息后返回ack,尚未写入磁盘。可能导致数据丢失,特别是在Leader发生故障时。

1

Leader副本将消息写入磁盘后返回ack,但如果Leader在Follower副本同步数据之前发生故障,可能会丢失数据。

-1

或者 (all) ,Leader和所有Follower副本都将消息写入磁盘后才返回ack。如果在Follower副本同步完成后,Leader副本在发送ack之前发生故障,可能会导致数据重复。


应答机制 小结


Code

代码语言:javascript复制
package com.artisan.pc;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerAck {

    public static void main(String[] args) throws InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");

        // key,value序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 设置acks
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        // 重试次数retries,默认是int最大值,2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i  ) {
            kafkaProducer.send(new ProducerRecord<>("artisan", "art-msg-ack"   i));
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
}

生产经验_数据去重

数据传递语义


幂等性

幂等性原理
开启幂等性配置(默认开启)

在prudocer的配置对象中,添加参数enable.idempotence,参数值默认为true,设置为false就关闭了。


生产者事务

kafka事务原理
事务代码流程
代码语言:javascript复制
// 1初始化事务
void initTransactions();
// 2开启事务
void beginTransaction() throws ProducerFencedException;
// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;
// 4提交事务
void commitTransaction() throws ProducerFencedException;
// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

生产经验_数据有序


生产经验_数据乱序

0 人点赞