Kafka详细教程
完整教程请订阅专栏教程《rabbitmq/kafka实战教程》https://blog.csdn.net/zpcandzhj/category_10152842.html
kafka概述
Kafka 最初是由Linkedin 即领英公司基于Scala和 Java语言开发的分布式消息发布-订阅系统,现已捐献给Apache软件基金会。Kafka 最被广为人知的是作为一个 消息队列(mq)系统存在,而事实上kafka已然成为一个流行的分布式流处理平台。其具有高吞吐、低延迟的特性,许多大数据处理系统比如storm、spark、flink等都能很好地与之集成。按照Wikipedia上的说法,kafka的核心数据结构本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”。总的来讲,kafka通常具有3重角色:
- 消息系统 Kafka和传统的消息队列比如RabbitMQ、RocketMQ、ActiveMQ类似,支持流量削锋、服务解耦、异步通信等核心功能。
- 流处理平台 Kafka 不仅能够与大多数流式计算框架完美整合,并且自身也提供了一个完整的流式处理库,即kafka Streaming。kafka Streaming提供了类似Flink中的窗口、聚合、变换、连接等功能。
- 存储系统 通常消息队列会把消息持久化到磁盘,防止消息丢失,保证消息可靠性。Kafka的消息持久化机制和多副本机制使其能够作为通用数据存储系统来使用。
一句话概括:Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),在业界主要应用于大数据实时处理领域。
Kafka体系结构
如图所示,kafka的体系结构中通常包含多个Producer(生产者)、多个Consumer(消费者)、多个Broker(Kafka服务器)以及一个zookeeper集群。
体系结构中几个角色
- Producer 消息发送方,即生产者,负责生产消息,并将其发送到kafka服务器(broker)中。
- Consumer 消息接收方,即消费者,负责消费消息。消费者客户端主动从kafka服务器上拉取(pull)到消息,应用程序进行业务处理。
- Broker kafka服务实例,即kafka服务器,让生产者客户端、消费者客户端来连接,可以看做消息的中转站。多个Broker 将组成一个Kafka 集群。
- zookeeper ZooKeeper 是在Kafka集群中负责管理集群元数据、控制器选举等操作的分布式协调器。
分区和主题
Topic (主题)和Partition(分区)是Kafka 中的两个核心概念。在Kafka 中,消息以topic为单位进行归类。生产者必须将消息发送到指定的topic,即发送到Kafka 集群的每一条消息都必须指定一个主题;消费者消费消息也要指定主题,即消费者负责订阅主题并进行消费。
试想如果一个Topic在Kafka中只对应一个存储文件,那么海量数据场景下这个文件所在机器的I/O 将会成为这个主题的性能瓶颈,而分区解决了这个问题。在Kafka中一个topic可以分为多个分区(partition),每个分区通常以分布式的方式存储在不同的机器上。一个特定的partition只属于一个topic。kafka通常用来处理超大规模数据,因此创建主题时可以立即指定多个分区以提高处理性能。当然也可以创建完成后再修改分区数。同一个主题的不同分区包含的消息是不同的。底层存储上,每一个分区对应一个可追加写的Log文件,消息在被追加到分区Log文件时会分配一个特定的offset (偏移量),offset 是消息在分区中的唯一标识, Kafka 通过offset 来保证消息在分区内的有序性。offset 并不跨越分区,即Kafka 保证的是分区有序而不是全局有序。 下图展示了消息的追加写入:
Kafka中的分区可以分布在不同的服务器( broker )上,因此主题可以通过分区的方式跨越多个broker ,相比单个broker 、单个分区而言并行度增加了,性能提升不少。
在分区之下,Kafka又引入了副本(Replica)的概念。如果说增加分区数实现了水平扩展,增加副本数则是实现了纵向扩展,并提升了容灾能力。同一分区的不同副本中保存的消息是相同的。需要注意的是,在同一时刻,副本之间并非完全一样,因为同步存在延迟。副本之间是一主多从的关系。leader 副本负责处理读写请求, follower 副本只负责与leader 副本进行消息同步。副本存在不同的broker中,当leader 副本出现故障时,从follower 副本中重新选举新的leader 副本对外提供读写服务。Kafka 通过多副本机制实现了故障的自动转移,当Kafka 集群中某个broker 挂掉时,副本机制保证该节点上的partition数据不丢失,仍然能保证kafka服务可用。
下图展示了一个多副本的架构。本例中kafka集群中有4台broker,主题分区数为3,且副本因子也为3。生产者和消费者客户端只和leader副本进行交互,follower副本只负责和leader进行消息同步。每个分区都存在不同的broker中,如果每个broker单独部署一台机器的话,那么不同的Partition及其副本在物理上便是隔离的。
可以认为topic 是逻辑上的概念,partition是物理上的概念,因为每个partition 都对应于一个.log文件存储在kafka 的log目录下,该log 文件中存储的就是producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset(偏移位)。消费者组中的每个消费者,每次消费完数据都会向kafka服务器提交offset,以便出错恢复时从上次的位置继续消费。
消费组(Consumer Group)是Kafka的消费理念中一种特有的概念,每个消费者都属于一个消费组。生产者的消息发布到主题后,只会被投递给订阅该主题的每个消费组中的一个消费者。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;所有的消费者都有一个与之对应的消费者组,即消费者组是逻辑上的一个订阅者。消费者组之间互不影响,多个不同的消费者组可以同时订阅一个Topic,此时消息会同时被每个消费者组中一个消费者消费。
理解上述概念有助于在实际应用中规划topic分区数,消费者数、生产者数。实际生产中,一般分区数和消费者数保持相等,如果这个主题的消费者数大于主题的分区数,那么多出来的消费者将消费不到数据,只能浪费系统资源。
kafka文件存储机制
如前文所述,生产者生产的消息会不断追加到log 文件末尾,为防止log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个partition 分为多个segment。每个segment都有对应的.index文件和.log文件以及.timeindex文件。这些文件位于kafka的配置文件server.properties中配置项log.dirs所指定目录下的一个文件夹中,该文件夹的命名规则为:topic名称 分区序号。例如test这个topic 设置了三个分区,则会创建对应的文件夹test-0,test-1,test-2。每个文件下的索引和日志格式文件如下,index和log文件以当前segment 的第一条消息的offset 命名。
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex 00000000000000130610.index 00000000000000130610.log 00000000000000121343.timeindex
日志分段文件对应的两个索引文件主要是用来提高查找消息的速度。偏移量索引建立了消息位移(offset)和物理地址之间的映射关系;时间戳索引则方便根据指定时间戳查找偏移量信息。每写入一定量(kafka配置文件参数log.index.interval.bytes 指定,默认值为4096 ,即4KB )的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。可以通过配置log.index.interval.bytes 的值,增加或减小索引项的密度。
kafka高效读写的原因之一在于使用了顺序写磁盘技术和零拷贝技术。
1)顺序写盘
传统的消息中间件比如RabbitMQ使用内存作为默认的存储介质,而将磁盘作为备选介质,以此实现高吞吐、低延迟的特性。事实上有研究表明,同样的磁盘,顺序写速度能到600MB/s,而随机写只有100K/s。这与磁盘的机械特性有关,顺序写省去了大量磁头寻址时间。因此顺序写磁盘的速度甚至快于随机写内存。因此Kafka 在设计时采用了文件追加的方式来顺序写入消息到磁盘中。此外kafka还充分利用了磁盘页缓存来减少磁盘IO。
2)零拷贝
零拷贝是指将数据直接从磁盘复制到网卡设备中而无需经过应用程序。零拷贝大大提高了应用程序的性能,减少了系统在内核模式和用户模式之间的上下文切换。在netty等框架中也使用了Zero-Copy技术来提升IO性能。在Linux中,零拷贝依赖操作系统底层的sendfile()函数,其实JDK中的FileChannel.transferTo()方法底层实现即依赖sendfile()函数。
举个栗子,比如服务端要将本地文件传递给客户端,两种不同的技术流程分别如下:
传统非零拷贝技术
首先要调用read()系统函数将磁盘中的文件复制到内核态的Read Buffer中,在CPU的控制下,再将内核态数据复制到用户态下。然后调用系统函数write()将用户模式下的数据复制到内核模式下的Socket Buffer中。最后将内核态的Socket Buffer中的数据复制到硬件网卡设备中传输。上述过程中,数据白白地从内核态到用户态“浪”了一圈,即2次复制操作,4次内核态、用户态上下文切换。再来看看零拷贝是如何处理的。
零拷贝技术
零拷贝技术使用操作系统支持的DMA ( Direct Memory Access )技术将文件内容复制到内核态下的ReadBuffer 中。数据没有被复制到Socket Buffer。只有包含数据的位置和长度的信息的文件描述符传送到Socket Buffer中。数据直接从内核态传输到网卡外设中,操作系统的内核、用户态上下文切换只有2次,数据复制也减少了。
kafka安装与配置
单机
约定:本文所有的软件都安装在/usr/local/myapp/下,所有对linux的操作建议使用root用户直接干,避免不必要的麻烦!
jdk安装
官网下载linux系统jdk压缩包,本文使用jdk-8u261-linux-x64.tar.gz,下载到本地后上传到linux服务器。
- 查看并卸载系统自带jdk [root@vm1 ~]# java -version openjdk version “1.8.0_242” OpenJDK Runtime Environment (build 1.8.0_242-b08) OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode) 发现是自带的openjdk,查询并卸载 [root@vm1 ~]# rpm -qa |grep jdk java-1.8.0-openjdk-headless-1.8.0.242.b08-1.el7.x86_64 java-1.7.0-openjdk-1.7.0.251-2.6.21.1.el7.x86_64 java-1.8.0-openjdk-1.8.0.242.b08-1.el7.x86_64 java-1.7.0-openjdk-headless-1.7.0.251-2.6.21.1.el7.x86_64 copy-jdk-configs-3.3-10.el7_5.noarch 使用rpm -e –nodeps命令依次删除 rpm -e –nodeps java-1.8.0-openjdk-headless-1.8.0.242.b08-1.el7.x86_64 rpm -e –nodeps java-1.7.0-openjdk-1.7.0.251-2.6.21.1.el7.x86_64 rpm -e –nodeps java-1.8.0-openjdk-1.8.0.242.b08-1.el7.x86_64 rpm -e –nodeps java-1.7.0-openjdk-headless-1.7.0.251-2.6.21.1.el7.x86_64
- 安装下载的jdk 解压下载的jdk压缩包到本地建好的目录 [root@vm1 ~]# tar -zxvf jdk-8u261-linux-x64.tar.gz -C /usr/local/myapp/jdk/
- 配置环境变量 /etc/profile文件中添加下列配置 [root@vm1 ~]# vim /etc/profile export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_261/ export CLASSPATH= : C L A S S P A T H : :CLASSPATH: :CLASSPATH:JAVA_HOME/lib/ export PATH= P A T H : PATH: PATH:JAVA_HOME/bin 表示家目录,也可以不使用,直接写全路径,比如:/home/zpc/soft/jdk/jdk1.8.0_261/ 使用source 命令让配置立即生效 [root@vm1 ~]# source /etc/profile
- 再次查看jdk版本 [root@vm1 ~]# java -version java version “1.8.0_261” Java™ SE Runtime Environment (build 1.8.0_261-b12) Java HotSpot™ 64-Bit Server VM (build 25.261-b12, mixed mode) 安装成功!
zooKeeper安装
- 官网下载zk安装包 本文使用apache-zookeeper-3.6.1-bin.tar.gz
- 解压到本地指定目录 [root@vm1 ~]# tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz -C /usr/local/myapp/zookeeper 重命名: [root@vm1 ~]# mv /usr/local/myapp/zookeeper/apache-zookeeper-3.6.1-bin/ /usr/local/myapp/zookeeper/zookeeper-3.6.1
- 环境变量配置 [root@vm1 ~]# vim /etc/profile export ZOOKEEPER_HOME=/usr/local/myapp/zookeeper/zookeeper-3.6.1 export PATH= P A T H : PATH: PATH:ZOOKEEPER_HOME/bin 执行 source /etc/profile使配置生效
- 修改zk配置文件
进入zk配置目录zookeeper-3.6.1/conf复制配置文件模板并修改
[root@vm1 conf]# cp zoo_sample.cfg zoo.cfg
修改
zoo.cfg
文件如下: #数据目录 dataDir=/usr/local/myapp/zookeeper/zookeeper-3.6.1/dataDir #单独指定事务日志目录 dataDir=/usr/local/myapp/zookeeper/zookeeper-3.6.1/dataLogDir #zk对外服务端口 clientPort=2181 #基本时间单位,和时间相关的配置都是该值的倍数;即zk服务器单次心跳间隔时间,单位毫秒 tickTime=2000 #投票选举leader时zk服务器连上leader的时间限制,initLimit*tickTime为总的超时时间 initLimit=10 #zk正常工作时leader和follower之间最多心跳数限制,syncLimit*tickTime为总超时容忍时间,超过此时间,follower将从zk集群中被剔除 syncLimit=5 注意:上述配置中的数据和日志文件路径先要在系统中创建 [root@vm1 conf]# mkdir -p /usr/local/myapp/zookeeper/zookeeper-3.6.1/dataDir [root@vm1 conf]# mkdir -p /usr/local/myapp/zookeeper/zookeeper-3.6.1/dataLogDir 其它配置文件参数含义参见官网说明 - 启动zookeeper 由于配置了环境变量,所以可以在任意目录执行zkServer.sh start [root@vm1 ~]# zkServer.sh start ZooKeeper JMX enabled by default Using config: /home/zpc/soft/apache-zookeeper-3.6.1/bin/…/conf/zoo.cfg Starting zookeeper … STARTED 上述输出表明启动成功。如果初次安装启动失败,比如报错:Starting zookeeper … FAILED TO START 很可能是安装包下载不对,有人下载的是apache-zookeeper-3.6.1.tar.gz,应该是apache-zookeeper-3.6.1-bin.tar.gz才对。单机模式启动需要zk的jar包。zk解压后的/lib/目录下要有zookeeper-3.6.1.jar才行。
- 查看zk进程和zk状态 [root@vm1 ~]# jps 6754 QuorumPeerMain 7013 Jps [root@vm1 ~]# zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/myapp/zookeeper/zookeeper-3.6.1/bin/…/conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: standalone 关闭zookeeper [root@vm1 ~]# zkServer.sh stop 至此,zookeeper standalone模式安装成功!
kafka安装
- 官网下载安装包 本文使用的是kafka_2.12-2.6.0.tgz
- 解压到本地指定目录 [root@vm1 ~]# tar -zxvf kafka_2.12-2.6.0.tgz -C /usr/local/myapp/kafka
- 环境变量配置 配置环境变量的目的之一是在任何目录下都可以执行kafka bin目录下的命令 export KAFKA_HOME=/usr/local/myapp/kafka/kafka_2.12-2.6.0 export PATH= P A T H : PATH: PATH:KAFKA_HOME/bin
- 修改kafka配置文件 配置文件在config/server.properties中,主要修改如下参数 #broker的全局唯一id,一般从0开始编号,不能重复 broker.id=0 #kafka对外提供服务监听地址,设置运行kafka的机器IP地址,客户端用此地址连接到kafka listeners=PLAINTEXT://192.168.174.129:9092 #日志目录,多个目录可以用逗号隔开(先在系统中创建好目录) log.dirs=/usr/local/myapp/kafka/kafka_2.12-2.6.0/log/kafka #默认分区数配置,一般在集群配置中要设多个分区提高性能 num.partitions=1 #创建topic时的默认副本数,一般在集群配置中要设多个副本,提高可用性 default.replication.factor=1 #zk服务器地址配置,一般单机模式zk和kafka运行在同一台机器,配置kafka所在机器ip即可 zookeeper.connect=localhost:2181 #启用删除topic的功能,默认为true delete.topic.enable=true #用来处理磁盘I/O的线程数,默认为8 num.io.threads=8 #用来处理网络请求的线程数,默认为3 num.network.threads=3 #kafka log日志保留的时间,默认7天(168h) log.retention.hours=168
- 启动kafka 启动kafka前必须先运行zookeeper,因为kafka会连接到zookeeper [root@vm1 ~]# zkServer.sh start [root@vm1 ~]# jps 52358 Jps 52135 QuorumPeerMain 看到QuorumPeerMain进程即表示zk启动成功 下面启动kafka,启动kafka时要指定配置文件,&表示后台执行。首次运行前先清空kafka日志文件,防止脏数据 [root@vm1 ~]# rm -rf /usr/local/myapp/kafka/kafka_2.12-2.6.0/log/kafka/* [root@vm1 ~]# kafka-server-start.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties &
- 查看kafka进程是否启动成功 [root@vm1 ~]# jps 52135 QuorumPeerMain 54827 Jps 54399 Kafka 注意,关闭kafka时使用命令关闭,不要随便强制关闭,先关闭kafka,再关闭zookeeper [root@vm1 ~]# kafka-server-stop.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties [root@vm1 ~]# zkServer.sh stop jps查看进程发现zk和kafka进程均已停止
- kafka启动和停止脚本 把kafka和zookeeper的命令封装成shell脚本 [root@vm1 ~]# vim kafka_start.sh #!/bin/bash #启动zookeeper zkServer.sh start & sleep 10 #等10秒后执行 #启动kafka kafka-server-start.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties & [root@vm1 ~]# vim kafka_stop.sh #!/bin/bash #停止kafka kafka-server-stop.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties & sleep 10 #等10秒执行 #停止zookeeper zkServer.sh stop 由于配置了环境变量,上述脚本中的命令可以不写绝对路径(全路径)。 为脚本增加执行权限: chmod x kafka_start.sh chmod x kafka_stop.sh 这样就可以一键启动、停止kafka了: [root@vm1 ~]# ./kafka_start.sh [root@vm1 ~]# ./kafka_stop.sh
集群环境
机器规划:3台服务器,1台leader,2台follower,每台机器都在/etc/hosts中配置好域名ip映射
机器主机名 | 机器IP | 机器角色 |
---|---|---|
vm1 | 192.168.174.129 | leader(master) |
vm2 | 192.168.174.131 | follower(slave) |
vm3 | 192.168.174.130 | follower(slave) |
下列所有的安装都可以先在第一台机器进行,然后再拷贝到集群其它机器中修改。
jdk安装
在vm1上安装完jdk后拷贝到vm2、vm3中,vm1上的安装参照单机jdk安装。
[root@vm1 ~]# scp -r /usr/local/myapp/jdk/jdk1.8.0_261/ root@vm2:/usr/local/myapp/jdk/jdk1.8.0_261/ [root@vm1 ~]# scp -r /usr/local/myapp/jdk/jdk1.8.0_261/ root@vm3:/usr/local/myapp/jdk/jdk1.8.0_261/ [root@vm1 ~]# scp /etc/profile root@vm2:/etc/profile [root@vm1 ~]# scp /etc/profile root@vm3:/etc/profile [root@vm2 ~]# source /etc/profile [root@vm3 ~]# source /etc/profile
在每台机器 上执行java -version 查看jdk版本,确保jdk已正确安装
zookeeper集群安装
Zookeeper集群原则上需要2n 1个实例才能保证集群有效性,所以集群规模至少是3台。vm1上的安装参照单机zookeeper安装。
- 在vm2、vm3上建好目录,并从vm1复制 [root@vm2 ~]# mkdir -p /usr/local/myapp/zookeeper [root@vm3 ~]# mkdir -p /usr/local/myapp/zookeeper [root@vm1 ~]# scp -r /usr/local/myapp/zookeeper/zookeeper-3.6.1/ root@vm2:/usr/local/myapp/zookeeper/ [root@vm1 ~]# scp -r /usr/local/myapp/zookeeper/zookeeper-3.6.1/ root@vm3:/usr/local/myapp/zookeeper/
在单机安装的基础上增加下列配置:
- 每台机器的
/conf/zoo.cfg
文件中增加节点信息 #server.A=B:C:D,A表示服务器编号;B是IP;C是该服务器与leader通信端口;D是leader挂掉后重新选举所用通信端口 server.1=192.168.174.129:2888:3888 server.2=192.168.174.131:2888:3888 server.3=192.168.174.130:2888:3888 - 在每台ZK节点的dataDir目录下新建myid文件 myid配置文件内容是当前服务器的编号,即server.后面的数字,分别在每台机器执行一次echo赋值 echo 1 > /usr/local/myapp/zookeeper/zookeeper-3.6.1/dataDir/myid #第1台上执行 echo 2 > /usr/local/myapp/zookeeper/zookeeper-3.6.1/dataDir/myid #第2台上执行 echo 3 > /usr/local/myapp/zookeeper/zookeeper-3.6.1/dataDir/myid #第3台上执行
- 分别启动每个服务器上的zk服务,注意先要关闭各个机器的防火墙 systemctl stop firewalld zkServer.sh start
- 分别查看每个节点的状态 [root@vm1 ~]# zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/myapp/zookeeper/zookeeper-3.6.1/bin/…/conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: leader [root@vm3 ~]# zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/myapp/zookeeper/zookeeper-3.6.1/bin/…/conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: follower
至此zookeeper集群模式搭建成功,执行 zkServer.sh stop 命令停掉leader可以发现又会重新选举出leader
kafka安装
按照单机kafka安装配置的步骤先在第一台机器vm1上解压、配置kafka。
- 配置config/server.properties #集群中每个broker的id唯一,一般从0开始 broker.id=0 #kafka对外提供服务监听地址,设置运行kafka的机器IP地址,客户端用此地址连接到kafka listeners=PLAINTEXT://192.168.174.129:9092 #日志目录,多个目录可以用逗号隔开,先创建好目录 log.dirs=/usr/local/myapp/kafka/kafka_2.12-2.6.0/log/kafka #分区数配置,集群模式下一般设多个分区提高性能 num.partitions=3 #创建topic时的默认副本数,集群模式下一般配置多个副本,提高可用性 default.replication.factor=3 #zk服务器地址配置,集群模式下配置zk的地址列表,逗号分隔 zookeeper.connect=192.168.174.129:2181,192.168.174.130:2181,192.168.174.131:2181
- 复制到其它机器 复制前清空第一台kafka的log.dirs,防止有脏数据影响其它机器 [root@vm1 ~]# rm -rf /usr/local/myapp/kafka/kafka_2.12-2.6.0/log/kafka/* [root@vm1 ~]# scp -r /usr/local/myapp/kafka/ root@vm2:/usr/local/myapp/ [root@vm1 ~]# scp -r /usr/local/myapp/kafka/ root@vm3:/usr/local/myapp/ 分别修改其它机器的config/server.properties配置 #集群中每个broker的id唯一,一般从0开始 broker.id=1 #kafka对外提供服务的监听地址设置为本机ip listeners=PLAINTEXT://192.168.174.131:9092 #集群中每个broker的id唯一,一般从0开始 broker.id=2 #kafka对外提供服务的监听地址设置为本机ip listeners=PLAINTEXT://192.168.174.130:9092
- 启动kafka集群 先启动zk,再启动kafka 查看zk是否启动:zkServer.sh status [root@vm1 ~]# zkServer.sh start [root@vm2 ~]# zkServer.sh start [root@vm3 ~]# zkServer.sh start [root@vm1 ~]# kafka-server-start.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties & [root@vm1 ~]# kafka-server-start.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties & [root@vm1 ~]# kafka-server-start.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties & 查看zk和Kafka进程是否正常启动: [root@vm1 ~]# jps 5524 QuorumPeerMain 6100 Kafka 6741 Jps
- 验证kafka集群 随机找一台kafka的机器创建topic,在另外的kafka服务器查看集群topic,如果有则集群配置正常 [root@vm1 ~]# kafka-topics.sh –create –zookeeper 192.168.174.129:2181 –replication-factor 2 -partitions 2 –topic kafkatest [root@vm1 ~]# kafka-topics.sh –describe –zookeeper 192.168.174.129:2181 Topic: kafkatest PartitionCount: 2 ReplicationFactor: 2 Configs: Topic: kafkatest Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: kafkatest Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 [root@vm2 ~]# kafka-topics.sh –describe –zookeeper 192.168.174.131:2181 Topic: kafkatest PartitionCount: 2 ReplicationFactor: 2 Configs: Topic: kafkatest Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: kafkatest Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 [root@vm3 ~]# kafka-topics.sh –describe –zookeeper 192.168.174.130:2181 Topic: kafkatest PartitionCount: 2 ReplicationFactor: 2 Configs: Topic: kafkatest Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: kafkatest Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 也可以使用新版的命令: [root@vm1 ~]# kafka-topics.sh –create –topic kafkatest2 –bootstrap-server 192.168.174.131:9092 创建topic时如果没有显式指定分区数和副本数则使用默认值,即配置文件server.properties中的配置。 [root@vm3 ~]# kafka-topics.sh –describe –bootstrap-server 192.168.174.131:9092 Topic: kafkatest2 PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: kafkatest2 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: kafkatest2 Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Topic: kafkatest2 Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: kafkatest PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824 Topic: kafkatest Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: kafkatest Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 也可以在每一台机器上使用list命令查看kafka集群topic [root@vm1 ~]# kafka-topics.sh –list –bootstrap-server 192.168.174.129:9092 kafkatest kafkatest2 kafkatest3 [root@vm3 ~]# kafka-topics.sh –list –bootstrap-server 192.168.174.130:9092 kafkatest kafkatest2 kafkatest3 [root@vm2 ~]# kafka-topics.sh –list –bootstrap-server 192.168.174.131:9092 kafkatest kafkatest2 kafkatest3 至此,kafka集群安装成功。 注意:关闭时先关闭kafka再关闭zookeeper! kafka-server-stop.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties & zkServer.sh stop
kafka监控管理界面
使用web管理页面或仪表盘管理kafka更加方便日常维护。
下载Kafka Eagle
下载可能很慢,耐心等待。
解压到本地指定目录
[root@vm1 ~]# tar -zxvf kafka-eagle-bin-2.0.1.tar.gz
[root@vm1 ~]# cd kafka-eagle-bin-2.0.1/ [root@vm1 kafka-eagle-bin-2.0.1]# tar -zxvf kafka-eagle-web-2.0.1-bin.tar.gz -C /usr/local/myapp/ [root@vm1 ~]# mv /usr/local/myapp/kafka-eagle-web-2.0.1/ /usr/local/myapp/kafka-eagle
环境变量配置
export KE_HOME=/usr/local/myapp/kafka-eagle export PATH= P A T H : PATH: PATH:KE_HOME/bin
source /etc/profile
修改Kafka-Eagle配置文件system-config.properties
代码语言:javascript复制# zookeeper和kafka集群配置
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.174.129:2181,192.168.174.130:2181,192.168.174.131:2181
######################################
# kafka eagle webui port
# web页面访问端口号
######################################
kafka.eagle.webui.port=8048
######################################
# kafka jdbc driver address
# kafka默认使用Centos自带的sqlite数据库,配置下数据库文件存放路径即可
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/usr/local/myapp/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org
启动kafka-eagle
[root@vm1 bin]# cd /usr/local/myapp/kafka-eagle/bin
[root@vm1 bin]# ./ke.sh start
[root@vm1 bin]# ./ke.sh status
如果出现错误,请查看日志日志是否出问题
/usr/local/myapp/kafka-eagle/logs
如果出现内存不足导致的错误,可以调小ke.sh中设置的JVM内存占用后再次启动
vim kafka-eagle/bin/ke.sh
export KE_JAVA_OPTS=”-server -Xmx256M -Xms256M -XX:MaxGCPauseMillis=20 -XX: UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80″
如果没问题,则直接登录
http://192.168.174.129:8048
默认用户名:admin
默认密码:123456
- 开启监控趋势图 Kafka Eagle的监控趋势图需要连接kafka的JMX端口,kafka默认不开启JMX。如果需要查看监控趋势图,需要开启kafka jmx端口。同时启用Kafka Eagle监控趋势图,即Kafka Eagle的conf/system-config.properties文件中配置: kafka.eagle.metrics.charts=true 开启kafka jmx端口有两种方式: 1.启动Kafka前先执行export JMX_PORT=xxxx命令 设置临时环境变量 [root@vm1 ~]# export JMX_PORT=9966 [root@vm1 ~]# kafka-server-start.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties & 注意kafka启动完成后删除临时环境变量,因为后面如果在同一台机器上运行kafka生产者脚本将会导致重复开启jmx,报端口占用失败 [root@vm1 ~]# echo
第二种方式是在kafka的bin/kafka-run-class.sh 脚本中加入:JMX_PORT=9966,每个节点依次添加完配置再启动kafka。
Kafka生产消费脚本演示
- 生产和消费 在第一台服务器上向broker0发送消息,在另外的服务器上消费kafka消息 选项 说明: –topic 定义topic名称 –replication-factor 定义副本数 –partitions 定义分区数 –from-beginning:把主题中历史所有的数据都读取出来 创建topic [root@vm1 ~]# kafka-topics.sh –create –topic testtopic01 –bootstrap-server 192.168.174.129:9092 查看topic [root@vm1 ~]# kafka-topics.sh –describe –topic testtopic01 –bootstrap-server 192.168.174.129:9092 Topic: testtopic01 PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: testtopic01 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: testtopic01 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: testtopic01 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 运行生产者,并输入消息 [root@vm1 ~]# kafka-console-producer.sh –topic testtopic01 –bootstrap-server 192.168.174.129:9092 This is my first message 在另外2台机器上运行消费者,查看接收到的消息 [root@vm2 ~]# kafka-console-consumer.sh –topic testtopic01 –from-beginning –bootstrap-server 192.168.174.131:9092 [root@vm3 ~]# kafka-console-consumer.sh –topic testtopic01 –from-beginning –bootstrap-server 192.168.174.130:9092 ctrl c 退出 消费者、生产者控制台 删除topic [root@vm3 ~]# kafka-topics.sh –list –bootstrap-server 192.168.174.129:9092 [root@vm3 ~]# kafka-topics.sh –delete –topic testtopic01 –bootstrap-server 192.168.174.129:9092 [root@vm3 ~]# kafka-topics.sh –list –bootstrap-server 192.168.174.129:9092 修改topic的分区数 [root@vm3 ~]# kafka-topics.sh –bootstrap-server 192.168.174.129:9092 –alter –topic testtopic01 –partitions 6
- 验证集群容错能力 建一个topic,分区数为1,副本数设置为2。kill掉副本1所在的服务器上的kafka进程,看看kafka生产者和kafka消费者是否正常消费;再kill掉副本2所在的kafka服务试一次。 [root@vm3 ~]# kafka-topics.sh –create –topic testtopic02 –bootstrap-server 192.168.174.130:9092 –replication-factor 2 -partitions 1 [root@vm2 ~]# kafka-topics.sh –describe –topic testtopic02 –bootstrap-server 192.168.174.129:9092 Topic: testtopic02 PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824 Topic: testtopic02 Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0 Replicas: 1,0表明副本在0号,1号机器,即本文中的vm1、vm2,因此我们开启2个vm3的窗口分别运行生产者和消费者。 生产者发送: [root@vm3 ~]# kafka-console-producer.sh –topic testtopic02 –bootstrap-server 192.168.174.130:9092 hello msg1 hello msg2 消费者接受: [root@vm3 ~]# kafka-console-consumer.sh –topic testtopic02 –from-beginning –bootstrap-server 192.168.174.130:9092 hello msg1 hello msg2 kill掉broker0,再次进行生产消费演示: [root@vm3 ~]# kafka-console-producer.sh –topic testtopic02 –bootstrap-server 192.168.174.130:9092 hello after kill [root@vm3 ~]# kafka-console-consumer.sh –topic testtopic02 –from-beginning –bootstrap-server 192.168.174.130:9092 hello after kill 发现生产消费正常。此时kill掉broker1,再次进行生产消费演示,发现报错: WARN [Producer clientId=console-producer] 1 partitions have leader brokers without a matching listener, including [testtopic02-0] (org.apache.kafka.clients.NetworkClient)
WARN [Consumer clientId=consumer-console-consumer-32492-1, groupId=console-consumer-32492] 1 partitions have leader brokers without a matching listener, including [testtopic02-0] (org.apache.kafka.clients.NetworkClient)
重新启动kafka服务又会恢复!
Kafka服务端配置总结
1.brokerId
kafka集群中broker的唯一标识。一般可以从 1开始递增。broker 在启动时会在ZooKeeper 中的/brokers/ids 路径下创建一个以当前brokerId 为名称的虚节点,broker 的健康状态检查就依赖于此虚节点。当broker 下线时,该虚节点会自动删除,其他broker 节点或客户端通过判断/brokers/ids 路径下是否有此broker的brokerld 节点来确定该broker 的健康状态。broker启动后在日志目录log.dirs下会生成meta.properties 文件。如果server.properties和meta.properties配置的broker.id不一致,将启动失败,抛出异常: InconsistentBrokerldException
2.log.dirs
设置Kafka 日志文件存放的根目录,可以以逗号分隔的方式配置多个目录,如果不配置,默认值为:/tmp/kafka-logs
3.listeners
broker对外提供服务的地址,即客户端连接kafka broker的地址。比如listeners=PLAINTEXT://192.168.174.129:9092,客户端就可以使用192.168.174.129:9092这个地址连接kafka,发送或接受消息。还有一个advertised.listeners配置项,用于IaaS环境,比如云服务器通常配备有多块网卡,即包含私网网卡和公网网卡,此时可以设置advertised.listeners 配置项绑定公网IP 供外部客户端使用,而配置listeners 参数来绑定内网IP 地址供broker间通信使用。
4.zookeeper.connect
设置kafka broker 连接的ZooKeeper 集群的服务地址,ZooKeeper 集群中有多个节点时可以使用逗号分隔。
5.message.max.bytes
指定broker 所能接收消息体的最大大小(如果启用压缩的话就是压缩后的最大大小),单位字节。
6.log.retention.hours
日志文件保存时间,默认7天。
更多参数含义参见官方文档:http://kafka.apache.org/documentation/
kafka生产者客户端API
Kafka Producer发送消息可以采用同步或者异步的方式。
使用异步发送
引入Java客户端依赖
代码语言:javascript复制<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
核心类
KafkaProducer:生产者对象,用来发送数据
ProducerConfig:设置生产者的一系列配置参数
ProducerRecord:每条数据都要封装成一个ProducerRecord 对象
Producer客户端代码
不带回调函数:KafkaProducer#send(ProducerRecord<K,V>)
代码语言:javascript复制public class MyProducer1 {
public static void main(String[] args) throws Exception{
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 32);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 20; i ) {
producer.send(new ProducerRecord<String, String>("topic_test", Integer.toString(i), "value:" i));
}
producer.close();
}
}
带回调函数:KafkaProducer#send(ProducerRecord<K,V>, Callback)
代码语言:javascript复制public class MyProducer2 {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 32);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i ) {
producer.send(new ProducerRecord<String, String>("topic_test", Integer.toString(i), "value:" i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (null == exception) {
System.out.println("send success->" metadata.offset());
} else {
exception.printStackTrace();
}
}
});
}
producer.close();
}
}
回调函数会在producer收到ack时异步调用,该方法有两个参数:RecordMetadata、Exception。这两个参数是互斥的关系,即如果Exception为null,则消息发送成功,此时RecordMetadata必定不为null。消息发送异常时,RecordMetadata为null,而exception不为null。消息发送失败会自动重试,不需在回调函数中手动重试。重试次数由参数retries设定。
KafkaProducer设定参数retries,如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数。
使用同步发送
同步是指,一条消息发送后会阻塞当前线程,直至返回ack消息。
有上面异步的例子可以看出,producer的send方法返回对象是Future类型,因此可以通过调用Future对象的get()方法触发同步等待。
因此与异步代码区别只有一处:
代码语言:javascript复制producer.send(new ProducerRecord<String, String>("topic_test", Integer.toString(i), "value:" i)).get();
生产者分区策略
使用分区提高了topic的处理性能,提高了topic的并发性。我们可以将kafka 生产者发送的数据封装成ProducerRecord
对象。ProducerRecord
对象中有partition
属性。每条ProducerRecord
会被发送到特定的partition
中。
在这里插入图片描述
生产者发送消息时的分区原则如下:
(1)构造器中指明 partition 时,直接将指定值作为 partiton值; (2)没有指明partition值但有key的情况下,将key的hash值与topic 的 partition 数进行取余得到 partition 值; (3)既没有指定 partition 值又没有传入 key 值时,第一次调用时随机生成1个整数(后面每次调用在这个整数上自增),并与这个topic的partition数取模得到partition值,即采用round-robin 算法。
数据可靠性
kafka采用ack机制保证数据可靠性。topic的每个partition所在的broker收到producer发送的数据后会向producer发送ack消息。如果producer没有收到ack,将会触发重试机制。那么kafka何时给生产者返回ack响应呢?在partition的副本中,有leader和follower之分,如果需要所有的follower都同步完成才发ack,这时因为某种故障,某个follower迟迟不能与leader进行同步,那么就会一直等下去,直至其同步完成才会发送ack,这样势必影响kafka性能。kafka是如何优化这个问题的呢?
在kafka中,一个主题的分区中所有的副本集合称之为AR(Assigned Replicas)。leader节点维护了一个动态的集合,即in-sync replica set (ISR),指的是和leader保持一定程度同步的follower集合(包括leader节点在内)。与leader副本同步滞后过多的副本组成OSR(Out-of-Sync Replicas)集合,因此AR=ISR OSR。正常情况下所有的follower都应该和leader保持一定程度的同步,即AR=ISR。
kafka的做法是根据用户配置的ack级别来确认何时返回客户端ack消息。
Acks参数用来指定分区中有多少副本收到这条消息之后生产者才能确认这条消息是写入成功的,有3种级别的配置。
acks=1,即默认配置。生产者发送消息后,只要分区的leader副本成功写入消息,就会收到服务器的ack成功响应。如果消息写入leader失败,比如leader已经挂了,正在重新选举,此时生产者客户端会收到错误响应,可以进行重发。如果写入leader成功、follower同步完成之前leader挂了,将会出现消息丢失。所以acks=1是Kafka消息可靠性和吞吐量之间的折中方案,一般也是默认的配置。
acks=0,生产者发送消息之后无需等待任何服务端的响应。如果在消息从发送到写入Kafka 的过程中出现某些异常,导致Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks=0 可以使Kafka达到最大的吞吐量。
acks=-1,和acks=all效果一样。生产者在消息发送之后,需要等待ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的ack响应。在其他配置环境相同的情况下,acks 设置为-1 可以达到最高的数据可靠性。但是如果在follower 同步完成后,broker给生产者客户端发送ack之前,leader发生故障,将会造成数据重复。
Exactly Once语义
将acks级别设置为-1,可以保证producer和kafka server之间不丢失数据,即保证At least once 语义。
对应的,acks级别设置为0,可以保证每条消息只会发送1次,不会重复,即At most once语义。
有些场景下,数据的消费者要求数据既不丢失也不重复,即Exactly Once语义。0.11版本的 Kafka,引入了一项重大特性:幂等性。即不论producer向server发送多少次重复数据,server端只会持久化1条。将producer中的enable.idompotence参数设为true即可开启幂等性。开启幂等性的Producer在初始化的时候会分配一个pid,发往同一个Partition的消息会附带sequence number。Kafka服务端,即broker端会对<pid,partition,seqnumber>做缓存,当具有相同键的消息提交时,broker只会持久化一条。但是PID重启就会变化,并且不同的Partition具有不同的键,所以kafka的幂等性无法保证跨分区、跨会话的Exactly Once。
注意,Kafka的幂等性只是Kafka自身的一种机制,无法保证业务层的幂等。通常我们需要自行实现业务侧的幂等控制。
生产者拦截器
生产者客户端通过实现接口org.apache.kafka.clients.producer.ProducerInterceptor生成一个生产者拦截器。
Kafka Producer会在消息序列化和计算分区之前调用拦截器的onSend()方法,用户可以在此方法中进行消息发送前的业务定制。一般不修改ProducerRecord的topic、key、partition等信息。Kafka Producer会在消息被应答(ack)之前或者消息发送失败时调用拦截器的 onAcknowledgement()方法,此方法在用户设置的异步CallBack()方法之前执行。onAcknowledgement方法的业务逻辑越简单越好,否则会影响发送性能,因为该方法运行在Producer的I/O线程中。
拦截器案例:定义2个拦截器,并使用它们
代码语言:javascript复制/** * 拦截器案例-第1个拦截器在消息体前增加时间戳信息 */
public class ProducerInterceptor1 implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 在消息体前加上时间戳
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
System.currentTimeMillis() "_" record.value(), record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
代码语言:javascript复制/** * 拦截器案例-第2个拦截器统计发送成功和失败的消息 */
public class ProducerInterceptor2 implements ProducerInterceptor<String, String> {
private AtomicInteger successCounter = new AtomicInteger();
private AtomicInteger failCounter = new AtomicInteger();
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
successCounter.getAndIncrement();
} else {
failCounter.getAndIncrement();
}
}
@Override
public void close() {
System.out.println("successCounter:" successCounter.get());
System.out.println("failCounter:" failCounter.get());
System.out.println("发送成功率:" successCounter.get() / (failCounter.get() successCounter.get()));
}
@Override
public void configure(Map<String, ?> configs) {
}
}
代码语言:javascript复制/** * 使用自定义生产者拦截器 * Kafka Producer会在消息序列化和计算分区之前调用拦截器的onSend方法,可以在此方法中进行业务定制。一般不修改ProducerRecord的topic、key、partition等信息 * Kafka Producer会在消息被应答(ack)之前或者消息发送失败时调用拦截器的 onAcknowledgement 方法,此方法在用户设置的异步CallBack方法之前执行。 * onAcknowledgement方法的业务逻辑越简单越好,否则会影响发送性能,因为该方法运行在Producer的I/O线程中 */
public class ProducerWithInterceptor {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//构建拦截器链
List<String> interceptors = new ArrayList<>();
interceptors.add(ProducerInterceptor1.class.getName());
interceptors.add(ProducerInterceptor2.class.getName());
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i ) {
ProducerRecord<String, String> record = new ProducerRecord<>("topic_test", "hello " i);
kafkaProducer.send(record);
}
//关闭producer,才能触发Interceptor的close方法
kafkaProducer.close();
}
}
kafka消费者客户端API
引入依赖
代码语言:javascript复制<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
核心类:
KafkaConsumer:消费者对象,用来消费数据
ConsumerConfig:设置消费者的一系列配置参数
KafkaConsumer#poll(Duration):消费者客户端核心方法,即从服务端拉取(poll)消息。后面章节将展示不同位移(offset)提交方式下如何编写消费者客户端API代码。
消费者分区策略
消费者客户端可以指定消费某个主题的特定分区,KafkaConsumer
中的assign(Collection<TopicPartition> partitions)
方法可以指定需要订阅的分区集合。也可以在配置文件中配置partition.assignment.strategy配置项指定自定义分区策略。
默认不指定的情况下,kafka使用内置的分配策略。即Range(默认)和Robin。当然还有一种更为复杂的StickyAssignor分配策略。
- Range Range 分区策略是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。假如现在有 10 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6,7,8,9;消费者排序完之后将会是C1-0,C2-0,C3-0。通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。 例如,10/3 = 3 余 1 ,除不尽,那么 消费者 C1-0 便会多消费 1 个分区,最终分区分配结果如下: 消费者 消费分区 C1-0 消费 0,1,2,3 分区 C2-0 消费 4,5,6 分区 C3-0 消费 7,8,9 分区(如果有11 个分区的话,C1-0 将消费0,1,2,3 分区,C2-0 将消费4,5,6,7分区 C3-0 将消费 8,9,10 分区) Range 范围分区的弊端:上面的例子只是针对 1 个 topic 而言,C1-0消费者多消费1个分区影响不是很大。如果有 N 个 topic,那么针对每个 topic,消费者 C1-0 都将多消费 1 个分区,topic越多,C1-0 消费的分区会比其他消费者明显多消费 N 个分区。部分消费者明显过载。
- RoundRobin RoundRobin 轮询分区策略,是把所有的 partition 和所有的 consumer 都列出来,然后按照 hascode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。 轮询分区分为如下两种情况: 1.同一消费组内所有消费者订阅的topic都是相同的 2.同一消费者组内的消费者订阅的消息有不相同的 第1种情况下 RoundRobin 策略的分区分配是均匀的。例如:同一消费者组中,有 3 个消费者C0、C1和C2,都订阅了 2 个主题 t0 和 t1,并且每个主题都有 3 个分区(p0、p1、p2),那么所订阅的所以分区可以标识为t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终分区分配结果如下: 消费者 消费分区 消费者C0 消费 t0p0 、t1p0 分区 消费者C1 消费 t0p1 、t1p1 分区 消费者C2 消费 t0p2 、t1p2 分区 第2种情况下RoundRobin 在执行分区分配的时候,就不是完全的轮询分配。有可能导致分区分配不均匀。例如:同一个消费者组中,有3个消费者C0、C1和C2,它们共订阅了 3 个主题:t0、t1 和 t2,这 3 个主题分别有 1、2、3 个分区(即:t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2)),所有消费者所订阅的所有分区可以标识为 t0p0、t1p0、t1p1、t2p0、t2p1、t2p2。具体而言,消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2,最终分区分配结果如下: 消费者 消费分区 消费者C0 消费 t0p0 分区 消费者C1 消费 t1p0 分区 消费者C2 消费 t1p1、t2p0、t2p1、t2p2 分区 如果优先使用RoundRobin 轮询分区最好是在消费者组里每个消费者订阅的主题都是相同的情况下,一般实际开发中消费者组里的每个消费者确实也是如此,尽量不要一个消费者订阅多个主题。
- 位移(offset) 对于Kafka 中的分区而言,它的每条消息都有唯一的offset ,用来表示消息在分区中对应的位置。对于消费者而言, 它也有一个offset 的概念,消费者使用offset 来表示消费到分区中某个消息所在的位置。消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。Kafka 0.9 版本之前,consumer 默认将offset 保存在Zookeeper 中,新版consumer客户端 默认将offset 保存在Kafka 内置的主题 __consumer_offsets中。通常把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。 如何获取offset?先修改配置文件consumer.properties:exclude.internal.topics=false 使用kafka脚本读取offset: kafka-console-consumer.sh –topic __consumer_offsets –zookeeper localhost:2181 –formatter “kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter” –consumer.config config/consumer.properties –from-beginning
消费者位移提交方式
自动提交offset
自动位移提交(commit)的动作是在poll()方法里完成的,每次向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。自动提交消费位移的方式非常简便,免去了复杂的位移提交逻辑,使得应用层代码非常简洁。如果在下一次自动提交消费位移之前,消费者宕机了,那么又得从上一次位移提交的地方重新开始消费,这将导致重复消费。可以减小位移提交的时间间隔来减小消息重复的时间窗口,但是这会使移提交更加频繁。
代码:
代码语言:javascript复制/** * 自动提交offset */
public class MyConsumer1 {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
//group.id相同的属于同一个消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动提交offset,每1s提交一次
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("topic_test"));
//消费者启动死循环不断消费
while (true) {
//一旦拉取到数据就返回,否则最多等待duration设定的时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
});
}
}
}
手动提交offset
- 同步方式
/** * 手动同步提交offset */
public class MyConsumer2 {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
//group.id相同的属于同一个消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
//关闭自动提交offset,手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("topic_test"));
//消费者启动死循环不断消费
while (true) {
//一旦拉取到数据就返回,否则最多等待duration设定的时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
});
//同步提交,线程会阻塞,直到当前批次offset提交成功
kafkaConsumer.commitSync();
}
}
}
- 异步方式
/** * 手动异步提交offset */
public class MyConsumer3 {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
//group.id相同的属于同一个消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
//关闭自动提交offset,手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("topic_test"));
//消费者启动死循环不断消费
while (true) {
//一旦拉取到数据就返回,否则最多等待duration设定的时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
});
//异步提交,可以带回调函数,线程不会阻塞
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.out.println("提交失败:" offsets);
}
}
});
}
}
}
自定义offset
在Kafka中,offset默认存储在broker的内置Topic中,我们可以自定义存储位置。比如为了保证消费和提交偏移量同时成功或失败,我们可以利用数据库事务来实现,把offset存储在Mysql即可。下面的例子仅为示例代码,其中getOffset和commitOffset方法可以根据所选的offset存储系统(比如mysql)自行实现。
代码语言:javascript复制/** * 自定义offset提交 * 在Kafka中,offset默认存储在broker的内置Topic中,我们可以自定义存储位置 * 比如为了保证消费和提交偏移量同时成功或失败,我们可以利用数据库事务来实现,把offset存储在Mysql即可 * 下面的例子仅为示例代码,其中getOffset和commitOffset方法可以根据所选的offset存储系统(比如mysql)自行实现 */
public class MyConsumer4 {
public static Map<TopicPartition, Long> currentOffset = new HashMap<>();
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
//group.id相同的属于同一个消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
//关闭自动提交offset,手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("topic_test"), new ConsumerRebalanceListener() {
//该方法会在Rebalanced之前调用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}
//该方法会在Rebalanced之后调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
//定位到每个分区最近提交的offset位置继续消费
kafkaConsumer.seek(partition, getOffset(partition));
}
}
});
//消费者启动死循环不断消费
while (true) {
//一旦拉取到数据就返回,否则最多等待duration设定的时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
});
//提交offset
commitOffset(currentOffset);
}
}
/** * 获取某分区最新的offset * * @param partition * @return */
private static long getOffset(TopicPartition partition) {
return 0;
}
/** * 提交该消费者所有分区的offset * * @param currentOffset */
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
}
}
消费者拦截器
Kafka consumer在poll()方法返回之前会先调用拦截器的onConsume()方法,可以在此方法里预先对消息进行定制化操作。Kafka consumer在提交完消费位移之后会调用拦截器的onCommit()方法。
拦截器案例:定义1个消费者拦截器并使用
代码语言:javascript复制/** * 自定义消费者拦截器 * Kafka consumer在poll()方法返回之前会先调用拦截器的onConsume方法,可以在此方法里预先对消息进行定制化操作 * Kafka consumer在提交完消费位移之后会调用拦截器的onCommit方法, * 本拦截器功能: * 对消息的时间戳进行判断,过滤掉不满足时效(过期)的消息 * 消费完成后打印位移信息 */
public class ConsumerInterceptor1 implements ConsumerInterceptor<String, String> {
private static final long EXPIRE_INTERVAL = 10000;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
HashMap<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>(64);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> recordsInPartition = records.records(partition);
List<ConsumerRecord<String, String>> filteredRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : recordsInPartition) {
if (System.currentTimeMillis() - record.timestamp() < EXPIRE_INTERVAL) {
filteredRecords.add(record);
}
}
if (!filteredRecords.isEmpty()) {
newRecords.put(partition, filteredRecords);
}
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) -> {
System.out.println(tp ":" offset.offset());
});
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
代码语言:javascript复制/** * 使用自定义消费者拦截器 */
public class ConsumerWithInterceptor1 {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092");
//group.id相同的属于同一个消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动提交offset,每1s提交一次
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//使用自定义消费者拦截器
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor1.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("topic_test"));
//消费者启动死循环不断消费
while (true) {
//一旦拉取到数据就返回,否则最多等待duration设定的时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, key = %d, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.timestamp(), record.value());
});
}
}
}
代码语言:javascript复制/** * 测试ConsumerWithInterceptor中时间戳过滤是否生效 * 可以在发送消息时手动修改Producer Record的timestamp */
public class Producer4ConsumerWithInterceptor1 {
private static final long EXPIRE_INTERVAL = 10000;
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i ) {
ProducerRecord<String, String> record;
if (i % 2 == 0) {
record = new ProducerRecord<>("topic_test", "hello " i);
} else {
//奇数序号消息时间戳向前调整,测试消费者拦截器是否能过滤掉
record = new ProducerRecord<>("topic_test", null, System.currentTimeMillis() - EXPIRE_INTERVAL, null, "hello " i);
}
kafkaProducer.send(record);
}
//关闭producer,才能触发Interceptor的close方法
kafkaProducer.close();
}
}
kafka事务
详细内容请订阅专栏教程《rabbitmq/kafka实战教程》https://blog.csdn.net/zpcandzhj/category_10152842.html
SpringBoot集成kafka
详细内容请订阅专栏教程《rabbitmq/kafka实战教程》https://blog.csdn.net/zpcandzhj/category_10152842.html
kafka面试题集锦
完整内容请订阅专栏教程《rabbitmq/kafka实战教程》https://blog.csdn.net/zpcandzhj/category_10152842.html
欢迎关注公众号【程猿薇茑】获取各种教程。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/181192.html原文链接:https://javaforall.cn