微系列:5、在Centos系统中,搭建Kafka集群

2023-01-09 18:53:55 浏览数 (1)

一、准备基础环境

1、需安装Java环境并配置环境变量 https://jdk.java.net/java-se-ri/11

2、下载kafka_2.13-2.6.1压缩包 http://kafka.apache.org/downloads

3、配置防火墙,开放相关端口

二、修改配置文件

进入kafka目录下的config文件夹下,修改配置文件server.properties内容为:

代码语言:javascript复制
# broker的id号,同一个集群中每个节点设置为不同的id
broker.id=0
# 监听协议及地址
listeners=SASL_PLAINTEXT://192.168.0.1:9092
# 认证鉴权
security.inter.broker.protocol=SASL_PLAINTEXT 
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# 数据目录
log.dirs=/home/kafka/kafka-logs
# zookeeper地址
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181

三、启动服务脚本

修改启动脚本,配置认证的用户名密码

编辑bin目录中kafka-server-start.sh,加入以下启动参数

创建topic、producer、consumer的脚本都需要加入以下参数

代码语言:javascript复制
if [  "x$KAFKA_OPTS" ]; then
 export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.13-2.6.1/config/kafka_server_jaas.conf"
fi

kafka_server_jaas.conf文件内容如下

代码语言:javascript复制
KafkaServer {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="admin"
 password="chexiaotong"
 user_admin="chexiaotong"
 user_kafka="chexiaotong";
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="chexiaotong";
};


然后进入bin目录,使用如下脚本启动Kafka节点
./kafka-server-start.sh -daemon /home/kafka/kafka_2.13-2.6.1/config/server.properties

其它节点配置启动类似

四、设置开机启动

创建文件/etc/rc.d/init.d/kafka,写入以下内容

代码语言:javascript复制
#!/bin/bash

export JAVA_HOME=/usr/local/jdk-11
export PATH=$JAVA_HOME/bin:$PATH

#
# kafka  ---  this script is used to start and stop kafka
#
# chkconfig:   - 80 12
# description:  kafka
# processname: kafka

STARTEXEC=/home/kafka/kafka_2.13-2.6.1/bin/kafka-server-start.sh
STOPEXEC=/home/kafka/kafka_2.13-2.6.1/bin/kafka-server-stop.sh
CONF=/home/kafka/kafka_2.13-2.6.1/config/server.properties
case $1 in
          start) 
              $STARTEXEC -daemon $CONF
              ;;
          stop)
              $STOPEXEC
              ;;
          status)
              jps
              ;;
          restart)
              $STOPEXEC
              $STARTEXEC -daemon $CONF
              ;;
          *)
              echo "require start|stop|status|restart"
              ;;
esac

添加可执行权限

代码语言:javascript复制
chmod  x /etc/rc.d/init.d/kafka

注册为系统服务

代码语言:javascript复制
chkconfig --add kafka

添加开机自启动

代码语言:javascript复制
chkconfig kafka on

其他命令使用

创建topic

代码语言:javascript复制
./kafka-topics.sh --create --bootstrap-server 192.168.0.1:9092 --replication-factor 1 --partitions 1 --topic test

发布消息

代码语言:javascript复制
./kafka-console-producer.sh --producer.config ../config/producer.properties --bootstrap-server 192.168.0.1:9092 --topic test

消费消息

代码语言:javascript复制
./kafka-console-consumer.sh --consumer.config ../config/consumer.properties --bootstrap-server 192.168.0.1:9092 --topic test --from-beginning

五、Kafka其他相关概念

kafka实例(broker),或者说服务器节点。

消息(event records也叫record或者message),一条消息有key,value,timestamp和一些可选的headers。

生产者(producer),发布消息。

消费者(comsumer),订阅消息。

主题(topic),用于消息归类。概念上类似文件系统的文件夹,消息是这个文件夹中的文件,或者可以理解为类似于别的消息系统的队列。

分区(partition),主题是分区的,一个主题可以有多个分区,可以分布在不同的broker中,kafka保证单个分区的消息是有序的。

副本(replica),为了容错和高可用,每个主题可以被复制。复制的对象是分区,也就是说分区可以被复制为多个,统称为副本,副本数量可配置。

日志(log) ,存储消息的地方,分区的具体实现,日志持久化到文件系统。

主题与分区示意图

基本脚本使用

启动

代码语言:javascript复制
bin/zookeeper-server-start.sh config/zookeeper.properties

创建主题,创建一个名字叫test,单个分区,1个副本的主题

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

查看主题

代码语言:javascript复制
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

发送消息

代码语言:javascript复制
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

消费消息

代码语言:javascript复制
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

0 人点赞