前言
https://kafka.apache.org/downloads
单机环境搭建
安装zookeeper
kafka依赖zookeeper,安装包内已内置 使用内置的可以跳过该步骤
也可自己单独下载
https://zookeeper.apache.org/releases.html#download
安装kafka
代码语言:javascript复制wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xzf kafka_2.12-2.8.1.tgz
cd kafka_2.12-2.8.1
添加环境变量
名称 | 路径 |
---|---|
KAFKA_HOME | D:Toolsbigdatakafka_2.12-2.8.1 |
Path | %KAFKA_HOME%bin |
启动ZK
启动ZK
代码语言:javascript复制%KAFKA_HOME%/bin/windows/zookeeper-server-start.bat %KAFKA_HOME%/config/zookeeper.properties
进入
代码语言:javascript复制%KAFKA_HOME%/bin/windows/zookeeper-shell.bat localhost
输入命令
代码语言:javascript复制#查看zk的根目录kafka相关节点
ls /
#查看kafka节点
ls /brokers
ls /brokers/topics
常用命令
- 显示根目录下文件:
ls /
使用 ls 命令来查看当前 ZooKeeper 中所包含的内容 - 显示根目录下文件:
ls2 /
查看当前节点数据并能看到更新次数等数据 - 创建文件,并设置初始内容:
create /zk "test"
创建一个新的 znode节点“ zk ”以及与它关联的字符串 - 获取文件内容:
get /zk
确认 znode 是否包含我们所创建的字符串 - 修改文件内容:
set /zk "zkbak"
对 zk 所关联的字符串进行设置 - 删除文件:
delete /zk
将刚才创建的 znode 删除 - 退出客户端:
quit
- 帮助命令:
help
修改配置
修改配置文件 config/server.properties
#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://localhost:9092
#kafka的消息存储文件
log.dir=/kafka/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=localhost:2181
启动Kafka
Linux
启动脚本语法:
代码语言:javascript复制kafka-server-start.sh [-daemon] server.properties
可以看到,server.properties的配置路径是一个强制的参数,-daemon表示以后台进程运行,否则ssh客户端退出后,就会停止服务。 (注意,在启动kafka时会使用linux主机名关联的ip地址,所以需要把主机名和linux的ip映射配置到本地host里,用vim /etc/hosts)
查看主机名称 hostname 查看主机名 hostname -i:查看本机对应的IP 修改主机名称:
代码语言:javascript复制vi /etc/hostname
启动kafka,运行日志在logs目录的server.log文件里
代码语言:javascript复制bin/kafka-server-start.sh -daemon config/server.properties #后台启动,不会打印日志到控制台
或者用
代码语言:javascript复制bin/kafka-server-start.sh config/server.properties &
停止kafka
代码语言:javascript复制bin/kafka-server-stop.sh
Win
启动kafka,运行日志在logs目录的server.log文件里
代码语言:javascript复制%KAFKA_HOME%/bin/windows/kafka-server-start.bat %KAFKA_HOME%/config/server.properties
停止kafka
代码语言:javascript复制%KAFKA_HOME%/bin/windows/kafka-server-stop.bat
命令测试消息
Linux
1、创建主题
代码语言:javascript复制bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2、列出所有主题
代码语言:javascript复制bin/kafka-topics.sh --list --zookeeper localhost:2181
3、发送消息
代码语言:javascript复制bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
4、消费消息,默认是消费最新的消息
代码语言:javascript复制bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
5、创建多个分区主题
代码语言:javascript复制bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test1
扩容分区
代码语言:javascript复制bin/kafka-topics.sh -alter --partitions 3 --zookeeper localhost:2181 --topic test1
6、查看topic情况
代码语言:javascript复制bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
7、消费者的消费偏移量是消费者自己维护的,查看主题的消费偏移量
代码语言:javascript复制bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test
Win
1、创建主题
代码语言:javascript复制%KAFKA_HOME%/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2、列出所有主题
代码语言:javascript复制%KAFKA_HOME%/bin/windows/kafka-topics.bat --list --zookeeper localhost:2181
3、发送消息
代码语言:javascript复制%KAFKA_HOME%/bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic test
4、消费消息,默认是消费最新的消息
代码语言:javascript复制%KAFKA_HOME%/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
5、创建多个分区主题
代码语言:javascript复制%KAFKA_HOME%/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test1
扩容分区
代码语言:javascript复制%KAFKA_HOME%/bin/windows/kafka-topics.bat -alter --partitions 3 --zookeeper localhost:2181 --topic test1
6、查看topic情况
代码语言:javascript复制%KAFKA_HOME%/bin/windows/kafka-topics.bat --describe --zookeeper localhost:2181 --topic test
7、消费者的消费偏移量是消费者自己维护的,查看主题的消费偏移量
代码语言:javascript复制%KAFKA_HOME%/bin/windows/kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group test
集群的搭建
集群服务器
zookeeper 1台:192.168.10.10
kafka 2台: 192.168.10.11
和 192.168.10.12
修改配置文件
在192.168.10.11服务器上面
代码语言:javascript复制vi config/server.properties
内容如下
代码语言:javascript复制#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.10.11:9093
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=192.168.10.10:2181
在192.168.10.12服务器上面
代码语言:javascript复制vi config/server.properties
内容如下
代码语言:javascript复制#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.10.12:9093
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=192.168.10.10:2181
分别启动两台kafka
代码语言:javascript复制bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties
测试
创建一个新的topic,副本数设置为3,分区数设置为2
代码语言:javascript复制bin/kafka-topics.sh --create --zookeeper 192.168.10.10:2181 --replication-factor 2 --partitions 2 --topic my-topic
查看topic信息
代码语言:javascript复制bin/kafka-topics.sh --describe --zookeeper 192.168.10.10:2181 --topic my-topic
向my-topic主题发送消息
代码语言:javascript复制bin/kafka-console-producer.sh --broker-list 192.168.10.11:9092,192.168.10.12:9092 --topic my-topic
消费消息
代码语言:javascript复制bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.11:9092,192.168.10.12:9092 --from-beginning --topic my-topic