kafka集群部署

2024-08-07 14:51:33 浏览数 (2)

集群规划

Kafka 是一个开源的分布式消息队列系统,主要用于处理和传输大量的数据流。通俗来说,它就像一个“邮局”或者“快递公司”,负责在不同的应用程序之间发送和接收信息。

例子:

想象一下一个大型的家庭聚会,家里有很多人,各自有不同的需求和想法。

消息传递:在聚会上,家里的每个人都可以通过一个“公告板”来传递信息,比如“晚餐准备好了”或者“谁能帮我拿饮料”。这个公告板就像 Kafka,帮助不同的人发送和接收信息。

数据流处理:如果有人在聚会上提出一个问题,比如“大家喜欢什么样的音乐?”,其他人可以在公告板上写下他们的意见。家长可以实时看到这些意见,从而决定播放什么音乐。

数据存储:如果某个家庭成员暂时不在聚会现场,他们可以在回来的时候查看公告板,看到所有的消息,而不会错过任何重要的信息。这就像 Kafka 临时存储消息,确保信息不会丢失。

高吞吐量:如果聚会上有很多人同时发言,公告板依然能够快速记录所有的信息,确保每个人的声音都被听到。这类似于 Kafka 高效处理大量消息的能力。

前提工作

我们采用kafka3.3版本(kafka_2.12-3.3.1.tgz)

hadoop102下解压缩

代码语言:shell复制
cd /opt/module

# 解压
tar -zxvf kafka_2.12-3.3.1.tgz
# 重命名
mv kafka_2.12-3.3.1/ kafka

配置环境变量

代码语言:shell复制
# 编辑配置文件
vim /etc/profile

# KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=:$KAFKA_HOME/bin:$PATH

# 加载环境变量使其生效
source /etc/profile

搭建

代码语言:shell复制
# 进入到/opt/module/kafka目录,修改配置文件
cd /opt/module/kafka/config

vim server.properties

#broker的全局唯一编号,不能重复,只能是数字。
broker.id=0

#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://hadoop102:9092

#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas

#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

分发kafka至hadoop103、hadoop104

代码语言:shell复制
# 远程拷贝
scp -r /opt/module/kafka hadoop103:/opt/module/
scp -r /opt/module/kafka hadoop104:/opt/module/

# 分别登录修改hadoop103、hadoop104上的server.properties配置
# hadoop103 如下:
broker.id=1
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://hadoop103:9092

# hadoop104 如下:
broker.id=2
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://hadoop104:9092

一键启动脚本

代码语言:shell复制
vim start-kafka.sh

#!/bin/bash

for host in hadoop102 hadoop103 hadoop104
do
       ssh $host "source /etc/profile;/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
       echo "$host kafka Server 正在启动......"
done

一键关闭脚本

代码语言:shell复制
vim stop-kafka.sh

#!/bin/bash

for host in hadoop102 hadoop103 hadoop104
do
       ssh $host "source /etc/profile;/opt/module/kafka/bin/kafka-server-stop.sh"
       echo "$host kafka Server 正在关闭......"
done

检查结果

启动成功有以下标志

每台机器jps有kafka进程

主题命令行操作

代码语言:shell复制
# 查看当前服务器中的所有topic
kafka-topics.sh --bootstrap-server hadoop102:9092 --list

# 创建first topic
kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
选项说明:
--topic 定义topic名
--replication-factor  定义副本数
--partitions  定义分区数

# 查看first主题的详情
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first

# 修改分区数(注意:分区数只能增加,不能减少)
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3

 # 再次查看first主题的详情
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first

# 删除topic
kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first

生产者命令行操作

代码语言:shell复制
# 发送消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>hello world

消费者命令行操作

代码语言:shell复制
# 消费first主题中的数据
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
# 把主题中所有的数据都读取出来(包括历史数据)
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

0 人点赞