1、创建Topic
(1)创建topic
代码语言:javascript复制[root@node3 kafka-2.8.0]# bin/kafka-topics.sh --create --topic demo --bootstrap-server node1:9092
Created topic demo.
[root@node3 kafka-2.8.0]#
(2)查看topic
代码语言:javascript复制[root@node3 kafka-2.8.0]# bin/kafka-topics.sh --list --zookeeper node1:2181
__consumer_offsets
demo
[root@node3 kafka-2.8.0]#
(3)查看名称为test的topic消息
代码语言:javascript复制[root@node3 kafka-2.8.0]# bin/kafka-topics.sh --describe --topic demo --bootstrap-server node1:9092
Topic: demo TopicId: wdH8eCg_SNqaWi3N1crN8Q PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: demo Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: demo Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: demo Partition: 2 Leader: 3 Replicas: 3 Isr: 3
[root@node3 kafka-2.8.0]#
2、启动生产者,发送消息
Kafka软件包提供了kafka-console-producer.sh
脚本,这是一个简易的生产者控制台,供我们测试之用。我们可以通过Java编写一个具有具体业务意义的消息生产者。
[root@node3 kafka-2.8.0]# bin/kafka-console-producer.sh --topic demo --bootstrap-server node1:9092
>hello
>This is my second message
>^C[root@node3 kafka-2.8.0]#
其中--bootstrap-server
是kafka-console-producer.sh脚本必要参数,指明连接的服务器,形如host1:prot1,host2:prot2
3、启动消费者,消费消息
Kafka软件包提供了kafka-console-consumer.sh
脚本,这是一个简易的消费者控制台,供我们测试之用。我们可以通过Java编写一个具有具体业务意义的消息消费者。
(1)在node1节点启动一个消费者
代码语言:javascript复制[root@node1 kafka-2.8.0]# bin/kafka-console-consumer.sh --topic demo --from-beginning --bootstrap-server node1:9092
hello
This is my second message
同样--bootstrap-server
是kafka-console-consumer.sh脚本必要参数,指明连接的服务器,形如host1:prot1,host2:prot2
(2)在node2节点上再启动一个消费者
[root@node2 kafka-2.8.0]# bin/kafka-console-consumer.sh --topic demo --from-beginning --bootstrap-server node1:9092,node2:9092,node3:9092
hello
This is my second message
4、Key/Value消息
默认情况下,所生产的消息是没有key,或者认为所有消息内容都是value值,其可以值为null。
下面测试一下Key/Value消息 (1)启动生产者,启用key解析
代码语言:javascript复制[root@node1 kafka-2.8.0]# bin/kafka-console-producer.sh --topic demo --bootstrap-server node1:9092 --property parse.key=true
>hello Kafka
>
--property
表示设置消费者相关的配置,其后的parse.key=true
表示启用parse.key。
(2)启动消费者
代码语言:javascript复制[root@node2 kafka-2.8.0]# bin/kafka-console-consumer.sh --topic demo --from-beginning --bootstrap-server node1:9092 --property print.key=true
null hello
hello Kafka
null This is my second message
可以看到之前普通消息的key是null
5、消费者无法收到消息
最近遇到一个问题,重新搭建了一套kafka集群,但是kafka-console-consumer.sh无法收到kafka-console-producer.sh 发送的消息。 经过一番折腾后,问题终于解决了(问题产生的原因尚不明确),解决流程如下:
4.1 停止kafka集群,清空数据
(1)新建脚本文件init_kafka.sh
代码语言:javascript复制#! /bin/bash
echo "------正在停止Kafka集群------"
SERVERS="node1 node2 node3"
stop_kafka() {
for SERVER in $SERVERS
do
ssh $SERVER "/opt/kafka-2.8.0/bin/kafka-server-stop.sh;jps -m"
done
}
rm_data() {
for SERVER in $SERVERS
do
ssh $SERVER " rm -rf /var/kafka-logs/*;ls /var/kafka-logs/"
done
}
echo "------停止Kafka集群------"
stop_kafka
echo "------删除Kafka集群数据----"
rm_data
(2)执行过程
代码语言:javascript复制[root@node1 ~]# chmod x init_kafka.sh
[root@node1 ~]# sh init_kafka.sh
------正在停止Kafka集群------
------停止Kafka集群------
10400 PaloFe
20006 Jps -m
12744 Worker --webui-port 8081 spark://node1:7077
14153 BrokerBootstrap
12249 Master --host node1 --port 7077 --webui-port 8080
23787 Kafka /opt/kafka-2.8.0/config/server.properties
22446 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
6144 Worker --webui-port 8081 spark://node1:7077
5009 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
23955 BrokerBootstrap
18516 PaloFe -helper node1:9010
4762 Jps -m
5918 Kafka /opt/kafka-2.8.0/config/server.properties
4786 Kafka /opt/kafka-2.8.0/config/server.properties
5971 PaloFe -helper node1:9010
1940 Worker --webui-port 8081 spark://node1:7077
3892 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
22940 BrokerBootstrap
4638 Jps -m
------删除Kafka集群数据----
[root@node1 ~]#
4.2 停止zookeeper集群,清空数据
(1)新建脚本文件init_zk.sh
代码语言:javascript复制#! /bin/bash
echo "------正在停止ZK集群------"
SERVERS="node1 node2 node3"
stop_zk() {
for SERVER in $SERVERS
do
ssh $SERVER "/opt/zookeeper-3.4.10/bin/zkServer.sh stop;jps -m"
done
}
rm_data() {
id=0
for SERVER in $SERVERS
do
let id
ssh $SERVER "rm -rf /tpdata/zookeeper/*;echo $id > /tpdata/zookeeper/myid;ls /tpdata/zookeeper/"
done
}
echo "------停止ZK集群------"
stop_zk
echo "------删除ZK集群数据----"
rm_data
(2)执行过程
代码语言:javascript复制[root@node1 ~]# chmod x init_zk.sh
[root@node1 ~]# sh init_zk.sh
------正在停止ZK集群------
------停止ZK集群------
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
10400 PaloFe
30736 Jps -m
12744 Worker --webui-port 8081 spark://node1:7077
14153 BrokerBootstrap
12249 Master --host node1 --port 7077 --webui-port 8080
22446 -- process information unavailable
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
6144 Worker --webui-port 8081 spark://node1:7077
5009 -- process information unavailable
23955 BrokerBootstrap
18516 PaloFe -helper node1:9010
10392 Jps -m
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
5971 PaloFe -helper node1:9010
1940 Worker --webui-port 8081 spark://node1:7077
3892 -- process information unavailable
10215 Jps -m
22940 BrokerBootstrap
------删除ZK集群数据----
myid
myid
myid
[root@node1 ~]#
4.3 重新启动zookeeper集群
(1)新建zk集群脚本start_zk.sh
代码语言:javascript复制#! /bin/bash
echo "------正在启动ZK集群------"
SERVERS="node1 node2 node3"
start_zk() {
for SERVER in $SERVERS
do
ssh $SERVER "/opt/zookeeper-3.4.10/bin/zkServer.sh start;jps -m"
done
}
show_status() {
for SERVER in $SERVERS
do
ssh $SERVER "/opt/zookeeper-3.4.10/bin/zkServer.sh status"
done
}
echo "------启动ZK集群------"
start_zk
echo "------等待5秒,查看ZK集群状态----"
sleep 5
show_status
(2)执行过程
代码语言:javascript复制[root@node1 ~]# chmod x start_zk.sh
[root@node1 ~]# sh start_zk.sh
------正在启动ZK集群------
------启动ZK集群------
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
10400 PaloFe
12744 Worker --webui-port 8081 spark://node1:7077
14153 BrokerBootstrap
12249 Master --host node1 --port 7077 --webui-port 8080
6651 Jps -m
6574 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
6144 Worker --webui-port 8081 spark://node1:7077
23955 BrokerBootstrap
18516 PaloFe -helper node1:9010
14569 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
14605 Jps -m
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
5971 PaloFe -helper node1:9010
1940 Worker --webui-port 8081 spark://node1:7077
14475 Jps -m
22940 BrokerBootstrap
14429 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
------等待5秒,查看ZK集群状态----
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: leader
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
[root@node1 ~]#
4.4 重新启动kafka集群
(1)新建启动kafka集群脚本start_kafka.sh
代码语言:javascript复制#! /bin/bash
echo "------正在启动Kafka集群------"
SERVERS="node1 node2 node3"
start_kafka() {
for SERVER in $SERVERS
do
ssh $SERVER "/opt/kafka-2.8.0/bin/kafka-server-start.sh -daemon /opt/kafka-2.8.0/config/server.properties;jps -m"
done
}
start_kafka
(2)执行过程
代码语言:javascript复制[root@node1 ~]# chmod x start_kafka.sh
[root@node1 ~]# sh start_kafka.sh
------正在启动Kafka集群------
10400 PaloFe
12744 Worker --webui-port 8081 spark://node1:7077
14153 BrokerBootstrap
12249 Master --host node1 --port 7077 --webui-port 8080
13371 Kafka /opt/kafka-2.8.0/config/server.properties
13372 Jps -m
6574 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
6144 Worker --webui-port 8081 spark://node1:7077
23955 BrokerBootstrap
18516 PaloFe -helper node1:9010
14569 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
17979 Kafka /opt/kafka-2.8.0/config/server.properties
17980 Jps -m
5971 PaloFe -helper node1:9010
1940 Worker --webui-port 8081 spark://node1:7077
18133 Kafka /opt/kafka-2.8.0/config/server.properties
18134 Jps -m
22940 BrokerBootstrap
14429 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
[root@node1 ~]#