Kafka使用分享

2019-02-26 11:50:22 浏览数 (1)

一、 日志系统背景

业务每天会产生大量日志,日志规模庞大,因为业务日志量大,滚动频繁,不可能永久保存,只能定时收集日志,将业务日志归集到一个中心,再做计算。对于实时收集的日志需要一个缓存队列来存储。

二、 为什么选择kafka

Kafka设计的初衷就是处理日志的,可以看做是一个日志系统,针对性很强。kafka有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持spark数据并行加载,对于像spark的一样的日志数据和实时流式分析系统,这是一个可行的解决方案。Kafka通过spark的并行加载机制来统一了在线和离线的消息处理。Apache Kafka是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

Kafka一些基本概念

  • Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker
  • Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
  • Partition:parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件
  • Producer:负责发布消息到Kafka broker
  • Consumer:消费消息。每个consumer属于一个特定的consumer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。

Kafka架构

Kafka的设计

  1. 吞吐量:高吞吐是kafka需要实现的核心目标之一,为此kafka做了以下一些设计 a.数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能。 b. zero-copy:减少IO操作步骤 c. 数据批量发送 d. 数据压缩 e. Topic划分为多个partition,提高parallelism
  2. 负载均衡 a. producer根据用户指定的算法,将消息发送到指定的partition b. 存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上 c. 多个partition需要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over d. 通过zookeeper管理broker与consumer的动态加入与离开
  3. 拉取系统:由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处: a. 简化kafka设计 b. consumer根据消费能力自主控制消息拉取速度 c. consumer根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等
  4. 可扩展性:当需要增加broker结点时,新增的broker会向zookeeper注册,而producer及consumer会根据注册在zookeeper上的watcher感知这些变化,并及时作出调整。

Kafka的设计要点

  1. 直接使用linux 文件系统的cache,来高效缓存数据。
  2. 采用linux Zero-Copy提高发送性能。传统的数据发送需要发送4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少为2次。根据测试结果,可以提高60%的数据发送性能。Zero-Copy详细的技术细节可以参考:
  3. 数据在磁盘上存取代价为O(1)。kafka以topic来进行消息管理,每个topic包含多个part(ition),每个part对应一个逻辑log,有多个segment组成。每个segment中存储多条消息(见下图),消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。发布者发到某个topic的消息会被均匀的分布到多个part上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应part的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
  4. 显式分布式,即所有的producer、broker和consumer都会有多个,均为分布式的。Producer和broker之间没有负载均衡机制。broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到通知。

三、 实践荆途

kafka应用小总结

  1. 配置优化 a. 在consumer端配置中有个”auto.offset.reset"配置项,有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,(高阶consumer存储在zookeeper中,低阶consumer需要代码自己存储及传参)在没有传入offset值时(比如新的groupId,或者是offset数据被清空),consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息。 b. 0.10版本advertised.host.name,如果非外网使用必须配置本机ip,否则需要配对应host。这个参数默认没有启用,默认是返回的java.net.InetAddress.getCanonicalHostName的值,在linux上这个值就是hostname的值,如果每个broker的hostname相同,客户端收到的所有broker 都是一样的hostname,会导致生产消费数据时解析不到服务器地址。 c. 要增加吞吐量就必须增加partition的数量,但是partition数量的增加,会增加kafka的系统集群负载 d. Kafka默认保证At least once,并且允许通过设置producer异步提交来实现At most once。 At most once 消息可能会丢,但绝不会重复传输;At least one 消息绝不会丢,但可能会重复传输,当网络有问题的时候会重复生产,曾经在这里踩大坑 e. KAFKA_HEAP_OPTS配置堆空间,在高并发的情况下,需要更高的堆空间,而后堆空间的数据转入pagecache,建议不超过4G,避免full gc的时候耗时太长,导致kafka不可用时间过长。该配置可以在kafka-server-start.sh的export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G” 修改 f. delete.topic.enable=true 配置删除topic自动清理数据,否则删除topic时只是标志topic是要被删除,kafka不会清理数据,需要手动在zookeeper及kafka服务器清理相应地topic数据 g.zookeeper.session.timeout.ms = 60000 配置kafka broker在zookeeper中session的过期时间。曾经配置过小导致broker被zookeeper判定为下线,导致节点不可用
  2. 压缩使用 a. kafka使用压缩,可选择snappy及zip,kafka支持可混用压缩及不压缩的数据,生产者和消费者代 码已经实现自动识别压缩类型,业务代码无需识别,但是要注意混用后容易出现数据错乱的情况导致消费者消费异常。Producer压缩之后,在Consumer需进行解压,虽然增加了CPU的工作,但在对大数据处理上,瓶颈在网络上而不是CPU(初期使用的是千兆的机器)。 b. 不压缩的情况下,如果数据或集群出现问题,可以在kafka服务器,直接grep -a 关键字 日志目录/<topic>-/.log,将原始数据grep出来,如果是压缩的情况下,目前无解,只能眼睁睁看着数据取不出来。
  3. Zookeeper使用:zookeeper集群必须单独部署在干净的环境中,kafka 的 topic会被分成多个区并被分到多个broker上,分区的信息以及broker的分布情况都保存在zookeeper中,一旦zookeeper受影响,会导致kafka集群不可用的故障。zookeeper集群时,zookeeper要求半数以上的机器可用,zookeeper才能提供服务,所以集群必须有3台或以上的奇数台zookeeper服务器。
  4. 多副本问题 a. 多副本的情况下,生产者ack有4种配置,不需要ack,只需要leader 的ack,需要除leader之外其他数量副本的ack,需要所有副本的ack。 如果不需要ack,则会丢数据;如果只需要leader的ack,副本同步不及时,会有消费的offset,在服务器上不存在,导致消费失败;如果需要返回多个ack的情况,则会保证部分数据完整性,但是实时性会降低,生产的效率会变慢;如果需要所有副本ack,则数据实时性最低,可能对导致生产者生产效率底下,数据堆积。 b. 多副本情况下,因为非leader副本是通过消费leader副本来同步数据,并非生产者直接将数据send到非leader副本所在服务器上,所以开启多副本会导致kafka集群间的网络带宽翻倍。该情况需要评估好数据量是否需要上万兆机房,否则当带宽不够时,多副本等于鸡肋,会引起各种no leader partiton,offset失效等问题。
  5. 集群重启or修改问题 a. kafka topic创建后,不能随便修改该topic结构及每个partition的分布位置,因为修改操作会导致数据迁移,迁移过程大概率出现数据异常,可能破坏Message Key和Partition之间对应关系的风险,最后导致消费数据出现异常,会引发LeaderNotAvailableException,解决方法是数据生产到其他备用的kafka集群后,清空异常数据的kafka及zookeeper数据,重建集群即可。曾经在这里吃了不少苦头,浪费了几个星期时间,想放弃的念头都产生了。这种做法是网上资料都没有提到,官网也没有关于数据修复相关的资料,但是这是事实存在的数据无法修复的异常,是在QQ群交流后才产生的结论。 b. 保证每次停Broker时都可以Clean Shutdown,否则问题就不仅仅是恢复服务所需时间长,还可能出现数据损坏或其他很诡异的问题。所以如果需要停止broker时,建议暂时使用备份集群,停止broker后,清除数据,再重建kafka。

总的来说kafka的高可用性设计虽然看起来很合理很可行,但实际使用上并非如此,对数据可用性比较高的场景,建议另外保留一份原始数据,防止kafka故障时带来的数据丢失。kafka兼容性,容错性等看起来也相当合理,但是在大量数据面前还是容易出问题,在这方面,建议使用常规用法,不要使用混用等非常规用法挑战kafka兼容性和容错性的用法,否则必踩大坑。

建议kafka使用原则

  1. topic只在创建时候配置参数,使用重建替代修改已创建的topic任何信息。
  2. 集群有问题、增加删除节点、修改配置等对集群的修改,用重建集群来替代。因为每次重启节点,都会引发数据迁移,数据量比较大的情况下,数据容易出现错乱异常。
  3. 永远都要有一个可用的备份kafka集群。
  4. 一个topic只用一种数据压缩类型,或者不压缩。

以上仅为个人在实践中对当前业务场景的一些做法和总结,可能会有使用姿势不对的问题及错误,请指正,欢迎拍砖指正

0 人点赞