编写一个配置化的Kafka Proxy,让你分钟级别接入Kafka

2022-08-10 19:39:35 浏览数 (1)

消息队列作为服务解耦、异步收发消息的组件,如今已经广泛应用于各大互联网公司,具备一定规模的公司会有专门的团队负责维护消息队列。当我们需要使用消息队列时,得写专门的代码进行连接,比如若用Golang编码,当需要使用Kafka消息队列时可能会使用 sarama库。这意味着需要在业务代码中写一些与业务无关的kafka配置与连接代码。

比如某个需求需要连接kafka的某个topic消费msg计算一些信息,此时的开发流程是: 分析需求->使用sarama连接topic->编写业务代码处理msg。笔者所在的部门是一个中台部门,经常需要接入各种topic去计算实时信息。如果来一个需求就写一坨连接代码去处理消息,成本会很高,且这么搞并不优雅,有些时候需求排期比较紧,你可能来不及写启动/停止消费的API,此时如果出现了线上问题,那kafka非常方便的流量回放都没法使用,老板问你为什么,你就只能干瞪眼了~

分析一下上面的开发流程,在接收消息进行处理这个流程中有简化的方案:我们可以设计一个proxy模块,它负责连接kafka topic拉取消息并把消息发送到业务模块的对应接口上。这些功能可以搞成配置化的流程,用户只需要在proxy的管理系统上填写对应工单,就可以收到对应topic的消息。有了这玩意,需求开发流程变为:分析需求->填个工单接收消息->编写业务代码处理msg。这样大大简化了消息队列接入流程,提高了开发效率。如果需求比较简单,比如只是连接单个topic,读取某个字段进行计数,那编写业务代码处理msg这块也可以做成配置化,整体开发起来非常轻松愉快。

那么proxy模块可以具有什么样的功能呢:

  1. proxy负责连接kafka的多个topic,并把topic的msg转发到业务填写的收集消息 API;
  2. proxy可以控制topic消息发送的启动/停止,业务模块在消息重启期间可以重置offset;
  3. proxy可以做msg的ETL功能:比如根据topic中的type值决定是否向业务API发送此条消息;再比如一个topic的schema中可能有上百个字段,好几层JSON信息,但是本次业务需求只需要其中三个字段即可满足需求,那么用户填写工单时可以声明所需字段,proxy的ETL负责打平消息,这样可以降低网络传输的开销成本。

但是这样的ETL功能会破坏topic原始schema,对定位只是proxy的模块功能显得有点重,系统可以提供这样的功能,并在管理平台工单页面周知用户,用户用不用那是另一码事了。

如果要写这么一个Proxy服务,它具有什么样的特点呢,先来看一下我设计的proxy实例工作的流程图:

  1. 为了实现配置化,服务需要具备动态读取配置和新建消费实例的能力,那么对应的kafka配置和HTTP服务配置可以放在像MySQL这样的关系型数据库中,新建实例即插入一条新数据可以new一个新的消费实例;
  2. proxy的服务器上保存着各个topic的消费实例,意味着这是个有状态的服务,一般的业务系统都是无状态的,接口里面的信息保存在关系型或者kv型存储中;写这种有状态的服务需要非常注意并发问题服务状态与db状态需保持一致;
  3. 现在具有一定规模的互联网公司一般会用微服务,各个服务服务按照事业部、部门等维度做成了一颗巨大的服务树,一个服务最终是服务树的一个叶子节点,服务节点有多个集群处理不同流量,一个集群有多台机器。服务集群是由专门的SRE负责运维的。kafka的topic一般会有多个分区(partition),消费时会有一定的限制条件:一台机器可以消费多个分区,但是一个分区最多只能让一台机器拉取消息,如果一个topic由10个分区,你有20台机器,那么意味着有10台机器拉取不到流量;如果你有5台机器,那么每台机器可能会收到两个分区的流量:

也就是consume过程中发生了rebalance。为了性能,一台机器最好可以消费多个topic的一个partition,这意味着服务在服务树的基础上,proxy还需要具有小集群管理的能力。有了这个能力,我们可以控制topic的分区在不同机器上的启停,比如某台机器上消费的Topic 有A、B两个,突然topic的流量激增,机器的CPU idle不足,那我们可以把这台机器上的B topic的分区流量转移给其他资源充足的机器。

  1. 作为一个偏中部的服务,proxy的稳定性非常重要。同时proxy作为consumer在拉取topic的消息时需要有ACK机制 预防业务系统收到消息却由于程序崩溃导致消息丢失的情况。proxy往业务系统发送消息不是无限次的,需要考虑当msg多次发往业务系统仍失败的情况需如何处理,最后proxy对于接入的topic应具备监控报警机制让用户可以观察实际的消费情况。

有了这些需求,可以进行系统设计了,这是我设计的proxy系统架构图:

下面来介绍一下架构图中的细节内容:

配置化的需求

用户接入Proxy系统的流程是:填写kafka topic的配置,让proxy可以获取kafka相关元数据 -> 若有ETL的需求,填写ETL的元数据 -> 填写业务收集接口的信息(这里为了省事,业务接口只支持程序员最喜欢的HTTP协议)

kafka topic元信息包括broker server信息, topic name, app info等;

对于ETL功能,每个topic的schema应为固定格式,我们只需从topic 的schema中提取我们需要的数据即可,这里可以用gjson库实现;

为了短平快,服务选择我们普通开发最喜欢的HTTP协议,那么本质上就是编写一个可配置化的HTTP client,HTTP接口信息包括服务VIP信息(Nginx可以根据VIP配置把流量轮询发送给下游服务、URL PATH、服务入参、出参、超时时间,重试次数等信息;这个需求比较简单,这里不再赘述。

小集群管理能力

为了充分压榨服务器的性能,每台服务器上需要消费多个topic的partition, 为了方便管理,服务器对于不同的topic应具有启动消费和停止消费的能力。我选择在db的kafka配置表中对于每种topic保存多个机器的hostname来描述消费关系,比如topic A有三个partition,那么机器字段为["rock_hna_01.py","rock_hna_02.py","rock_hna_03.py"], db里还有一个consume_statue字段用于控制consume行为的启动/停止。同时,服务提供实时刷新/定时刷新config的接口,保证当db配置刷新后,服务中的配置随之进行刷新。

有状态的服务

写这个服务需要注意并发情况和服务配置与db配置保持一致,除了上面提过的定时/实时刷新接口,代码内部会有挺多的全局共享变量,为了保证并发安全,我用了大量的golang并发原语,比如:sync.Mutex、sync.Map、sync.Once、context等。

这里说一下context的使用场景,由于consume代码是这样的结构:

代码语言:javascript复制
for {
select {
 ...
}
}

而停止消费topic的接口需要控制协程关闭,否则会发生泄露,所以选择使用context(channel也可以)。

稳定性相关能力

在服务稳定性建设上,我选择在服务中增加限流功能,限流功能使用官方的令牌桶实现。限流能力的重要性不言而喻,有些具备Exactly Once的下游在出现问题后会选择回溯流量,倘若他一下回放了30天的流量,没有限流的话,瞬间的流量洪峰会压垮你娇贵的proxy服务,啧啧啧,想想就可怕,没有金刚钻,还是最好别揽这种可怕的服务;

consume可靠性

为了预防consume端丢失消息,这里使用ack机制,具体是:推送消息给下游系统,并监控errcode, 当response中的errcode返回0时认为消费成功,否则进行三次重试,如果三次重试仍然失败,把消息落离线日志,并进行监控和报警。

还有一些细节部分没有介绍到,如果读者对这个服务感兴趣,欢迎留言和我讨论。

0 人点赞