大家好,又见面了,我是你们的朋友全栈君。
概览
1.上传解压 2.修改配置文件 3.分发到其他节点下 4.启动 5.测试 6.注意
准备
安装好Zookeeper JDK版本:1.8.0_141 Kafka版本:kafka_2.12-1.1.0 工具:Xshell 5,Xftp 5
1.上传解压
首先在master(随意一台)的主机上的/usr下创建kafka文件夹作为安装路径
代码语言:javascript复制[root@master ~]# cd /usr/
[root@master usr]# ls
bin etc flume games hadoop hbase hive include java lib lib64 libexec local sbin share sqoop src tmp zookeeper
[root@master usr]# mkdir kafka
然后利用Xftp将压缩包上传到/usr/kafka下并解压
代码语言:javascript复制[root@master usr]# cd kafka/
代码语言:javascript复制[root@master kafka]# ls
kafka_2.12-1.1.0.tgz
[root@master kafka]# tar -zxf kafka_2.12-1.1.0.tgz
2.修改配置文件
修改 /usr/kafka/kafka_2.12-1.1.0/config/server.properties
代码语言:javascript复制[root@master kafka]# cd kafka_2.12-1.1.0/config/
[root@master config]# vim server.properties
只修改broker.id和zookeeper就行 将
修改为
broker.id每台主机上都不一样
将
修改为
保存退出
这是比较详细的配置(其实只要按照上面的更改即可)
代码语言:javascript复制#broker的全局唯一编号,不能重复
broker.id=01
#用来监听链接的端口,producer或consumer将在此端口建立连接
port=9092
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka消息存放的路径
log.dirs=/home/servers-kafka/logs/kafka
#topic在当前broker上的分片个数
num.partitions=2
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#滚动生成新的segment文件的最大时间
log.roll.hours=168
#日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824
#周期性检查文件大小的时间
log.retention.check.interval.ms=300000
#日志清理是否打开
log.cleaner.enable=true
#broker需要使用zookeeper保存meta数据
zookeeper.connect=master:2181,slave1:2181,slave2:2181
#zookeeper链接超时时间
zookeeper.connection.timeout.ms=6000
#partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000
#消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000
#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true
#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producerconnection to localhost:9092 unsuccessful 错误!
host.name=master
3.分发到其他节点下
代码语言:javascript复制[root@master config]# scp -r /usr/kafka/ root@slave1:/usr/
[root@master config]# scp -r /usr/kafka/ root@slave2:/usr/
然后分别在slave1和slave2上更改broker id
代码语言:javascript复制vim /usr/kafka/kafka_2.12-1.1.0/config/server.properties
将broker id改为02和03
4.启动
确保防火墙关闭
首先全部启动Zookeeper,参考Zookeeper安装中的启动
再全部启动kafka
代码语言:javascript复制# 由于没配置环境变量,所以需要进入安装目录下启动
cd /usr/kafka/kafka_2.12-1.1.0
# 后台启动,最后加&
./bin/kafka-server-start.sh -daemon config/server.properties &
查看jps
5.测试
在master上创建topic-test
代码语言:javascript复制[root@master kafka_2.12-1.1.0]# ./bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 3 --topic test
Created topic "test".
在master,slave1,2上查看已创建的topic列表
代码语言:javascript复制./bin/kafka-topics.sh --list --zookeeper localhost:2181
在master上启动生产者
代码语言:javascript复制[root@master kafka_2.12-1.1.0]# ./bin/kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic test
随意输入一些内容
在其他节点上启动控制台消费者
代码语言:javascript复制./bin/kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --from-beginning --topic test
会出现
--from-beginning
如果去掉不会出现消费者启动之前的消息
测试:在生产者输入内容,消费者查看消息
删除主题:
代码语言:javascript复制./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
6.注意
如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
你可以通过命令:./bin/kafka-topics –zookeeper 【zookeeper server】 –list 来查看所有topic 此时你若想真正删除它,可以如下操作: (1)登录zookeeper客户端:命令:./bin/zookeeper-client (2)找到topic所在的目录:ls /brokers/topics (3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。 另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】, 如果你删除了此处的topic,那么marked for deletion 标记消失 zookeeper 的config中也有有关topic的信息: ls /config/topics/【topic name】暂时不知道有什么用
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/129932.html原文链接:https://javaforall.cn