集群规划
Kafka 是一个开源的分布式消息队列系统,主要用于处理和传输大量的数据流。通俗来说,它就像一个“邮局”或者“快递公司”,负责在不同的应用程序之间发送和接收信息。
例子:
想象一下一个大型的家庭聚会,家里有很多人,各自有不同的需求和想法。
消息传递:在聚会上,家里的每个人都可以通过一个“公告板”来传递信息,比如“晚餐准备好了”或者“谁能帮我拿饮料”。这个公告板就像 Kafka,帮助不同的人发送和接收信息。
数据流处理:如果有人在聚会上提出一个问题,比如“大家喜欢什么样的音乐?”,其他人可以在公告板上写下他们的意见。家长可以实时看到这些意见,从而决定播放什么音乐。
数据存储:如果某个家庭成员暂时不在聚会现场,他们可以在回来的时候查看公告板,看到所有的消息,而不会错过任何重要的信息。这就像 Kafka 临时存储消息,确保信息不会丢失。
高吞吐量:如果聚会上有很多人同时发言,公告板依然能够快速记录所有的信息,确保每个人的声音都被听到。这类似于 Kafka 高效处理大量消息的能力。
前提工作
我们采用kafka3.3版本(kafka_2.12-3.3.1.tgz)
hadoop102下解压缩
代码语言:shell复制cd /opt/module
# 解压
tar -zxvf kafka_2.12-3.3.1.tgz
# 重命名
mv kafka_2.12-3.3.1/ kafka
配置环境变量
代码语言:shell复制# 编辑配置文件
vim /etc/profile
# KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=:$KAFKA_HOME/bin:$PATH
# 加载环境变量使其生效
source /etc/profile
搭建
代码语言:shell复制# 进入到/opt/module/kafka目录,修改配置文件
cd /opt/module/kafka/config
vim server.properties
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=0
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://hadoop102:9092
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
分发kafka至hadoop103、hadoop104
代码语言:shell复制# 远程拷贝
scp -r /opt/module/kafka hadoop103:/opt/module/
scp -r /opt/module/kafka hadoop104:/opt/module/
# 分别登录修改hadoop103、hadoop104上的server.properties配置
# hadoop103 如下:
broker.id=1
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://hadoop103:9092
# hadoop104 如下:
broker.id=2
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://hadoop104:9092
一键启动脚本
代码语言:shell复制vim start-kafka.sh
#!/bin/bash
for host in hadoop102 hadoop103 hadoop104
do
ssh $host "source /etc/profile;/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
echo "$host kafka Server 正在启动......"
done
一键关闭脚本
代码语言:shell复制vim stop-kafka.sh
#!/bin/bash
for host in hadoop102 hadoop103 hadoop104
do
ssh $host "source /etc/profile;/opt/module/kafka/bin/kafka-server-stop.sh"
echo "$host kafka Server 正在关闭......"
done
检查结果
启动成功有以下标志
每台机器jps有kafka进程
主题命令行操作
代码语言:shell复制# 查看当前服务器中的所有topic
kafka-topics.sh --bootstrap-server hadoop102:9092 --list
# 创建first topic
kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
选项说明:
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
# 查看first主题的详情
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
# 修改分区数(注意:分区数只能增加,不能减少)
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
# 再次查看first主题的详情
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
# 删除topic
kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
生产者命令行操作
代码语言:shell复制# 发送消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>hello world
消费者命令行操作
代码语言:shell复制# 消费first主题中的数据
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
# 把主题中所有的数据都读取出来(包括历史数据)
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first