大家好,又见面了,我是你们的朋友全栈君。
一,安装环境与软件版本
linux | CentOS 6 (64位) |
---|---|
jdk | jdk-8u191-linux-x64.tar.gz |
zookeeper | zookeeper-3.4.10.tar.gz |
kafka | kafka_2.11-0.11.0.2.tgz |
二,安装
##解压
-rwxrw-rw-. 1 root root 42136632 Jun 11 01:55 kafka_2.11-0.11.0.2.tgz
drwxr-xr-x. 12 1001 1001 4096 Jun 11 05:35 zookeeper-3.4.10
[root@localhost module]# tar -xvf kafka_2.11-0.11.0.2.tgz
[root@localhost kafka_2.11-0.11.0.2]# ll
total 56
drwxr-xr-x. 3 root root 4096 Nov 10 2017 bin
drwxr-xr-x. 2 root root 4096 Nov 10 2017 config
drwxr-xr-x. 2 root root 4096 Jun 11 20:09 libs
-rw-r--r--. 1 root root 28824 Nov 10 2017 LICENSE
drwxr-xr-x. 2 root root 4096 Jun 11 20:10 logs
-rw-r--r--. 1 root root 336 Nov 10 2017 NOTICE
drwxr-xr-x. 2 root root 4096 Nov 10 2017 site-docs
##添加日志文件夹
[root@localhost kafka_2.11-0.11.0.2]# mkdir logs
[root@localhost kafka_2.11-0.11.0.2]# ll
#修改配置文件
[root@localhost kafka_2.11-0.11.0.2]# cd config/
[root@localhost config]# vim server.properties
broker.id=1 #broker的全局唯一编号,不能重复(我的是跟zk的myid一样)
delete.topic.enable=true
listeners=PLAINTEXT://192.168.8.132:9092
log.dirs=/opt/module/kafka_2.11-0.11.0.2/logs
zookeeper.connect=192.168.8.129:2181,192.168.8.132:2181,192.168.8.133:2181
三,启动和创建分区使用
注:zookeeper集群启动正常的前提下
#启动
[root@localhost kafka_2.11-0.11.0.2]# bin/kafka-server-start.sh config/server.properties &
#关闭
[root@localhost kafka_2.11-0.11.0.2]# bin/kafka-server-stop.sh stop
##创建topic
#topic 定义topic名
#replication-factor 定义副本数
#partitions 定义分区数
[root@localhost kafka_2.11-0.11.0.2]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 3 --partitions 3 --topic test
Created topic "test".
##查看topic 列表
[root@localhost kafka_2.11-0.11.0.2]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
test
##查看详情(注:下面的示例输出是在topic已被标记删除之后截取的,所以显示 MarkedForDeletion:true 且 Leader 为 -1;正常情况下 Leader 应为某个broker的id)
[root@localhost kafka_2.11-0.11.0.2]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
Topic:test PartitionCount:3 ReplicationFactor:3 Configs: MarkedForDeletion:true
Topic: test Partition: 0 Leader: -1 Replicas: 0,1,2 Isr: 2
Topic: test Partition: 1 Leader: -1 Replicas: 1,2,0 Isr: 2
Topic: test Partition: 2 Leader: -1 Replicas: 2,0,1 Isr: 2
##删除topic
[root@localhost kafka_2.11-0.11.0.2]# bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
#如果集群里有某个kafka broker没有设置 delete.topic.enable=true,
#则topic不会被真正删除;需要给所有broker都设置该参数并全部重新启动后,再执行删除才可
#删除命令执行成功后,topic在列表中会带有待删除标记(marked for deletion)
[root@localhost kafka_2.11-0.11.0.2]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
test - marked for deletion
#在zk里删除注册的节点(以下命令在zkCli.sh客户端中执行)
rmr /brokers/topics/【topic name】
四,简单使用
##发送消息(localhost 需替换为本机的ip)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicTest
##消费消息(localhost 需替换为本机的ip)
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic topicTest
#生产
[root@localhost kafka_2.11-0.11.0.2]# bin/kafka-console-producer.sh --broker-list 192.168.8.129:9092 --topic test1
>123
>123
>123
>123
>123
>123
>123
>hool^H^H
>holl
>hello
>hello
>
#消费1
[root@localhost kafka_2.11-0.11.0.2]# bin/kafka-console-consumer.sh --zookeeper 192.168.8.132:2181 --from-beginning --topic test1
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
123
123
123
123
123
123
123
hool
holl
hello
hello
#消费2,中途退出后再进来,前期的消息会乱序(Kafka只保证同一分区内的消息有序,多分区的topic从头消费时不保证整体顺序)
[root@localhost kafka_2.11-0.11.0.2]# bin/kafka-console-consumer.sh --zookeeper 192.168.8.133:2181 --from-beginning --topic test1
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
123
123
123
hello
123
123
hool
123
123
holl
hello
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/131623.html 原文链接:https://javaforall.cn