一、准备基础环境
1、需安装Java环境并配置环境变量 https://jdk.java.net/java-se-ri/11
2、下载kafka_2.13-2.6.1压缩包 http://kafka.apache.org/downloads
3、配置防火墙,开放相关端口
二、修改配置文件
进入kafka目录下的config文件夹下,修改配置文件server.properties内容为:
代码语言:javascript复制# broker的id号,同一个集群中每个节点设置为不同的id
broker.id=0
# 监听协议及地址
listeners=SASL_PLAINTEXT://192.168.0.1:9092
# 认证鉴权
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# 数据目录
log.dirs=/home/kafka/kafka-logs
# zookeeper地址
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
三、启动服务脚本
修改启动脚本,配置认证的用户名密码
编辑bin目录中kafka-server-start.sh,加入以下启动参数
创建topic、producer、consumer的脚本都需要加入以下参数
代码语言:javascript复制if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.13-2.6.1/config/kafka_server_jaas.conf"
fi
kafka_server_jaas.conf文件内容如下
代码语言:javascript复制KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="chexiaotong"
user_admin="chexiaotong"
user_kafka="chexiaotong";
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="chexiaotong";
};
然后进入bin目录,使用如下脚本启动Kafka节点
./kafka-server-start.sh -daemon /home/kafka/kafka_2.13-2.6.1/config/server.properties
其它节点配置启动类似
四、设置开机启动
创建文件/etc/rc.d/init.d/kafka,写入以下内容
代码语言:javascript复制#!/bin/bash
export JAVA_HOME=/usr/local/jdk-11
export PATH=$JAVA_HOME/bin:$PATH
#
# kafka --- this script is used to start and stop kafka
#
# chkconfig: - 80 12
# description: kafka
# processname: kafka
STARTEXEC=/home/kafka/kafka_2.13-2.6.1/bin/kafka-server-start.sh
STOPEXEC=/home/kafka/kafka_2.13-2.6.1/bin/kafka-server-stop.sh
CONF=/home/kafka/kafka_2.13-2.6.1/config/server.properties
case $1 in
start)
$STARTEXEC -daemon $CONF
;;
stop)
$STOPEXEC
;;
status)
jps
;;
restart)
$STOPEXEC
$STARTEXEC -daemon $CONF
;;
*)
echo "require start|stop|status|restart"
;;
esac
添加可执行权限
代码语言:javascript复制chmod x /etc/rc.d/init.d/kafka
注册为系统服务
代码语言:javascript复制chkconfig --add kafka
添加开机自启动
代码语言:javascript复制chkconfig kafka on
其他命令使用
创建topic
代码语言:javascript复制./kafka-topics.sh --create --bootstrap-server 192.168.0.1:9092 --replication-factor 1 --partitions 1 --topic test
发布消息
代码语言:javascript复制./kafka-console-producer.sh --producer.config ../config/producer.properties --bootstrap-server 192.168.0.1:9092 --topic test
消费消息
代码语言:javascript复制./kafka-console-consumer.sh --consumer.config ../config/consumer.properties --bootstrap-server 192.168.0.1:9092 --topic test --from-beginning
五、Kafka其他相关概念
kafka实例(broker),或者说服务器节点。
消息(event records也叫record或者message),一条消息有key,value,timestamp和一些可选的headers。
生产者(producer),发布消息。
消费者(comsumer),订阅消息。
主题(topic),用于消息归类。概念上类似文件系统的文件夹,消息是这个文件夹中的文件,或者可以理解为类似于别的消息系统的队列。
分区(partition),主题是分区的,一个主题可以有多个分区,可以分布在不同的broker中,kafka保证单个分区的消息是有序的。
副本(replica),为了容错和高可用,每个主题可以被复制。复制的对象是分区,也就是说分区可以被复制为多个,统称为副本,副本数量可配置。
日志(log) ,存储消息的地方,分区的具体实现,日志持久化到文件系统。
主题与分区示意图
基本脚本使用
启动
代码语言:javascript复制bin/zookeeper-server-start.sh config/zookeeper.properties
创建主题,创建一个名字叫test,单个分区,1个副本的主题
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看主题
代码语言:javascript复制bin/kafka-topics.sh --list --bootstrap-server localhost:9092
发送消息
代码语言:javascript复制bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
消费消息
代码语言:javascript复制bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning