每个partition的数据如何保存到硬盘:
相等于一个巨型文件,被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file 消息数量不一定相等,这种特性方便old segment file 快速删除。默认保留7天数据。
Log.retention.hours=168 //segment文件保留的最长时间,超时将被删除。
Log.roll.hours=168//滚动生成新的segment文件最大时间
Broker如何保存数据:
在理论环境下,broker按照顺序读写的机制,可以每秒保存600M的数据。主要通过pagecache机制,尽可能的利用当前物理机器上的空闲内存来做缓存。
当前topic所属broker,必定有一个该topic的partition,partition是一个磁盘目录。Partition的目录中有多个segment组合(index,log, timeindex)
partition如何分布在不同的broker上:
int i=0
list{kafka01,kafka02,kafka03}
for(int i=0;i<5;i ){
brokerIndex=i%broker
hostName =list.get(brokerIndex)
}
Partition数量和broker数量关系:
生产数据分组策略:
默认defaultPartition Utils.abs(key.hashCode)% numPartitions
producer.send(new ProducerRecord(TOPIC,String.valueOf(messageNo),messageStr));
如何保证消费者消费的数据是有序的:
伪命题,全局保证不了。
1.生产者是集群模式,如果在生产的地方排序。全局序号管理器
2.broker端只设置一个partition->kafka的高并发负载均衡。
3.消费者如果一个组,如何保证消费者有序?消费来一个线程(自定义一个数据结构来做排序)
consumerGroup的组员和partition之间如何做负载均衡:
最好一一对应,一个partition对应一个conSumer。
如果consumer的数量过多,必然有空闲的consumer。
算法:
假如topic1,具有如下partitions: P0,P1,P2,P3
加入group中,有如下consumer: C1,C2
首先根据partition索引号对partitions排序: P0,P1,P2,P3
根据consumer.id排序: C0,C1
计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2
然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i 1) * M -1)]
Push vs Pull
在kafka的设计中,producer将消息push给broker,consumer从broker那里pull消息进行消费。基于push的模式,很难适应不同特点的consumer,push时,消息的发送速率完全由broker掌控。该设计的初衷是消费者以最大的速率进行消费,但是每个consumer的硬件性能、消费能力不同,一旦消费速度远远落后于生产速度,就会出现拒绝服务等异常。pull模式消费者可以依据其自身能力进行消费,每次消费完后他都会pull一批消息(可以设置size),没有不必要的等待时间。
pull模式的缺点是:当broker中没有未被消费的数据时,即offset已经是最新值了,consumer会一直循环,进入忙等待的状态。为了避免这种情况,允许consumer阻塞在“long poll”的等待中,直到数据到达,(也可以设置为一直等待until a given number of bytes)。consumer的配置文件中可以设置:fetch.min.bytes,表示consumer发起一次fetch请求,broker应该返回给他的最小字节数,如果broker端没有这么多消息,则请求被阻塞,一直等待,累积够这么多数据才返回。同时为了避免无止境的等待,可以设置:fetch.wait.max.ms,表示等待的最长时间。
消费者位置:
追踪记录已经被消费掉的数据非常重要,kafka中利用offset,且由consumer自己维护。
许多消息系统会在broker端保存元数据信息,记录哪些消息已被消费过。这种情况下存在一个问题:当一条消息发送给consumer之后,broker可以立即修改状态变为已消费,或者等到consumer的确认后才修改状态。如果broker发出消息后立即更新状态标记为consumed,则可能发生意外,使consumer未真正消费到这条消息,消息被丢失;为了克服这一点,许多消息系统增加了确认机制,即:发出消息后只是标记为send,等consumer真正消费完返回确认信号后才标记为consumed。这种方式确实可以避免丢失消息,但如果consumer已处理了该条消息,但是发送确认信号之前出故障了,那么确认丢失,消息便会被重复消费两次;另一个缺点就是增加确认机制必然导致性能降低,broker需要为每条消息维护多个状态,还需要处理异常情况。broker负载太重。
(kafka中broker无状态,consumer自己维护offset,同样可以在发出fetch请求后更新offset值,或者消费完这条消息之后才修改offset。你可以根据实际应用对可靠性的需求选择任意一种方式,立即修改值可能导致发生故障时,例如网络断开,消息得不到处理便丢失了。)
kafka的设计则避免了以上的复杂情况。consumer只消费一个partition,他只需要维护一个整形数值,表明下次消费的消息位置。而且,consumer会定期向zookeeper提交他的offset,避免自己crash之后继续消费。(可以详细看一下consumer的参数)
如何保证数据的完全生产:
消息传递的可靠性保证:(涉及producer到broker、consumer与broker,即生产者端和消费者端)
at most once:消息可能会丢失但绝不重传;
at least once:从不丢失,可能重传;
exactly once:最理想的状态,消息只被传送一次,不丢失也不重传,kafka目前不能保证;
对于producer:发送一条消息给broker,只有消息被commit to the log,才算发送成功。
由于broker 有备份机制,所以用户可以设置自己想要的可靠性:
request.required.acks:
0:不等待broker返回确认消息。
1:等待topic中某个partition leader保存成功状态反馈。
-1:等待topic中某个partition 所有副本都保存成功的状态反馈。
对于consumer:消费一条消息,之后更新offset,有以下几种方式:
读消息,更新offset,最后处理消费消息:可能未处理之前consumer crash,但已经更新了offset,consumer重启后或者一个新的consumer会从offset之后的位置继续消费,所以丢失了数据,at-most-once;
读消息,处理消费消息,最后更新offset:可能处理之后,更新offset之前crash,消息会被重复处理,at-least-once;
怎么实现exactly-once:如果可以把offset和消息消费后的output保存在一起,eg都保存在HDFS文件中,就可以保证他们被同时更新,这样就可以保证output时刻和offset保持同步。