kafka集群搭建

2022-09-26 17:48:38 浏览数 (1)

基于docker-compose的kafka集群搭建

使用docker-compose搭建kafka集群,解析一些参数含义及列出搭建过程的一些坑。

1. docker-compose文件

直接上yml文件

代码语言:javascript复制
version: '3.3'
networks:
  kafka:
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper-kafka
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - /usr/local/etc/kafka/data/zookeeper/data:/data
      - /usr/local/etc/kafka/data/zookeeper/datalog:/datalog
    networks:
      - kafka
    restart: unless-stopped

  kafka1:
    image: wurstmeister/kafka
    container_name: kafka1
    depends_on:
      - zookeeper
    ports:
      - 9091:9091
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9091
      KAFKA_LOG_DIRS: /data/kafka-log
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9091
      BOOTSTRAP_SERVERS: kafka1:9091,kafka2:9092,kafka3:9093
    volumes:
      - /usr/local/etc/kafka/data/kafka1:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - kafka
    restart: unless-stopped
     
  kafka2:
    image: wurstmeister/kafka
    container_name: kafka2
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1 
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_LOG_DIRS: /data/kafka-log
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      BOOTSTRAP_SERVERS: kafka1:9091,kafka2:9092,kafka3:9093
    volumes:
      - /usr/local/etc/kafka/data/kafka2:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - kafka
    restart: unless-stopped

  kafka3:
    image: wurstmeister/kafka
    container_name: kafka3
    depends_on:
      - zookeeper
    ports:
      - 9093:9093
    environment:
      KAFKA_BROKER_ID: 2 
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_LOG_DIRS: /data/kafka-log
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9093
      BOOTSTRAP_SERVERS: kafka1:9091,kafka2:9092,kafka3:9093
    volumes:
      - /usr/local/etc/kafka/data/kafka3:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - kafka
    restart: unless-stopped
      
  kafka_manager:
    image: hlebalbau/kafka-manager:latest
    container_name: kafka_manager
    ports:
      - 9000:9000
    environment:
      ZK_HOSTS: zookeeper:2181
      APPLICATION_SECRET: doper
    networks:
      - kafka
    depends_on:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3

2. 各模块解析

2.1. zookeeper

​ 这里zookeeper使用的是zookeeper官方提供的镜像,与大多数教程中的wurstmeister/zookeeper是一样的,在使用时可以挂载卷实现数据持久化,具体挂载的目录在官方的README中有,如下:

2.2. kafka

kafka的参数配置在镜像README中也有说明,简单来说就是先从kafka官方文档中找到要配置的参数,然后在docker-compose.yml文件中配置时只需要将配置项的名字全部换成大写,然后.换成_,再加上KAFKA_前缀几个。举个例子,修改broker.id,则需要在配置文件中设置KAFKA_BROKER_ID=XXX即可。


  1. 参数说明

  • KAFKA_BROKER_ID: 配置broker.id,每个brokerid必须唯一
  • KAFKA_NUM_PARTITIONS: 设置topic的分区数
  • KAFKA_DEFAULT_REPLICATION_FACTOR: topic的复制系数,也就是消息副本数
  • KAFKA_ZOOKEEPER_CONNECT: zookeeper的连接地址,这里注意的是加了/kafka后缀,是为了方便管理,把集群所有的信息都放入zookeeper/kafka目录下。
  • KAFKA_LISTENERS: 监听器,指定以什么协议及哪个主机名和端口来访问kafka服务。这里设置了监听所有网卡,listeners 解决的是kafka监听来自于哪个网卡的请求。
  • KAFKA_ADVERTISED_LISTENERS: broker真正要注册进zookeeper的监听信息,及broker对外开放的端口。
  • KAFKA_LOG_DIRS: 日志位置
  • BOOTSTRAP_SERVERSkafka集群各个服务器的地址信息

  1. listenersadvertised.listeners的区别

reference

  1. https://segmentfault.com/a/1190000020715650
  2. https://blog.csdn.net/weixin_38251332/article/details/105638535

在部署时对内外网暴露端口需要做区分时使用,原文:

listeners: INSIDE://172.17.0.10:9092,OUTSIDE://172.17.0.10:9094 advertised_listeners: INSIDE://172.17.0.10:9092,OUTSIDE://<公网 ip>:端口 kafka_listener_security_protocol_map: "INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT" kafka_inter_broker_listener_name: "INSIDE" advertised_listeners 监听器会注册在 zookeeper 中; 当我们对 172.17.0.10:9092 请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 INSIDE 监听器,然后通过 listeners 中找到对应的 通讯 ip 和 端口; 同理,当我们对 <公网 ip>:端口 请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 OUTSIDE 监听器,然后通过 listeners 中找到对应的 通讯 ip 和 端口 172.17.0.10:9094; 总结:advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners

broker会把advertised_listeners中的信息注册进zookeeper,而客户端建立连接时会去找zookeeper中注册的advertised_listeners,然后根据找到的对应监视器去listeners中找对应的ip和端口进行访问。

因此这里可以有内外网分流的应用,具体见reference


  1. volumne挂载
代码语言:javascript复制
volumes:
      - /usr/local/etc/kafka/data/kafka3:/kafka		# 挂载kafka使用过程中产生的数据
      - /var/run/docker.sock:/var/run/docker.sock	# 挂载宿主机的docker.sock

kafka-docker中给出的docker-compose.yml例子中就有挂载docker.sock

这里为什么需要挂载docker.sock?

reference: https://blog.csdn.net/boling_cavalry/article/details/92846483

首先需要明白docker.sock作用

实际上docker是由clientserver组成,通过docker --version可以查看:

代码语言:javascript复制
 ⚡ root@doper  /home/doper  docker version
Client:
 Version:           20.10.15
 API version:       1.41
 Go version:        go1.18.1
 Git commit:        fd82621d35
 Built:             Thu May  5 23:16:45 2022
 OS/Arch:           linux/amd64
 Context:           default
 Experimental:      true

Server:
 Engine:
  Version:          20.10.15
  API version:      1.41 (minimum version 1.12)
  Go version:       go1.18.1
  Git commit:       4433bf67ba
  Built:            Thu May  5 23:16:31 2022
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          v1.6.4
  GitCommit:        212e8b6fa2f44b9c21b2798135fc6fb7c53efc16.m
 runc:
  Version:          1.1.2
  GitCommit:        
 docker-init:
  Version:          0.19.0
  GitCommit:        de40ad0

我们发送的docker命令其实都是客户端发送请求到docker daemon服务,docker daemon再返回相关的命令结果即可。

docker daemon默认监听的就是/var/run/docker.sockdaemon通过这个socket和其他进程通信,所以客户端只要将消息发送到这个socket即可实现与daemon通信。具体可见api文档

注意,这里和reference不同的是因为curl版本高于7.50,这个版本的curl在使用时需要提供的URL必须包括hostname. 具体见-> Can cURL send requests to sockets?

eg:

代码语言:javascript复制
 ⚡ root@doper  /home/doper  curl --unix-socket /var/run/docker.sock http://localhost/containers/json
[{"Id":"f2cce65f7b4752396843....	# 一大串的json信息

那么在kafka容器中就可以通过docker命令来得到相关的容器信息,在官方github中的start-kafka.sh脚本中也确实看到了其使用了docker port命令

有了socket,那在容器中有dockerclient可供使用吗?答案是有的,在Dockerfile其已经安装了docker

2.3. kafka_manager

用来管理kafka集群的,在容器启动后可以通过localhost:9000访问可视化界面。

注意创建时由于上面docker-compose.yml中已经设置将所有的kafka集群信息放在/kafka目录下,所以这里也要加上/kafka后缀

但这里有一个坑,就是在创建集群后会遇到如下错误

代码语言:javascript复制
Yikes! KeeperErrorCode = Unimplemented for /kafka-manager/mutex Try again.

解决方法:

reference: https://github.com/yahoo/CMAK/issues/731

代码语言:javascript复制
➜ docker exec -it zookeeper bash
root@98747a9eac65:/zookeeper-3.4.14# ./bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 2] ls /kafka-manager
[configs, deleteClusters, clusters]
[zk: localhost:2181(CONNECTED) 3] create /kafka-manager/mutex ""
Created /kafka-manager/mutex
[zk: localhost:2181(CONNECTED) 5] create /kafka-manager/mutex/locks ""
Created /kafka-manager/mutex/locks
[zk: localhost:2181(CONNECTED) 6] create /kafka-manager/mutex/leases ""
Created /kafka-manager/mutex/leases

0 人点赞