一、准备
确保服务器上已经搭建完成JDK,zookeeper服务;
如果未搭建完成,请移步参考以下文章:
安装zookeeper: https://blog.csdn.net/xuan_lu/article/details/120474451
安装JDK1.8:https://blog.csdn.net/xuan_lu/article/details/107297710
kafka官网下载:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.0/kafka_2.11-2.2.0.tgz
点击下载安装包
二、安装与配置
- 解压缩:
tar -zxvf kafka_2.11-2.2.0.tgz
- 重命名:
mv kafka_2.11-2.2.0 kafka
- 创建**
logs
**文件,用于存储kafka日志: 在kafka安装目录下创建:mkdir kafka-logs /opt/software/kafka/kafka-logs - 修改**
server.properties
**配置文件
- 修改日志存储目录:
log.dirs=/opt/software/kafka/kafka-logs
- 修改或者确定zookeeper地址和端口:
zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000
- 修改broker.id
broker.id=1
三、配置文件详解
代码语言:javascript复制#一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id=1
#listeners=PLAINTEXT://:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SS
# broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads=3
# broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
num.io.threads=8
# socket的发送缓冲区(SO_SNDBUF)
socket.send.buffer.bytes=102400
# socket的接收缓冲区 (SO_RCVBUF)
socket.receive.buffer.bytes=102400
# socket请求的最大字节数。为了防止内存溢出,message.max.bytes必然要小于
socket.request.max.bytes=104857600
#kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs=/opt/software/kafka/kafka-logs
# 每个topic的分区个数,更多的partition会产生更多的segment file
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# 当达到下面的消息数量时,会将数据flush到日志文件中。默认10000
#log.flush.interval.messages=10000
# 当达到下面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms
#log.flush.interval.ms=1000
# 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。
log.retention.hours=168
#log.retention.bytes=1073741824
# 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
log.segment.bytes=1073741824
# 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes)
log.retention.check.interval.ms=300000
# Zookeeper quorum设置。如果有多个使用逗号分割 例如 ip:prot,ip:prot,ip:prot
zookeeper.connect=localhost:2181
# 连接zk的超时时间
zookeeper.connection.timeout.ms=6000
# ZooKeeper集群中leader和follower之间的同步实际
group.initial.rebalance.delay.ms=0
四、运行
1.启动zookeeper服务
由于小编服务器上已经启动过zookeeper服务,故不需要重新执行启动命令;
如果服务器zookeeper服务未启动,则在kafka目录下执行以下命令:
使用安装包中的脚本启动单节点Zookeeper 实例:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
注意:以下命令,小编均选择在kafka安装根目录下执行;
2.启动kafka服务
以下方式任选其一
- kafka目录启动
使用kafka-server-start.sh 启动kafka 服务:
bin/kafka-server-start.sh config/server.properties
这种命令执行并不是后台进程运行,故使用以下命令bin/kafka-server-start.sh -daemon config/server.properties
- 命令需要切换到kafka的bin目录下,
./kafka-server-start.sh -daemon /opt/software/kafka//config/server.properties
3.创建topic
使用kafka-topics.sh 创建单分区单副本的topic test:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
4.查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
5.产生消息:kafka-console-producer.sh
使用kafka-console-producer.sh 发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
使用Ctrl C退出生成消息;
6.消费消息:kafka-console-consumer.sh
使用kafka-console-consumer.sh 接收消息并在终端打印
(这里用删除线标识,并不是代表命名错误,低版本仍然可以适用!!!)bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
这里需要注意上面这行命令试用于低版本的kafka,否则会报错,小编安装的kafka版本高,所以报错,故换成以下命令即可:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
7.查看描述 Topic 信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
说明:
第一行给出了所有分区的摘要,每个附加行给出了关于一个分区的信息。 由于我们只有一个分区,所以只有一行。
Leader: 是负责给定分区的所有读取和写入的节点。 每个节点将成为分区随机选择部分的领导者。
Replicas: 是复制此分区日志的节点列表,无论它们是否是领导者,或者即使他们当前处于活动状态。
Isr: 是一组“同步”副本。这是复制品列表的子集,当前活着并被引导到领导者。
8.删除topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
参考资源:
1.在CentOS 7上安装Kafka
https://cloud.tencent.com/developer/article/1388439