搭建Kafka集群( 2.8.0版本)之二

2021-09-08 15:57:04 浏览数 (1)

1、创建Topic

(1)创建topic

代码语言:javascript复制
[root@node3 kafka-2.8.0]# bin/kafka-topics.sh --create --topic demo --bootstrap-server node1:9092
Created topic demo.
[root@node3 kafka-2.8.0]#

(2)查看topic

代码语言:javascript复制
[root@node3 kafka-2.8.0]# bin/kafka-topics.sh  --list --zookeeper node1:2181
__consumer_offsets
demo
[root@node3 kafka-2.8.0]#

(3)查看名称为test的topic消息

代码语言:javascript复制
[root@node3 kafka-2.8.0]# bin/kafka-topics.sh --describe --topic demo --bootstrap-server node1:9092
Topic: demo     TopicId: wdH8eCg_SNqaWi3N1crN8Q PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: demo     Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: demo     Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: demo     Partition: 2    Leader: 3       Replicas: 3     Isr: 3
[root@node3 kafka-2.8.0]# 

2、启动生产者,发送消息

Kafka软件包提供了kafka-console-producer.sh脚本,这是一个简易的生产者控制台,供我们测试之用。我们可以通过Java编写一个具有具体业务意义的消息生产者。

代码语言:javascript复制
[root@node3 kafka-2.8.0]# bin/kafka-console-producer.sh --topic demo --bootstrap-server node1:9092
>hello
>This is my second message
>^C[root@node3 kafka-2.8.0]#

其中--bootstrap-server是kafka-console-producer.sh脚本必要参数,指明连接的服务器,形如host1:prot1,host2:prot2

3、启动消费者,消费消息

Kafka软件包提供了kafka-console-consumer.sh脚本,这是一个简易的消费者控制台,供我们测试之用。我们可以通过Java编写一个具有具体业务意义的消息消费者。

(1)在node1节点启动一个消费者

代码语言:javascript复制
[root@node1 kafka-2.8.0]# bin/kafka-console-consumer.sh --topic demo --from-beginning --bootstrap-server node1:9092
hello
This is my second message

同样--bootstrap-server是kafka-console-consumer.sh脚本必要参数,指明连接的服务器,形如host1:prot1,host2:prot2 (2)在node2节点上再启动一个消费者

代码语言:javascript复制
[root@node2 kafka-2.8.0]#  bin/kafka-console-consumer.sh --topic demo --from-beginning --bootstrap-server node1:9092,node2:9092,node3:9092
hello
This is my second message

4、Key/Value消息

默认情况下,所生产的消息是没有key,或者认为所有消息内容都是value值,其可以值为null。

下面测试一下Key/Value消息 (1)启动生产者,启用key解析

代码语言:javascript复制
[root@node1 kafka-2.8.0]# bin/kafka-console-producer.sh --topic demo --bootstrap-server node1:9092 --property parse.key=true
>hello  Kafka
>

--property表示设置消费者相关的配置,其后的parse.key=true表示启用parse.key。

(2)启动消费者

代码语言:javascript复制
[root@node2 kafka-2.8.0]# bin/kafka-console-consumer.sh --topic demo --from-beginning --bootstrap-server node1:9092 --property print.key=true
null    hello
hello   Kafka
null    This is my second message

可以看到之前普通消息的key是null

5、消费者无法收到消息

最近遇到一个问题,重新搭建了一套kafka集群,但是kafka-console-consumer.sh无法收到kafka-console-producer.sh 发送的消息。 经过一番折腾后,问题终于解决了(问题产生的原因尚不明确),解决流程如下:

4.1 停止kafka集群,清空数据

(1)新建脚本文件init_kafka.sh

代码语言:javascript复制
#! /bin/bash
 
echo "------正在停止Kafka集群------"
SERVERS="node1 node2 node3"

stop_kafka() {
    for SERVER in $SERVERS
    do
         ssh $SERVER "/opt/kafka-2.8.0/bin/kafka-server-stop.sh;jps -m"
    done
}
rm_data() {
    for SERVER in $SERVERS
    do
         ssh $SERVER " rm -rf /var/kafka-logs/*;ls /var/kafka-logs/"
    done
}
echo "------停止Kafka集群------"
stop_kafka
echo "------删除Kafka集群数据----"
rm_data

(2)执行过程

代码语言:javascript复制
[root@node1 ~]# chmod  x init_kafka.sh 
[root@node1 ~]# sh init_kafka.sh 
------正在停止Kafka集群------
------停止Kafka集群------
10400 PaloFe
20006 Jps -m
12744 Worker --webui-port 8081 spark://node1:7077
14153 BrokerBootstrap
12249 Master --host node1 --port 7077 --webui-port 8080
23787 Kafka /opt/kafka-2.8.0/config/server.properties
22446 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
6144 Worker --webui-port 8081 spark://node1:7077
5009 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
23955 BrokerBootstrap
18516 PaloFe -helper node1:9010
4762 Jps -m
5918 Kafka /opt/kafka-2.8.0/config/server.properties
4786 Kafka /opt/kafka-2.8.0/config/server.properties
5971 PaloFe -helper node1:9010
1940 Worker --webui-port 8081 spark://node1:7077
3892 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
22940 BrokerBootstrap
4638 Jps -m
------删除Kafka集群数据----
[root@node1 ~]# 
4.2 停止zookeeper集群,清空数据

(1)新建脚本文件init_zk.sh

代码语言:javascript复制
#! /bin/bash
 
echo "------正在停止ZK集群------"
SERVERS="node1 node2 node3"

stop_zk() {
    for SERVER in $SERVERS
    do
            ssh $SERVER "/opt/zookeeper-3.4.10/bin/zkServer.sh stop;jps -m"
    done
}
rm_data() {
    id=0
    for SERVER in $SERVERS
    do
         let id    
         ssh $SERVER "rm -rf /tpdata/zookeeper/*;echo $id > /tpdata/zookeeper/myid;ls /tpdata/zookeeper/"
    done
}
echo "------停止ZK集群------"
stop_zk
echo "------删除ZK集群数据----"
rm_data

(2)执行过程

代码语言:javascript复制
[root@node1 ~]# chmod  x  init_zk.sh 
[root@node1 ~]# sh init_zk.sh 
------正在停止ZK集群------
------停止ZK集群------
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
10400 PaloFe
30736 Jps -m
12744 Worker --webui-port 8081 spark://node1:7077
14153 BrokerBootstrap
12249 Master --host node1 --port 7077 --webui-port 8080
22446 -- process information unavailable
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
6144 Worker --webui-port 8081 spark://node1:7077
5009 -- process information unavailable
23955 BrokerBootstrap
18516 PaloFe -helper node1:9010
10392 Jps -m
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
5971 PaloFe -helper node1:9010
1940 Worker --webui-port 8081 spark://node1:7077
3892 -- process information unavailable
10215 Jps -m
22940 BrokerBootstrap
------删除ZK集群数据----
myid
myid
myid
[root@node1 ~]# 
4.3 重新启动zookeeper集群

(1)新建zk集群脚本start_zk.sh

代码语言:javascript复制
#! /bin/bash
 
echo "------正在启动ZK集群------"
SERVERS="node1 node2 node3"

start_zk() {
    for SERVER in $SERVERS
    do
            ssh $SERVER "/opt/zookeeper-3.4.10/bin/zkServer.sh start;jps -m"
    done
}
show_status() {
    for SERVER in $SERVERS
    do
         ssh $SERVER "/opt/zookeeper-3.4.10/bin/zkServer.sh status"
    done
}
echo "------启动ZK集群------"
start_zk

echo "------等待5秒,查看ZK集群状态----"
sleep 5
show_status

(2)执行过程

代码语言:javascript复制
[root@node1 ~]# chmod  x  start_zk.sh 
[root@node1 ~]# sh start_zk.sh 
------正在启动ZK集群------
------启动ZK集群------
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
10400 PaloFe
12744 Worker --webui-port 8081 spark://node1:7077
14153 BrokerBootstrap
12249 Master --host node1 --port 7077 --webui-port 8080
6651 Jps -m
6574 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
6144 Worker --webui-port 8081 spark://node1:7077
23955 BrokerBootstrap
18516 PaloFe -helper node1:9010
14569 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
14605 Jps -m
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
5971 PaloFe -helper node1:9010
1940 Worker --webui-port 8081 spark://node1:7077
14475 Jps -m
22940 BrokerBootstrap
14429 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
------等待5秒,查看ZK集群状态----
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: leader
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
[root@node1 ~]# 
4.4 重新启动kafka集群

(1)新建启动kafka集群脚本start_kafka.sh

代码语言:javascript复制
#! /bin/bash
 
echo "------正在启动Kafka集群------"
SERVERS="node1 node2 node3"

start_kafka() {
    for SERVER in $SERVERS
    do
            ssh $SERVER "/opt/kafka-2.8.0/bin/kafka-server-start.sh -daemon  /opt/kafka-2.8.0/config/server.properties;jps -m"
    done
}
start_kafka

(2)执行过程

代码语言:javascript复制
[root@node1 ~]# chmod  x  start_kafka.sh 
[root@node1 ~]# sh start_kafka.sh 
------正在启动Kafka集群------
10400 PaloFe
12744 Worker --webui-port 8081 spark://node1:7077
14153 BrokerBootstrap
12249 Master --host node1 --port 7077 --webui-port 8080
13371 Kafka /opt/kafka-2.8.0/config/server.properties
13372 Jps -m
6574 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
6144 Worker --webui-port 8081 spark://node1:7077
23955 BrokerBootstrap
18516 PaloFe -helper node1:9010
14569 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
17979 Kafka /opt/kafka-2.8.0/config/server.properties
17980 Jps -m
5971 PaloFe -helper node1:9010
1940 Worker --webui-port 8081 spark://node1:7077
18133 Kafka /opt/kafka-2.8.0/config/server.properties
18134 Jps -m
22940 BrokerBootstrap
14429 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
[root@node1 ~]#

0 人点赞