从编程角度而言,生产者就是负责向Kafka发送消息的应用程序。本文使用java语言做详细介绍。
一个正常的生产逻辑需要以下几个步骤:
- 配置生产者客户端参数及创建相应的生产者实例。
- 构建待发送的消息。
- 发送消息。
- 关闭生产者实例。
客户端开发案例
本文先提供简单的生产者客户端程序,然后做具体的改进和分析。
代码语言:javascript复制<--导入依赖-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
public class Producer {
//链接地址
public static final String brokerList = "192.168.0.18:9092";
//主题
public static final String topic = "topic-demo";
//配置参数
public static Properties initConfig(){
Properties properties = new Properties();
properties.put("bootstrap.servers",brokerList);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer" );
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("client.id","producer.client.id.demo");
return properties;
}
public static void main(String[] args) throws InterruptedException {
//构建发送主体
KafkaProducer<String, String> producer = new KafkaProducer<>(initConfig());
//构建消息体
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello world");
try {
//发送消息
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}
构建的消息对象ProducerRecord并不是单纯意义上的消息,它包含了多个属性,原本需要发送的业务相关的消息体只是其中的一个value属性,比如“hello world”,ProducerRecord的源码如下:
代码语言:javascript复制public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
//省略构造方法和getter、setter方法
其中topic和partition字段分别代表消息要发往的主题和分区号。headers字段是消息的头部,Kafka0.11x版本才引入这个属性,它大多用来设定一些与应用相关的信息,也可以不设置。key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。消息以主题为单位进行归类,而这个key可以让消息再进行二次归类,同一个key的消息会被划分到同一个分区中(事实上不总是这样,后面会解释)。有key的消息还可以支持日志压缩功能(以后讲压缩)。value是消息体,一般不为空,如果为空则表示特定的消息——墓碑消息。temestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者标识消息创建的时间,后者表示消息追加到日志文件的时间。
必要的参数配置
参考initConfig方法,在创建真正的生产者实例前需要配置相应的参数,比如需要链接的kafka集群地址。通常有3个参数是必填的。
- bootstrap.server:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式是host1:port1,host2:port2,可以设置一个或者多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的broker地址,因为生产者会从给定的borker里查找到其他broker的信息。不过建议至少设置两个以上的borker地址信息,当其中一个宕机时,生产者仍然可以链接到集群上。
- key.serializer和value.serializer :broker端接收的消息必须以字节数组(byte[])的形式存在。在代码中使用的
KafkaProducer<String, String>
和ProducerRecord<String, String>
中的泛型<String,String>
对应的就是小溪中key和value的类型,生产者客户端使用这种方式可以让代码具有更好的可读性,不过在发往broker之前需要将消息中对应的key和value做相应的序列化操作来转换成字节数组。key.serializer和value.serializer这两个参数分别用来指定key和value的序列化器,这两个参数无默认值。后面讲如何自定义序列化器。
方法里还设置了一个参数client.id,这个参数用来设定KafkaProducer对应的客户端id,默认值为“”。如果客户端不设置,则KafkaProducer会自动生成一个非空字符串,内容形式如“producer-1”,即字符串“producer-”与数字的拼接。
KafkaProducer中的参数众多,远非实例方法中的那样只有4个。一般而言,开发人员无法记住所有的参数名,只能有个大概的印象。在实际使用过程中,诸如key.serializer
之类的字符串经常由于认为因素而书写错误。为此我们可以使用ProducerConfig类来做一定程度上的预防措施,每个参数在这个类上都有对应的名字。如下图所示:
ProducerConfig
我们将initConfig方法做如下修改:
代码语言:javascript复制 public static Properties initConfig(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer" );
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");
return properties;
}
注意到上面的代码中key和value对应的序列化器名字也容易写错,这里通过java的技巧来做进一步修改:
代码语言:javascript复制 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer是线程安全的,可以再多个线程中共享单个实例,也可以将KafkaProducer实例进行池化。
KafkaProducer中有多个构造函数,假如在创建实例过程中没有指定key.serializer和value.serializer这两个参数的话,实例就得这么构建:
代码语言:javascript复制KafkaProducer<String, String> producer
= new KafkaProducer<>(initConfig(),new StringSerializer(),new StringSerializer());
小编不会这么做,一般都是在initConfig函数里指定所有的参数。
消息的发送(同步、异步、回调)
ProducerRecord是消息的载体。topic属性和value属性是必填的,其余属性是选填的,其构造方法也有很多:
代码语言:javascript复制 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
this(topic, partition, timestamp, key, value, (Iterable)null);
}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
this(topic, partition, (Long)null, key, value, headers);
}
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, (Long)null, key, value, (Iterable)null);
}
public ProducerRecord(String topic, K key, V value) {
this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}
public ProducerRecord(String topic, V value) {
this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
}
实际应用开发过程中,创建ProducerRecord对象是一个非常频繁的动作。创建完消息体后,就可以开始发送消息了。消息的发送主要有三种模式
- 发后即忘(fire-and-forget)
- 同步(sync)
- 异步(async) 案例中的发送方式就是发后即忘,它只管往kafka中发送消息而不关心消息是否正确送达。在大多数情况下,这种发送方式没什么问题,不过在某些时候(比如发生不可重试异常)会造成消息丢失。这种发送方式性能最高,可靠性也最差。
send方法返回的并非是void类型,而是Future<RecordMetadata>
类型,send()方法有两个重载方法:
Future<RecordMetadata> send(ProducerRecord<K, V> var1);
Future<RecordMetadata> send(ProducerRecord<K, V> var1, Callback var2);
要实现同步可以利用返回的Future对象实现,如下所示:
代码语言:javascript复制 try {
producer.send(record).get();//实现同步发送
} catch (InterruptedException |ExecutionException e) {
e.printStackTrace();
}
实际上,send方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。案例中,send方法之后直接链式调用了get()方法来阻塞等待Kafka的响应,知道消息发送成功或发生异常。如果发生异常,那么就需要捕获异常并交由逻辑处理层。
也可在调用send方法之后不直接调用get方法,比如下面的一种实现同步的方式:
代码语言:javascript复制 Future<RecordMetadata> retu = producer.send(record);//send方法本身是异步的
RecordMetadata recordMetadata = null;
try {
recordMetadata = retu.get();//实现同步发送
System.out.println("同步发送成功到:" recordMetadata.topic());
} catch (InterruptedException |ExecutionException e) {
e.printStackTrace();
}
这样可以获取一个RecordMetadata对象,它包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量、时间戳等。如果你需要这些信息,则可以使用这个方式。如果不需要直接使用producer.send(record).get();
方式更省事。
Future表示一个任务的声明周期,并提供了响应的方法来判断任务是否已经完成或取消,以及获取任务的结果和取消任务等。也可以使用retu.get(3, TimeUnit.SECONDS);
方式来实现超时阻塞。
KafkaProducer一般会发生两种类型的异常:可重试异常和不可重试异常。常见的可重试异常包括:NetworkException
、LerderNotAvailableException
、UnknownTopicOrPartitionException
、NotEnoughReplicasException
、NotCoordinatorException
等。比如,NetworkException
表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如LerderNotAvailableException
表示分区的leader副本不可用,这个异常通常发生在leader副本下线而新的leader副本选举完成之前,重试之后可以重新恢复。不可重试的异常,如RecordTooLargeException
,暗示了所发送的消息太大,对此不会进行任何重试,直接抛出异常。
对于可重试的异常,如果配置了retries参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。retries的默认值为0,配置也很简单:
代码语言:javascript复制properties.put(ProducerConfig.RETRIES_CONFIG,10);
如果重试10次后还没有恢复,那么仍然会抛出异常,进而发送的外层逻辑处理就要处理这些异常了。
同步方式可靠性高,要么消息发送成功,要么发生异常,如果发生异常则可以捕获并进行相应的处理,不会造成消息丢失。不过性能确实差很多,需要阻塞等待一条消息发送完成后再继续发送下一条消息。
来了解一下异步发送方式,一般是在send方法里指定一个Callback回调函数,Kafka在返回响应时调用该函数来实现异步发送确认。Kafka有响应时就会回调,要么发送成功,要么抛出异常。如:
代码语言:javascript复制 //异步发送
public static void sendAsyn(ProducerRecord<String, String> record,KafkaProducer<String, String> producer){
//异步发送
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e!=null){
e.printStackTrace();
}else{
System.out.println("异步发送成功" recordMetadata.topic() ":" recordMetadata.partition());
}
}
});
}
onComplete方法的两个参数是互斥的,消息发送成功时,meta不为null而exception为null;而消息发送异常时,metadata为null而exception不为null。
对于同一个分区而言,如果消息record1于record2之前发送,系统可以保证对应的callback1在callback2之前调用,也就是说回调函数的调用也是可以保证分区有序。
最后producer.close();
方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer,同时,还提供了一个带超时的close方法,这个很少用。
序列化
生产者需要用序列化器把对象转换成字节数组才能发给kafka。消费者必须用反序列器把从kafka收到的字节数组转换成相应的对象。上文讲的序列化器StringSerializer
实现了org.apache.kafka.common.serialization.Serializer
接口,此外还有
ByteArray
、ByteBuffer
、Bytes
、Double
、Integer
、Long
等序列化器,都实现了Serializer
接口,该接口有3个方法:
void configure(Map<String, ?> var1, boolean var2);
byte[] serialize(String var1, T var2);
void close();
configure
用来配置当前类,serialize
方法用来执行序列化操作。而close方法用来关闭当前的序列化器,一般情况下close是个空方法,如果实现了此方法,必须保证此方法的幂等性,因为KafkaProducer可能会调用多次该方法。
我们先来看一下StringSerializer的源码,从而引出自定义序列化器的编写
代码语言:javascript复制public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
public StringSerializer() {
}
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null) {
encodingValue = configs.get("serializer.encoding");
}
if (encodingValue instanceof String) {
this.encoding = (String)encodingValue;
}
}
public byte[] serialize(String topic, String data) {
try {
return data == null ? null : data.getBytes(this.encoding);
} catch (UnsupportedEncodingException var4) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " this.encoding);
}
}
public void close() {
}
}
首先是configure()方法,是在创建KafkaProducer实例的时候调用的,主要用来确定以编码类型,不过一般客户端对于key.serializer.encoding和value.serializer.encodeing这几个参数是不会设置的,默认为UTF-8。serialize()方法非常直观,就是将String类型转换为byte[]类型。
如果Kafka客户端提供的几种序列化器都无法满足你,则可以使用Avro/JSON/Thrift/ProtoBuf和Protostuff等通用的序列化工具来实现,或者使用自定义类型的序列化器来实现。下面看如何自定义:
首先创建一个业务类:
代码语言:javascript复制public class User {
private String name;
private int age = -1;
public String getName() {
return name;
}
public User setName(String name) {
this.name = name;
return this;
}
public int getAge() {
return age;
}
public User setAge(int age) {
this.age = age;return this;
}
}
定义序列化器
代码语言:javascript复制package serializer;
import bean.User;
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;
//自定义序列化器
public class UserSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String s, User user) {
if (user == null) {
return null;
}
byte[] name;
int age = user.getAge();
try {
if (user.getName() != null) {
name = user.getName().getBytes("UTF-8");
} else {
name = new byte[0];
}
//数组总共的长度
ByteBuffer byteBuffer = ByteBuffer.allocate(4 4 name.length);
//name字节数
byteBuffer.putInt(name.length);
//放name字节数组
byteBuffer.put(name);
//放age,age本身就是int类型的
byteBuffer.putInt(age);
return byteBuffer.array();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new byte[0];
}
@Override
public void close() {
}
}
关于ByteBuffer怎么用,可以参考笔者的《「高并发通信框架Netty4 源码解读(四)」NIO缓冲区之字节缓冲区ByteBuffer详解》
定义消费端的反序列化器
代码语言:javascript复制public class UserDeserializer implements Deserializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public User deserialize(String s, byte[] bytes) {
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
int nameLength = byteBuffer.getInt();
byte name[] = new byte[nameLength];
byteBuffer.get(name,0,nameLength);
int age = byteBuffer.getInt();
return new User().setAge(age).setName(new String(name));
}
@Override
public void close() {
}
}
更改序列化器:
代码语言:javascript复制properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
分区器
消息在发送过程中,有可能需要经过拦截器、序列化器和分区器的一系列作用之后才能被真正发往broker。拦截器不是必须的,后面讲。序列化器是必须的,消息经过序列化后就需要确定它发往的分区,如果ProducerRecord消息中指定了partition字段那么就不需要分区器的作用了,因为partition代表的就是所要发往的分区号。
如果消息中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。
Kafka中提供的默认分区器是DefaultPartitioner
,它实现了Partitioner
这个接口,定义的方法如下:
public interface Partitioner extends Configurable, Closeable {
int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);
void close();
}
其中,parition方法用来计算分区号,返回值为int类型。partition方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现丰富的分区器。
在默认的分区器DefaultPartitioner
实现中,如果key不为null,那么默认的分区器会对key进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同的key的消息会被写入同一个分区,如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区内,在不改变主题分区的情况下,key与分区之间的映射可以保持不变。不过,一旦主题中增加了新的分区,映射就破坏了。
指定分区器的方式:
代码语言:javascript复制properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class.getName());//指定分区器,配合key使用
拦截器
生产者可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容,也可以用来在发送回调逻辑前做一些定制化要求,比如统计类工作。
生产者拦截器的使用很简单,主要自定义实现ProducerInterceptor接口
代码语言:javascript复制public interface ProducerInterceptor<K, V> extends Configurable {
ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
void onAcknowledgement(RecordMetadata var1, Exception var2);
void close();
}
KafkaProducer在将消息序列化和计算分区之前会调用拦截器的onSend方法来对消息进行相应的定制化操作。在消息被应答之前或消息发送失败时调用生产者拦截器的onAcknowledgement方法,优先于用户设定的Callback之前执行。这个方法运行在producer的IO线程中,所以这个方法的实现越简单越好,否则影响消息的发送速度。
自定义拦截器实现:
代码语言:javascript复制public class ProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String ,String> {
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord onSend(ProducerRecord<String ,String> producerRecord) {
//消息发送前,进行修改操作
String modifieldValue = "prefix-" producerRecord.value();
return new ProducerRecord<String ,String>(producerRecord.topic(),
producerRecord.partition(),
producerRecord.timestamp(),
producerRecord.key(),
modifieldValue,
producerRecord.headers());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if(e == null){
sendSuccess ;
System.out.println("发送成功的消息:" sendSuccess);
}else{
sendFailure ;
System.out.println("发送失败的消息:" sendFailure);
}
}
@Override
public void close() {
System.out.println("发送成功率:" (double)sendSuccess/(sendSuccess sendFailure));
}
@Override
public void configure(Map<String, ?> map) {
}
}
我们在onSend方法上修改了内容,发送内容前加上了prefix-前缀,onAcknowledgement用来统计发送成功与失败的消息数。
拦截器的配置:
代码语言:javascript复制properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class.getName() "," ProducerInterceptor2.class.getName());
多个拦截器用逗号分隔,拦截器的调用顺序会按配置顺序调用。
小总结:生产者调用顺序 :拦截器->序列化器->分区器
原理剖析
客户端整体架构:
客户端整体架构
整个生产者客户端由两个线程协调运行,分别是主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也成为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到kafka中。
RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过客户端参数buffer.memory
配置,默认值为33554432B,即32MB。
//设置缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
如果生产者发送消息的速度超过发送服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛异常,这个取决于参数max.block.ms
的配置,此参数的默认值为60000,即60秒。
//设置阻塞异常的
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"5000");//3s
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列中,在RecordAccumulator内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque<ProducerBatch>。消息写入缓存时,追加到双端队列尾部;Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一个至多个ProducerRecord。通俗的说,ProducerRecord是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会包含在ProducerBatch中,这样可以使字节的使用更加紧凑。与此同时,将较小的ProducerRecord拼凑成一个较大的ProducerBatch,也可以减少网络请求次数以提升整体的吞吐量。如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。
消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ButeBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗资源的,在RecordAccumulator的内部还有一个BufferPool,它主要实现ByteBuffer的复用,以实现缓存的高效利用。不过,ByteBuffer只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool,这个特定的大小由batch.size参数来指定,默认为16384B,即16KB,我们可以适当调大batch.size参数以便多缓存一些消息。
ProducerBatch的大小和batch.size参数也有密切的关系。当一条消息(ProducerRecord)流入到RecordAccumulator后时,会先寻找与消息分区对应的双端队列,再从这个双端队列尾部获取一个ProducerBatch并查看是否还可以写入ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以参数的大小来创建ProducerBatch,这样在使用完这段内存区域后,可以通过BufferPool来管理进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。
Sender从RecordAccumulator获取缓存的消息后,会进一步量原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List<ProducerBatc>>的形式,其中,node表示kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立连接的,也就是向具体的broker节点发送消息,而不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪一个分区中发送消息,所以在这里需要做一个应用逻辑层面到网络IO层面的转换。
转变成<Node,List<ProducerBatc>>的形式后,sender进一步封装成<Node,Request>的形式,这样就可以发往各个Node了,这里的Request是Kafka的各种协议请求。
请求在发往Kafka前,还会保存在InFlightRequests中,它保存对象的具体形式为Map<NodedId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求,NodedId是一个String类型,表示节点的ID编号。
与此同时InFlightRequests还提供了许多管理类的方法,并且通过参数配置还可以限制每个链接(也就是客户端与Node之间的链接)最多缓存的请求数,这个配置参数为max.in.flight.requests.per.connection
,默认值为5,即每个链接最多只能缓存5个未响应的请求,超过该数值后就不能向这个连接发送更多的请求了,除非缓存中有请求收到了响应。通过比较Deque<Request>的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个Node节点负载较大或者网络连接有问题,再继续向其发送请求会增大请求超时的可能。
元数据的更新
KafkaProducer要将消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后经过计算得出(或直接指定)目标分区,之后需要知道目标分区的leader副本所在的broker节点的地址、端口号等信息才能建立连接,最终才能将消息发送到Kafka,在这过程中需要的信息都属于元数据信息。
案例中,我们了解到bootstrap.servers参数只需要配置部分broker节点的地址即可, 不需要配置所有broker节点的地址,因为客户端自己可以发现其他broker的节点地址,这一过程也属于元数据相关的更新操作。与此同时,分区数量及leader副本的分布都会动态地变化,客户端也需要动态的捕获这些变化。
元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪个节点上,哪些副本在AR、ISR等集合中,集群中有哪些数据,控制器节点又有哪一个等信息。
当客户端没有需要使用的元数据时,比如没有指定的主题信息,或者超过metadata.max.age.ms时间没有更新元数据都会引起元数据的更新操作。该参数的默认值为300000,即5分钟 。元数据的更新操作是在客户端的内部进行的,对客户端的外部使用者不可见。
代码语言:javascript复制//客户端更新kafka集群元数据的时间间隔,默认5分钟
properties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG,300000);
重要的生产者参数
在KafkaProducer中,大部分参数都有合理的默认值,一般不需要修改它们,不过了解这些参数可以让我们更合理的使用生产者客户端,其中还有一些参数涉及程序的可用性和性能。
acks
这个参数 指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息时成功写入的,acks是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的平衡。acks参数有3种类型的值(都是字符串类型):
- acks = 1 默认值就为1。生产者发送消息之后,只要leader副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入leader副本,比如在leader副本崩溃、重新选举新的leader副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以重发消息。如果消息写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为重新选举的leader副本中并没有这条对应的消息。acks=1是消息可靠性和吞吐量之间的折中方案。
- acks=0 生产者发送消息后不需要等待任何服务端的响应。在其他配置环境下,acks=0可达到最大吞吐量。
- acks=-1或acks=all 生产者在发送消息后,需要等到所有ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境下相同的情况下,可以达到最强的可靠性。但这不意味着消息一定可靠,因为ISR中可能只有leader副本,这样就退化成了acks=1的情况,要获得最强的消息可靠性要配合min.insync.replicas等参数的联动配合。
//leader副本成功收到消息后返回响应,不管follower副本
properties.put(ProducerConfig.ACKS_CONFIG,"1");
max.request.size
这个参数用来限制生产者客户端能发送消息的最大值,默认为1048576B,即1MB。一般情况下这个默认值就可以满足大多数的应用场景了。笔者不建议盲目的增大这个参数值,尤其是对Kafka整体脉络没有足够把控的时候。因为这个参数还涉及其他一些参数的联动,比如broker端的message.max.bytes参数,如果配置错误会引起一些不必要的异常。
retries 和 retry.backoff.ms
retries用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置retries大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者应用程序。如果重试达到设定的次数,那么生产者就会放弃重试返回异常。不过不是所有的异常都是可以通过重试来解决的,如消息太大,超过max.request.size参数配置的时候,这种方式就不行了。
重试还和另外一个参数retry.backoff.ms有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。
Kafka可以保证同一个分区中的消息是有序的。如果生产者按照一定顺序发送消息,那么这些消息也会顺讯的写入分区,进而消费者也可以按照顺序消费。如果将acks参数配置为非零值,并且max.in.flight.request.per.connection参数配置大于1的值,那么就会出现错序的现象:如果第一批消息写入失败,而第二批消息写入成功,那么生产者会重试发送第一批次的消息,此时第一批次的消息写入成功,那么这两批消息的顺序就会发生错序。一般而言,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection配置为1,而不是把acks配置为0,不过这样会影响整体的吞吐。
compression.type
这个参数用来指定消息的压缩方式,默认为none,默认情况下不会压缩消息。该参数还可以配置“gzip”/"snappy"和“lz4”。对消息进行压缩可以极大较少网络传输量、降低网络IO,从而提高整体性能。消息压缩是一种使用时间换空间的优化方式。如果对时延有一定的要求,则不推荐对消息进行压缩。
connections.max.idle.ms
这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。
linger.ms
这个参数用来指定生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入ProducerBatch的时间,默认值为0。生产者客户端会在ProducerBatch被填满或等待时间超过linger.ms值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。这个linger.ms参数与TCP协议中的Nagle算法有异曲同工之妙。
receive.buffer.bytes
这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为32768(B),即32KB。如果设置为-1,则使用操作系统默认值。如果Producer与Kafka处于不同的机房,则可以适当调大这个参数。
send.buffer.bytes
这个参数用来设置Socket发送消息缓冲区(SO_RECBUF)的大小,默认值为131072(B),即128KB。与receiver.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。
request.timeout.ms
这个参数用来配置Producer等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择进行重试。注意这个参数需要比broker端参数replica.lag.time.max.ms的值要大,这样可以减少因客户端重试而引起的消息重复的概率。
还有一些参数没有提及,这些参数同样非常重要,他们需要单独的文章或场景来描述。