文章目录
-
-
- 1. 综述
- 2. 消息队列(Message Queue)
-
- 2.1 点对点
- 2.2 发布/订阅(pub-sub)
- 3. Kafka基础术语解释
-
- 3.1 Broker
- 3.2 Partitions
- 3.3 Message
- 4. Kafka持久化
- 5. Kafka 作为消息/存储系统及流处理
-
- 5.1 消息系统
- 5.2 存储系统
- 5.3 流处理
- 6. 常用配置项
-
- 6.1 Kafka实例配置
-
- 6.1.1 broker配置
- 6.1.2 topic配置
- 6.2 Producer端配置
-
- 优先级高
- 优先级中
- 优先级低
- 6.3 Consumer端配置
- Ref
-
1. 综述
Apache Kafka是基于发布/订阅的容错消息系统,由Scala和Java编写,是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。
与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和固有的容错能力,这使得它非常适合大规模消息处理应用程序。
Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。
2. 消息队列(Message Queue)
Message Queue消息传送系统提供传送服务。消息传送依赖于大量支持组件,这些组件负责处理连接服务、消息的路由和传送、持久性、安全性以及日志记录。消息服务器可以使用一个或多个代理实例。消息队列分为两种:点对点与发布/订阅(pub-sub)
2.1 点对点
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
该系统的典型示例是订单处理系统,其中每个订单将由一个订单处理器处理,但多个订单处理器也可以同时工作。
2.2 发布/订阅(pub-sub)
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
现实生活的例子是电视,它发布不同的频道,如运动,电影,音乐等,任何人都可以订阅自己的频道集。
3. Kafka基础术语解释
- 生产者Producer: 是消息的产生的源头,负责生成消息并发送到Kafka服务器上。
- 消费者Consumer: 消息的使用方,负责消费Kafka服务器上的消息。
- 主题Topic: 由用户定义并配置在Kafka服务器,用于建立生产者和消息者之间的订阅关系:生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。
- 分区Partition: 一个Topic下面会分为很多分区,例如:“kafka-test”这个Topic下可以分为6个分区,分别由两台服务器提供,那么通常可以配置为让每台服务器提供3个分区,假如服务器ID分别为0、1,则所有的分区为0-0、0-1、0-2和1-0、1-1、1-2。
Topic物理上的分组,一个 topic可以分为多个 partition,每个 partition 是一个有序的队列。partition中的每条消息都会被分配一个有序的 id(offset)
- Broker: 即Kafka的服务器,用户存储消息,Kafka集群中的一台或多台服务器统称为 broker。
- 消费者分组Group: 用于归组同类消费者,在Kafka中,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
- 偏移量Offset: 消息存储在Kafka的Broker上,消费者拉取消息数据的过程中需要知道消息在文件中的偏移量,这个偏移量就是所谓的Offset。
- 领导者Leader: 负责给定分区的所有读取和写入的节点。 每个分区都有一个服务器充当Leader。
- 追随者Follower: 跟随领导者指令的节点被称为Follower。 如果领导失败,一个追随者将自动成为新的领导者。 跟随者作为正常消费者,拉取消息并更新其自己的数据存储。
3.1 Broker
- Message在Broker中通Log追加的方式进行持久化存储。并进行分区(patitions)。
- 为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。
- Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。Message消息是有多份的。
- Broker不保存订阅者的状态,由订阅者自己保存。
- 无状态导致消息的删除成为难题(可能删除的消息正在被订阅),kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
- 消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id)进行重新读取消费消息。
3.2 Partitions
- Kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存。
- 可以将一个topic切分多任意多个partitions,来消息保存/消费的效率。
- 越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力。
3.3 Message
- Message消息:是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。
- Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。
- partition中的每条Message包含了以下三个属性:
- offset: 消息唯一标识, 对应类型long
代码语言:txt复制- MessageSize: 对应类型int32
代码语言:txt复制- data: message的具体内容。
4. Kafka持久化
- 一个Topic可以认为是一类消息,每个topic将被分成多partition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),partition是以文件的形式存储在文件系统中。
- Logs文件根据broker中的配置要求,保留一定时间后删除来释放磁盘空间。
- 为数据文件建索引:稀疏存储,每隔一定字节的数据建立一条索引。下图为一个partition的索引示意图:
5. Kafka 作为消息/存储系统及流处理
5.1 消息系统
kafka有比传统的消息系统更强的顺序保证。
传统的消息系统按顺序保存数据,如果多个消费者从队列消费,则服务器按存储的顺序发送消息,但是,尽管服务器按顺序发送,消息异步传递到消费者,因此消息可能乱序到达消费者。这意味着消息存在并行消费的情况,顺序就无法保证。消息系统常常通过仅设1个消费者来解决这个问题,但是这意味着没用到并行处理。
kafka做的更好。通过并行topic的parition —— kafka提供了顺序保证和负载均衡。每个partition仅由同一个消费者组中的一个消费者消费到。并确保消费者是该partition的唯一消费者,并按顺序消费数据。每个topic有多个分区,则需要对多个消费者做负载均衡,但请注意,相同的消费者组中不能有比分区更多的消费者,否则多出的消费者一直处于空等待,不会收到消息。
5.2 存储系统
所有发布消息到消息队列和消费分离的系统,实际上都充当了一个存储系统(发布的消息先存储起来)。Kafka比别的系统的优势是它是一个非常高性能的存储系统。
写入到kafka的数据将写到磁盘并复制到集群中保证容错性。并允许生产者等待消息应答,直到消息完全写入。
kafka的磁盘结构 - 无论你服务器上有50KB或50TB,执行是相同的。
client来控制读取数据的位置。你还可以认为kafka是一种专用于高性能,低延迟,提交日志存储,复制,和传播特殊用途的分布式文件系统。
以下是详细说明:可以认为topic下有partition,partition下有segment,segment是实际的一个个文件,topic和partition都是抽象概念。
在目录/{topicName}-{partitionid}/下,存储着实际的log文件(即segment),还有对应的索引文件。
每个segment文件大小相等,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字一样,扩展名是.index。有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查,其实这里可以优化合到一起,下面只说offset index。总体的组织是这样的:
为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个message都记录下具体位置,而是每隔一定的字节数,再建立一条索引。 索引包含两部分,分别是baseOffset,还有position。
- baseOffset:意思是这条索引对应segment文件中的第几条message。这样做方便使用数值压缩算法来节省空间。例如kafka使用的是varint。
- position:在segment中的绝对位置。
查找offset对应的记录时,会先用二分法,找出对应的offset在哪个segment中,然后使用索引,在定位出offset在segment中的大概位置,再遍历查找message。
5.3 流处理
在kafka中,流处理持续获取输入topic
的数据,进行处理加工,然后写入输出topic
。
可以直接使用producer和consumer API进行简单的处理。对于复杂的转换,Kafka提供了更强大的Streams API。可构建聚合计算或连接流到一起的复杂应用程序。
助于解决此类应用面临的硬性问题:处理无序的数据,代码更改的再处理,执行状态计算等。
前面的博客Spark Structured Streaming Kafka使用笔记有详细介绍Spark Kafka的使用。
6. 常用配置项
6.1 Kafka实例配置
6.1.1 broker配置
配置项 | 作用 |
---|---|
broker.id | broker的唯一标识 |
auto.create.topics.auto | 设置成true,就是遇到没有的topic自动创建topic。 |
log.dirs | log的目录数,目录里面放partition,当生成新的partition时,会挑目录里partition数最少的目录放。 |
6.1.2 topic配置
配置项 | 作用 |
---|---|
num.partitions | 新建一个topic,会有几个partition。 |
log.retention.ms | 对应的还有minutes,hours的单位。日志保留时间,因为删除是文件维度而不是消息维度,看的是日志文件的mtime。 |
log.retention.bytes | partion最大的容量,超过就清理老的。注意这个是partion维度,就是说如果你的topic有8个partition,配置1G,那么平均分配下,topic理论最大值8G。 |
log.segment.bytes | 一个segment的大小。超过了就滚动。 |
log.segment.ms | 一个segment的打开时间,超过了就滚动。 |
message.max.bytes | message最大多大 |
6.2 Producer端配置
优先级高
- bootstrap.servers 一组host和port用于初始化连接. 不管这里配置了多少台server, 都只是用作发现整个集群全部server信息. 这个配置不需要包含集群所有的机器信息. 但是最好多于一个, 以防服务器挂掉.
- key.serializer
用来序列化key的
Serializer
接口的实现类. - value.serializer
用来序列化value的
Serializer
接口的实现类 - acks producer希望leader返回的用于确认请求完成的确认数量. 可选值 all, -1, 0 1. 默认值为1
- `acks=0` 不需要等待服务器的确认. 这是`retries`设置无效. 响应里来自服务端的offset总是-1. producer只管发不管发送成功与否。延迟低,容易丢失数据。
- `acks=1` 表示leader写入成功(但是并没有刷新到磁盘)后即向producer响应。延迟中等,一旦leader副本挂了,就会丢失数据。
- `acks=all`等待数据完成副本的复制, 等同于`-1`. 假如需要保证消息不丢失, 需要使用该设置. 同时需要设置`unclean.leader.election.enable`为true, 保证当ISR列表为空时, 选择其他存活的副本作为新的leader.buffer.memory
producer可以使用的最大内存来缓存等待发送到server端的消息. 如果消息速度大于producer交付到server端的阻塞时间
- compression.type
producer压缩数据的类型, 默认为none, 就是不压缩. 可选
none
,gzip
,snappy
和lz4
. 压缩整个batch的数据, 因此batch的效果对压缩率也有影响. 更多的批处理意味着更好的压缩 - retries
设置大于零的值将导致客户端重新发送其发送失败并发生潜在的瞬时错误的记录. 相当于client在发送失败的时候会重新发行. 如果设置了
retries
而没有将max.in.flight.request.per.connection
设置为1, 在两个batch发送到同一个partition时有可能打乱消息的发送顺序(第一个发送失败, 而第二个发送成功)
优先级中
- batch.size producer会尝试批量发送属于同一个partition的消息以减少请求的数量. 这样可以提升客户端和服务端的性能. 默认大小是16348 byte (16k). 发送到broker的请求可以包含多个batch, 每个batch的数据属于同一个partition. 太小的batch会降低吞吐. 太大会浪费内存.
- client.id 发送请求时传递给服务端的id字符. 用来追溯请求源, 除了使用ip/port. 服务端的请求日志中会包含一个合理的应用名. 默认为空
- linger.ms
在正常负载的情况下, 要想减少请求的数量. 加上一个认为的延迟: 不是立即发送消息, 而是延迟等待更多的消息一起批量发送. 类似TCP中的Nagle算法. 当获得了
batch.size
的同一partition的消息会立即发送, 不管linger.ms
的设置. 假如要发送的消息比较少, 会等待指定的时间以获取更多的消息. 默认设置为0 ms(没有延迟). - max.block.ms
控制
KafkaProducer.send()
和KafkaProducer.partitionsFor()
的阻塞时间. 这些方法会因为buffer满了或者metadata不可用而阻塞. 用户设置在serializers或者partitioner中的阻塞不会计算在内. - max.request.size 请求的最大大小(以字节为单位)。 此设置将限制生产者在单个请求中发送的记录批次数,以避免发送巨大的请求。 这也是最大记录批量大小的上限。 请注意,服务器拥有自己的记录批量大小,可能与此不同。
- partitioner.class
Partitioner
接口的实现类, 默认是org.apache.kafka.clients.producer.internals.DefaultPartitioner
. 需要处理数据倾斜等原因调整分区逻辑的时候使用. - request.timeout.ms 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms(broker配置),以减少由于不必要的生产者重试引起的消息重复的可能性。
优先级低
- enable.idempotence
设置为’true’, 将开启
exactly-once
模式. 设置为’false’(默认值), producer会因为borker失败等原因重试发送, 可能会导致消息重复. 设置为’true’时需要结合max.in.flight.requests.per.connection
设为’1’和retires
不能为’0’, 同时acks
需要设置为’all’或者’’-1’. - interceptor.classes
一组
ProducerInterceptor
接口的实现类, 默认为null. 可以通过该接口的实现类去拦截(可能需要修改)producer要发送的消息在发送到服务端之前. - max.in.flight.requests.per.connection
没有被确认unacknowledge的batch数, 如果设置大于1在
retries
设置了的情况下会出现消息发送顺序错误. - retry.backoff.ms 失败请求重试的间隔时间. 默认是100毫秒
- transaction.timeout.ms
事务协调器等待producer更新事务状态的最大毫秒数, 超过的话事务协调器会终止进行中的事务. 如果设置的时间大于broker的
max.transaction.timeout.ms
会收到InvalidTransactionTimeout
错误. - transactional.id 用于事务传递的TransactionalId。 这使得可以跨越多个生产者会话的可靠性语义,因为它允许客户端保证在开始任何新事务之前使用相同的TransactionalId的事务已经完成。 如果没有提供TransactionalId,则生产者被限制为幂等传递。 请注意,如果配置了TransactionalId,则必须启用enable.idempotence。 默认值为空,这意味着无法使用事务。
6.3 Consumer端配置
- consumer.poll(1000)
新版本的Consumer的Poll方法使用了类似于Select I/O机制,因此所有相关事件(包括reblance,消息获取等)都发生在一个事件循环之中。
1000是一个超时时间,一旦拿到足够多的数据(参数设置),consumer.poll(1000)会立即返回
ConsumerRecords records
。 如果没有拿到足够多的数据,会阻塞1000ms,但不会超过1000ms就会返回。 - session. timeout. ms <= coordinator检测失败的时间 默认值是10s 该参数是 Consumer Group 主动检测 (组内成员comsummer)崩溃的时间间隔。若设置10min,那么Consumer Group的管理者(group coordinator)可能需要10分钟才能感受到。太漫长了是吧。
- max. poll. interval. ms <= 处理逻辑最大时间
这个参数是0.10.1.0版本后新增的,可能很多地方看不到喔。这个参数需要根据实际业务处理时间进行设置,一旦Consumer处理不过来,就会被踢出Consumer Group
。
注意:如果业务平均处理逻辑为1分钟,那么
max. poll. interval
. ms需要设置稍微大于1分钟即可,但是session. timeout. ms
可以设置小一点(如10s),用于快速检测Consumer崩溃。 - auto.offset.reset 该属性指定了消费者在读取一个没有偏移量后者偏移量无效(消费者长时间失效当前的偏移量已经过时并且被删除了)的分区的情况下,应该作何处理,默认值是latest,也就是从最新记录读取数据(消费者启动之后生成的记录),另一个值是earliest,意思是在偏移量无效的情况下,消费者从起始位置开始读取数据。
- enable.auto.commit 对于精确到一次的语义,最好手动提交位移
- fetch.max.bytes 单次获取数据的最大消息数。
- max.poll.records <= 吞吐量
单次poll调用返回的最大消息数,如果处理逻辑很轻量,可以适当提高该值。
一次从kafka中poll出来的数据条数,
max.poll.records
条数据需要在在session.timeout.ms
这个时间内处理完 默认值为500 - heartbeat. interval. ms <= 居然拖家带口 heartbeat心跳主要用于沟通交流,及时返回请求响应。这个时间间隔真是越快越好。因为一旦出现reblance,那么就会将新的分配方案或者通知重新加入group的命令放进心跳响应中。
- connection. max. idle. ms <= socket连接 kafka会定期的关闭空闲Socket连接。默认是9分钟。如果不在乎这些资源开销,推荐把这些参数值为-1,即不关闭这些空闲连接。
- request. timeout. ms
这个配置控制一次请求响应的最长等待时间。如果在超时时间内未得到响应,kafka要么重发这条消息,要么超过重试次数的情况下直接置为失败。
消息发送的最长等待时间.需大于
session.timeout.ms
这个时间 - fetch.min.bytes server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。
- fetch.wait.max.ms
若是不满足
fetch.min.bytes
时,等待消费端请求的最长等待时间
Ref
- https://www.jianshu.com/p/d3e963ff8b70
- https://www.w3cschool.cn/apache_kafka/apache_kafka_introduction.html
- https://blog.csdn.net/dapeng1995/article/details/81536862
- http://orchome.com/5
- https://atbug.com/kafka-producer-config/