kafka系列--结构02

2023-06-29 15:21:46 浏览数 (1)

每个partition的数据如何保存到硬盘:

相等于一个巨型文件,被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file 消息数量不一定相等,这种特性方便old segment file 快速删除。默认保留7天数据。

Log.retention.hours=168 //segment文件保留的最长时间,超时将被删除。

Log.roll.hours=168//滚动生成新的segment文件最大时间

Broker如何保存数据:

在理论环境下,broker按照顺序读写的机制,可以每秒保存600M的数据。主要通过pagecache机制,尽可能的利用当前物理机器上的空闲内存来做缓存。

当前topic所属broker,必定有一个该topic的partition,partition是一个磁盘目录。Partition的目录中有多个segment组合(index,log, timeindex)

partition如何分布在不同的broker上:

int i=0

list{kafka01,kafka02,kafka03}

for(int i=0;i<5;i ){

brokerIndex=i%broker

hostName =list.get(brokerIndex)

}

Partition数量和broker数量关系:

生产数据分组策略:

默认defaultPartition Utils.abs(key.hashCode)% numPartitions

producer.send(new ProducerRecord(TOPIC,String.valueOf(messageNo),messageStr));

如何保证消费者消费的数据是有序的:

伪命题,全局保证不了。

1.生产者是集群模式,如果在生产的地方排序。全局序号管理器

2.broker端只设置一个partition->kafka的高并发负载均衡。

3.消费者如果一个组,如何保证消费者有序?消费来一个线程(自定义一个数据结构来做排序)

consumerGroup的组员和partition之间如何做负载均衡:

最好一一对应,一个partition对应一个conSumer。

如果consumer的数量过多,必然有空闲的consumer。

算法:

               假如topic1,具有如下partitions: P0,P1,P2,P3

加入group中,有如下consumer: C1,C2

首先根据partition索引号对partitions排序: P0,P1,P2,P3

根据consumer.id排序: C0,C1

计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2 

然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i 1) * M -1)]

Push vs Pull

      在kafka的设计中,producer将消息push给broker,consumer从broker那里pull消息进行消费。基于push的模式,很难适应不同特点的consumer,push时,消息的发送速率完全由broker掌控。该设计的初衷是消费者以最大的速率进行消费,但是每个consumer的硬件性能、消费能力不同,一旦消费速度远远落后于生产速度,就会出现拒绝服务等异常。pull模式消费者可以依据其自身能力进行消费,每次消费完后他都会pull一批消息(可以设置size),没有不必要的等待时间。

      pull模式的缺点是:当broker中没有未被消费的数据时,即offset已经是最新值了,consumer会一直循环,进入忙等待的状态。为了避免这种情况,允许consumer阻塞在“long poll”的等待中,直到数据到达,(也可以设置为一直等待until a given number of bytes)。consumer的配置文件中可以设置:fetch.min.bytes,表示consumer发起一次fetch请求,broker应该返回给他的最小字节数,如果broker端没有这么多消息,则请求被阻塞,一直等待,累积够这么多数据才返回。同时为了避免无止境的等待,可以设置:fetch.wait.max.ms,表示等待的最长时间。

消费者位置: 

追踪记录已经被消费掉的数据非常重要,kafka中利用offset,且由consumer自己维护。

      许多消息系统会在broker端保存元数据信息,记录哪些消息已被消费过。这种情况下存在一个问题:当一条消息发送给consumer之后,broker可以立即修改状态变为已消费,或者等到consumer的确认后才修改状态。如果broker发出消息后立即更新状态标记为consumed,则可能发生意外,使consumer未真正消费到这条消息,消息被丢失;为了克服这一点,许多消息系统增加了确认机制,即:发出消息后只是标记为send,等consumer真正消费完返回确认信号后才标记为consumed。这种方式确实可以避免丢失消息,但如果consumer已处理了该条消息,但是发送确认信号之前出故障了,那么确认丢失,消息便会被重复消费两次;另一个缺点就是增加确认机制必然导致性能降低,broker需要为每条消息维护多个状态,还需要处理异常情况。broker负载太重。

     (kafka中broker无状态,consumer自己维护offset,同样可以在发出fetch请求后更新offset值,或者消费完这条消息之后才修改offset。你可以根据实际应用对可靠性的需求选择任意一种方式,立即修改值可能导致发生故障时,例如网络断开,消息得不到处理便丢失了。)

      kafka的设计则避免了以上的复杂情况。consumer只消费一个partition,他只需要维护一个整形数值,表明下次消费的消息位置。而且,consumer会定期向zookeeper提交他的offset,避免自己crash之后继续消费。(可以详细看一下consumer的参数)

如何保证数据的完全生产:

      消息传递的可靠性保证:(涉及producer到broker、consumer与broker,即生产者端和消费者端)

      at most once:消息可能会丢失但绝不重传;

      at least once:从不丢失,可能重传;

      exactly once:最理想的状态,消息只被传送一次,不丢失也不重传,kafka目前不能保证;

      对于producer:发送一条消息给broker,只有消息被commit to the log,才算发送成功。

由于broker 有备份机制,所以用户可以设置自己想要的可靠性:

     request.required.acks:

     0:不等待broker返回确认消息。

     1:等待topic中某个partition leader保存成功状态反馈。

     -1:等待topic中某个partition 所有副本都保存成功的状态反馈。

 对于consumer:消费一条消息,之后更新offset,有以下几种方式:

读消息,更新offset,最后处理消费消息:可能未处理之前consumer crash,但已经更新了offset,consumer重启后或者一个新的consumer会从offset之后的位置继续消费,所以丢失了数据,at-most-once;

读消息,处理消费消息,最后更新offset:可能处理之后,更新offset之前crash,消息会被重复处理,at-least-once;

怎么实现exactly-once:如果可以把offset和消息消费后的output保存在一起,eg都保存在HDFS文件中,就可以保证他们被同时更新,这样就可以保证output时刻和offset保持同步。

0 人点赞