System|分布式|Kafka

2021-11-22 10:38:05 浏览数 (1)

Kafka是最前沿的开源MQ之一,阿里的RocketMQ也借鉴了不少Kafka的思想。2011年领英发了篇文章描述Kafka的设计,我这先学习初版。

新版最重要的改变就是exactly once,众所周知,at least once很容易,retry即可; 而exactly once则很难, 它必须同时维护幂等性。

Reference: http://notes.stephenholiday.com/Kafka.pdf

架构

经典的生产者消费者模型。一个broker可能持有多个topic,每个topic又可能有多个partition。

还有一个东西没有画出来,就是zookeeper,管理metadata。

代码语言:javascript复制
//Producer 
producer = new Producer(…);
 message = new Message(“test message str”.getBytes());
 set = new MessageSet(message);
 producer.send(“topic1”, set);
//Consumer
streams[] = Consumer.createMessageStreams(“topic1”, 1)
 for (message : streams[0]) {
 bytes = message.payload();
 // do something with the bytes
 }

单机

简易存储

事实上Kafka原本是作为日志系统,供其offline处理的。

  • Producer

append log

  • Broker

每个partition对应连续的逻辑log,由固定大小的一组物理log segment组成。log按照pub数目或者时间周期性地进行flush持久化。

log直接按照逻辑offset映射,而不是通过显式id索引,这里的id并不是连续的,而是通过上一条的id增加msg长度计算的。

Broker在内存中维护偏移量的有序链表,包括每个segment第一个msg的offset

(这里不知道是不是把这些offset给单独存储了,一方面充当跳表容易定位,一方面容易计算出在segment中的物理offset)

  • Consumer
  1. Consumer会发出pull(begin offset, byte length),要求broker把信息加载到buffer中准备consume。
  2. Broker根据offset选择对应的segment并发送数据,
  3. Consumer计算出新的offset用于下次pull,顺序地消费信息,如果发出ack,表示自己已经收到了之前的所有消息。

效率传输

  1. Batching
  2. No Buffer - 利用fs的cache,避免双重cache,减少内存开销不必GC
  3. Caching - consumer通常都会在producer后一段时间访问,因此利用cache
  4. sendfile - 直接从file -> socket,只需要disk->kernel->socket两次拷贝,一次syscall,免去了常规disk->kernel->user->kernel->socket的开销

无状态

Kafka本身并不记忆什么数据被消费,而是只删除过期(自定义)的log。如果是push model,那么显然需要kafka记忆;而pull model则只需要consumer记忆。Consumer因此能随时消费之前的消息,例如假如consumer需要持久化消息,他可以做checkpoint,然后恢复的时候redo log。

分布式

Producer可以随机或者按照partition函数映射到对应的broker。

一组Consumer称为Consumer Group,合起来消费某个topic,每个msg只被一个Consumer消费;而Group之间则独立。这里其实就相当于负载均衡,因此要避免重复消费。

(后面也可以broadcast)

partition是并行基本单元,每次仅有一个group内的consumer能消费。

这里利用zookeeper实现

  • 监测Consumer和Broker变化(服务发现)
  • 变化时Consumer重新负载均衡,算法如下
  • 维护消费关系与partition offset
代码语言:javascript复制
For each topic T that Ci subscribes to {
 remove partitions owned by Ci from the ownership registry
 read the broker and the consumer registries from Zookeeper
 compute PT = partitions available in all brokers under topic T
 compute CT = all consumers in G that subscribe to topic T
 sort PT and CT
 let j be the index position of Ci in CT and let N = |PT|/|CT|
 assign partitions from j*N to (j 1)*N - 1 in PT to consumer Ci
 for each assigned partition p {
 set the owner of p to Ci in the ownership registry
 let Op = the offset of partition p stored in the offset registry
 invoke a thread to pull data in partition p from offset Op
 }
}

但这样因为负载均衡都是本地进行的,consumer彼此不通信。有的consumer会尝试pull那些仍然属于其他consumer的partition,这种情况它会释放自己的消费的partition然后等待一会儿之后retry rebalance

新增的consumer group的offset可能是log offset的最小或者最大值,根据配置而定。

传输

初版的Kafka仅仅保证At least once,因为领英暂时不需要exactly once。现在的exactly once是在producer增加了id用于去重,同时提供了分布式事务支持

同时Kafka仅仅能保证单个partition有序(append log),而无法保证topic有序

Kafka在log加入CRC(循环冗余校验)避免log污染

初版的Kafka没有备份机制,现在的kafka是主从备份,平时只有leader服务。

生产

Kafka本身可以作为其他Kafka的producer和consumer

因为Kafka只支持无类型字节流,使用Avro作为序列化协议,在里面存储了schema ID提供类型信息,然后再反序列化。

0 人点赞