记录前段时间使用Kafka的经历

2022-07-27 15:40:21 浏览数 (1)

  • 快速实现功能

需求背景就是实现用户行为分析系统的接入层服务,使用Kafka接收来自接入服务的消息。公司内提供了一套大数据组件工具,所以可以不用关注Kafka集群怎么搭建,都是界面上点点点的事情。但是本着学习的心态,还是照着官方文档走了一遍安装过程,这样可以在实际应用过程少一些困惑。

以快速搭建demo和尝试使用为目标,直接参考官方文档即可:

http://kafka.apache.org/quickstart

官网上的教程使用了kafka自带的ZooKeeper来管理集群信息,也可以轻松在网上找到以独立ZooKeeper来管理集群信息的配置方法,如果懒得找,也可以参考这个文章来配置:

https://www.w3cschool.cn/apache_kafka/apache_kafka_installation_steps.html

顺便贴一下生产者和消费者代码,以下代码足够测试自己搭建的集群是否正常运作,但是无法直接使用到生产环境:

生产者

代码语言:javascript复制
public class KafkaTestProducer {
    public static void main(String[] args) throws Exception{
        String topicName = "test_kafka";
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.0.20:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        String str = "Test Hello Word ! ";
        for (int i = 0; i < 100; i  ){
            producer.send( new ProducerRecord<String, String>(topicName, Integer.toString(i) ,str) );
        }
        producer.close();
    }
}

消费者

代码语言:javascript复制
public static void main(String[] args) throws Exception{
        String topic = "test_kafka";
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.0.20:6668");
        props.put("group.id", "Group1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        while( true ) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("offset = %d, key = %s, value= %s n",record.offset(), record.key(),record.value() ));
            }
        }
    }
  • 可用性强化测试

上面给到的代码无法直接应用于生产环境,以下给出一些测试场景,把其中会遇到的问题整理出来。为了方便测试,需要修改一下生产者和消费者的代码。

1)生产者的生产问题

生产者代码

生产者生产日志

生产者在生产第一条消息时,耗时159毫秒,其他消息生产耗时基本都是1毫秒内,这是因为生产者的send()方法是异步的,该方法线程安全,且不阻塞程序立即返回。这个特性带来了第一个问题:

【问题一】生产者如何立即感知Kafka服务的异常,并把消息存放到其他地方做容灾处理?

带着这个问题,把Kafka服务关闭,观察一下生产者的行为,发现关闭Broker后,生产者依然正常生产消息,无任何报错。也就是说,生产者应对Broker的状态是滞后的,而且默认情况下不会有任何反馈,这就需要开发人员针对数据发送是否成功做处理,生产者的send()方法提供一个回调函数,可以侦听到每条消息的发送状态,详细的代码样例可以直接参考官网中对Producer的API(文章后面部分继续描述这个问题的代码):http://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

保持Broker关闭的情况下,重启生产者进程,发现生产者挂住在send()函数的调用处,如下截图,这个现状的具体原因估计得看看send的源码,不过我理解是因为获取topic和topic分区等信息的时候,由于无法跟Broker服务通讯,所以程序挂住了,而这个过程估计是同步的。所以我们有第二个问题

【问题二】kafka集群的高可用性要如何架构?

2)消费者的消费问题

同生产者的做法,为了方便观察问题,添加了一些日志:

从消费日志看,消费者第一次获取消息队列时,是失败的,获取不到任何记录,第二次获取时才获取到记录。

做以下场景的测试:

1、保持生产者生产消息,重复关闭消费者和打开消费者,查看日志。

问题一:发现offset不连贯,也就是消费者消费的消息是从消费进程启动后开始计算的,不关闭消费进程才可以确保顺序消费。

2、关闭broker,查看日志。

问题二、消费者挂起在消费的poll环节,没有任何反应。来回重复尝试发现,broker在短时间内重启成功的话,消费者可以继续正常消费。Broker长时间之后再重启的话,消费者将再也无法正常消费。

深入去查资料,发现了一个已知的BUG,而且就在我当前使用的版本

https://cwiki.apache.org/confluence/display/KAFKA/KIP-266: Fix consumer indefinite blocking behavior

这个BUG提到,消费者的poll方法在当前版本存在超时参数不起作用的问题。需要在后续版本使用poll(Durationtimeout)来替换。

3)问题总结

上面提了4个问题,都是跟可用性相关的,这是因为接入服务最关注的指标之一就是可用性,首要任务是想尽办法不丢失日志。同时,测试过程其实很不严谨,主要是测试的时间点和写文章的时间点分开了,好多素材已经不好找回来,下次类似场景还得记录细致一点。

继续尝试把问题和解决思路说明白:

【问题一】生产者如何立即感知Kafka服务的异常,并把消息存放到其他地方做容灾处理?

针对这个问题,首先是去翻了一遍API,看了一遍回调方法的使用。

http://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

看起来只能利用回调函数处理,在数据提交完成后通过回调来判断是否把kafka通道标记为失败,然后转到本地文件临时存放。回调方法还有一个好处在于给失败的消息一次重处理的机会。

【问题二】kafka集群的高可用性要如何架构?

上面的测试代码其实只配置了一个broker,kafka集群本身的可用性问题可以通过增加机器的方式来实现。有几件事情可以做:1、增加对kafka服务的监控告警;2、给kafka集群部署2n 1台机器;3、topic的副本设置为3个或以上;

【问题三】发现offset消费不连贯

这个其实是个低级错误,没有提交offset导致服务器不知道消费组的历史消费点。多数人都不会犯这个错,但是意识到这个问题存在对后面提到的各个参数的配置方法会有思路上的帮助。

【问题四】broker关掉后,消费者挂起在消费的poll环节,没有任何反应

这个BUG在新版本的Kafka上已经得到解决,但是旧版的方法依然有问题。由于版本无法切换,所以我在poll函数外层包装了一个超时控制,超时后重新尝试建立新的kafka连接。

以上实践过程大约会花费两天时间,如果从生产到消费得全流程都得关注可用性的话,这个实践开销还是得确保的。经历了一些瞎折腾之后,可以阶段性地对Kafka的知识点做做收拢和总结了。

  • 阶段性知识整理

阅读

https://www.oschina.net/translate/kafka-design?lang=chs&p=1

https://www.cnblogs.com/sodawoods-blogs/p/8969774.html

https://blog.csdn.net/intelrain/article/details/80449501

重点消化

1、 Kafka的性能在数据大小方面实际上是恒定的,因此长时间存储数据不是问题。

2、 基于每个消费者保留的唯一元数据是该消费者在日志中的偏移或位置,存储在zoopkeeper中。

3、 日志中的分区有多种用途:首先,它们允许日志扩展到超出适合单个服务器的大小。每个单独的分区必须适合托管它的服务器,但主题可能有许多分区,因此它可以处理任意数量的数据。其次,它们充当了并行性的单位。

4、 无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。

5、 文件缓存/直接内存映射

6、 对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换. 其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略。

7、 负载均衡: producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".事实上,消息被路由到哪个partition上,有producer客户端决定.比如可以采用"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的.

8、 at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理.那么此后"未处理"的消息将不能被fetch到,这就是"at most once".

at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once",原因offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态.

exactly once: kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的.

  • 展望一:性能优化
  • 展望二:阅读源码

0 人点赞