TOC
记录下kafka生产者遇到的一些问题,主要基于0.8/0.9版本的producer api。
一、kafka简介
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统)。Kafka主要被用于两大类应用:1.在应用间构建实时的数据流通道;2.构建传输或处理数据流的实时流式应用。
链接:
- kafka官网
- github地址
【logo图】
二、生产者基本实现
1.示意图
2.具体实现:
2.1 Fire-and-forget模式
发送消息后不需要逻辑程序关心是否发送成功。这个是默认的写法,依赖producer api本身的高可用(配置相关参数后失败了也会重试),且默认就是高吞吐地异步发送。绝大部分情况下数据是会成功的,但是也会有失败的情况。
官方demo
代码语言:txt复制 public Producer(String topic)
{
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "localhost:9092");
// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type String.
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
}
public void run() {
int messageNo = 1;
while(true)
{
String messageStr = new String("Message_" messageNo);
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
messageNo ;
}
}
优点:
- 写法简单,且producer api本身即提供一定的高可用
- 吞吐高,默认即异步发送
缺点:
- 当producer api本身的高可用不可靠时即会出现一些异常的情况,且程序本身很难捕获具体那条数据异常。
2.2 同步模式
参考代码:
代码语言:txt复制producer.send(record).get();
即sender()方法后再调用get()方法会同步地等待结果返回,根据结果可以判断是否发送成功。
优点:
- 可以捕获每批发送记录的情况,并可以在业务层面做一些二次处理
缺点:
- 吞吐下降严重
2.3 callback模式
代码语言:txt复制class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
producer.send(record, new DemoProducerCallback());
通过callback机制,异步地触发式检查broker返回的结果,从而检查每次发送的结果。
要使用callback函数,先要实现org.apache.kafka.clients.producer.Callback接口,该接口只有一个onCompletion方法。如果发送异常,onCompletion的参数Exception e会为非空。
callback性能损失不容小视,但其吞吐仍然远远大于同步的模式。
【性能对比测试】
总之,我们需要根据我们具体的业务场景实现我们的生产方式。
三、producer参数调优
1. acks
- acks=-1 强一致,不会丢数据。吞吐量会下降
- acks=0 发过去就完事了,不关心broker是否处理成功,可能丢数据。
- acks= 1 当写Leader成功后就返回,其他的replica都是通过fetcher去同步的,所以kafka是异步写,主备切换可能丢数据。
我们现网综合权衡成本效率下,默认使用的是acks= 1。
2. retries
顾名思义,当设置为大于零的值,客户端会重新发送任何发送失败的消息。主要问题:
- retries 不生效问题(大坑),主要是一些不可重试地异常,如“message size too large”(消息太大)
- 写入顺序的问题:重试会导致数据写入的无序
3. serializer.class
序列化用到的类。因为一些很复杂的业务问题,我们现网中以string为主。在某些特性开发中,会使用avro。
4. compression.codec
现网一般不压缩(生产机器cpu性能不太好),特性开发场景使用Snappy偏多。
5. batch.num.messages & queue.buffering.max.ms
影响吞吐实时性的两个指标。
- batch.num.messages:顾名思义,一批发送的消息数量
- queue.buffering.max.ms:队列中buff的最大时间数目(数据即使没到batch的量也会发送)
现网中需要详细地压测着两个指标,以达到吞吐和实时性之间的平衡。
四、分区问题
0.8版本的producer会存在要死broker分区的情况,导致kafka多分区之间数据不均匀的情况。
解决方法有两种:
- api端指定key: key对应分区不存在会出现异常
- 0.9版本api已经内置相关的算法,直接升级即可
五、其他问题
1. 生产幂等性
2. kafka生产高吞吐原理
六、kafka高可用生产的一种尝试
TBD
参考
《震惊了!原来这才是kafka!》
https://www.jianshu.com/p/d3e963ff8b70
《Kafka基础-生产者发送消息》
https://blog.csdn.net/gangchengzhong/article/details/80745974