Deploying a Kafka Cluster on K8S - Deployment Notes

2021-03-03 11:03:29

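I. Background

Kafka and ZooKeeper are two typical stateful clustered services. First, both need storage volumes to persist their state. Second, every Kafka or ZooKeeper instance needs its own instance ID (broker.id for Kafka, myid for ZooKeeper) that identifies it as a cluster member; the nodes use these IDs when they communicate with each other inside the cluster.

Deploying this kind of service means solving two main problems: preserving state and managing the cluster (that is, managing multiple service instances). The StatefulSet provided by Kubernetes makes it easier to deploy and manage stateful clustered services on Kubernetes. Typically, three mechanisms are combined to deploy a stateful clustered service: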

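  • Init Containers perform the cluster initialization work.
  • A Headless Service keeps the relationship between cluster members stable.
  • Persistent Volumes and Persistent Volume Claims provide network storage to persist data.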

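Therefore, stateful services such as Kafka and ZooKeeper must be deployed in a K8S cluster with a StatefulSet, not a Deployment. Put simply, "stateful" means the service needs to persist data, such as logs, database data, or service state.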

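StatefulSet use cases:

  • Stable persistent storage: after a Pod is rescheduled it can still access the same persisted data. This is implemented with PVCs.
  • Stable network identity: after a Pod is rescheduled its PodName and HostName stay the same. This is implemented with a Headless Service (a Service without a Cluster IP).
  • Ordered deployment and ordered scale-out: Pods are ordered, and deployment or scale-out proceeds in the defined order (from 0 to N-1; all preceding Pods must be Running and Ready before the next Pod starts). The StatefulSet controller enforces this order by default.
  • Ordered scale-in and ordered deletion (from N-1 to 0).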

A StatefulSet deployment consists of:

  • A Headless Service that defines the network identity (DNS domain)
  • volumeClaimTemplates used to create PersistentVolumes
  • The StatefulSet that defines the application itself

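The DNS name of each Pod in a StatefulSet has the format:

statefulSetName-{0..N-1}.serviceName.namespace.svc.cluster.local, where:

  • statefulSetName is the name of the StatefulSet
  • 0..N-1 is the ordinal of the Pod, from 0 to N-1
  • serviceName is the name of the Headless Service
  • namespace is the namespace the service runs in; the Headless Service and the StatefulSet must be in the same namespace
  • svc.cluster.local is the service suffix under the K8S Cluster Domain (cluster.local by default)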
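As a quick illustration of the naming rule above, the following shell sketch prints the DNS names that the Pods of a StatefulSet would get. The function name pod_dns_names is hypothetical (it is not a kubectl feature); the sample values zok, zk-hs and wiseco are the ones used later in these notes, and cluster.local is assumed to be the cluster domain.

```shell
# Print the per-Pod DNS names of a StatefulSet.
# Usage: pod_dns_names <statefulSetName> <replicas> <serviceName> <namespace>
# pod_dns_names is a hypothetical helper for illustration only.
pod_dns_names() {
  i=0
  while [ "$i" -lt "$2" ]; do
    echo "$1-$i.$3.$4.svc.cluster.local"
    i=$((i + 1))
  done
}

pod_dns_names zok 3 zk-hs wiseco
```

These are the same host names that appear in the zookeeper.connect setting of the Kafka manifests later in these notes.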

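II. Deployment walkthrough (NAS storage)

Here a three-node Kafka container cluster is built on K8S. Because a Kafka cluster needs storage, Persistent Volumes (PVs) have to be prepared first.

1. Configure dynamic persistent storage for the StatefulSets

1) Use Alibaba Cloud NAS storage

NAS storage address created on the Alibaba Cloud platform: 1*********-beijing.nas.aliyuncs.com

First, create the zk and kafka subdirectories in the NAS file system.

A NAS subdirectory must actually exist before it can be mounted; otherwise the mount fails with an error saying the subdirectory cannot be accessed.

How to create them: mount the NAS storage on any node, create the subdirectories, and then unmount it.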

[root@k8s-node04 ~]# ls /mnt/
[root@k8s-node04 ~]# mkdir /mnt/test
[root@k8s-node04 ~]# mount -t nfs -o vers=3,nolock,proto=tcp,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport 1*********-beijing.nas.aliyuncs.com:/ /mnt/test/
[root@k8s-node04 ~]# cd /mnt/test/
[root@k8s-node04 test]# mkdir kafka
[root@k8s-node04 test]# mkdir zk
[root@k8s-node04 test]# ll
total 2
drwxr-xr-x 2 root root 4096 Jan 24 22:53 kafka
drwxr-xr-x 2 root root 4096 Jan 24 22:53 zk
 
[root@k8s-node04 ~]# df -h|grep mnt
1*********-beijing.nas.aliyuncs.com:/   10P     0   10P   0% /mnt/test
[root@k8s-node04 ~]# umount /mnt/test
[root@k8s-node04 ~]# rm -rf /mnt/test
[root@k8s-node04 ~]# ls /mnt/

2) Create the RBAC resources for NFS

[root@k8s-master01 ~]# mkdir -p /opt/k8s/k8s_project/kafka_zk
[root@k8s-master01 ~]# cd /opt/k8s/k8s_project/kafka_zk

[root@k8s-master01 kafka_zk]# vim nfs-rbac.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-provisioner
  namespace: wiseco
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
   name: nfs-provisioner-runner
   namespace: wiseco
rules:
   -  apiGroups: [""]
      resources: ["persistentvolumes"]
      verbs: ["get", "list", "watch", "create", "delete"]
   -  apiGroups: [""]
      resources: ["persistentvolumeclaims"]
      verbs: ["get", "list", "watch", "update"]
   -  apiGroups: ["storage.k8s.io"]
      resources: ["storageclasses"]
      verbs: ["get", "list", "watch"]
   -  apiGroups: [""]
      resources: ["events"]
      verbs: ["watch", "create", "update", "patch"]
   -  apiGroups: [""]
      resources: ["services", "endpoints"]
      verbs: ["get","create","list", "watch","update"]
   -  apiGroups: ["extensions"]
      resources: ["podsecuritypolicies"]
      resourceNames: ["nfs-provisioner"]
      verbs: ["use"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-provisioner
    namespace: wiseco
roleRef:
  kind: ClusterRole
  name: nfs-provisioner-runner
  apiGroup: rbac.authorization.k8s.io

Create and verify

[root@k8s-master01 kafka_zk]# kubectl apply -f nfs-rbac.yaml
serviceaccount/nfs-provisioner created
clusterrole.rbac.authorization.k8s.io/nfs-provisioner-runner created
clusterrolebinding.rbac.authorization.k8s.io/run-nfs-provisioner created

[root@k8s-master01 kafka_zk]# kubectl get sa -n wiseco|grep nfs
nfs-provisioner                1         14s

[root@k8s-master01 kafka_zk]# kubectl get clusterrole -n wiseco|grep nfs
nfs-provisioner-runner                                                 2021-01-04T13:03:55Z

[root@k8s-master01 kafka_zk]# kubectl get clusterrolebinding -n wiseco|grep nfs
run-nfs-provisioner                                    ClusterRole/nfs-provisioner-runner                                                 36s

3) Create the StorageClass for the zk cluster

[root@k8s-master01 kafka_zk]# mkdir zk
[root@k8s-master01 kafka_zk]# cd zk
[root@k8s-master01 zk]# pwd
/opt/k8s/k8s_project/kafka_zk/zk

[root@k8s-master01 zk]# cat zk-nfs-class.yaml
apiVersion: storage.k8s.io/v1beta1
kind: StorageClass
metadata:
  name: zk-nfs-storage
  namespace: wiseco
mountOptions:
- vers=4.0
- nolock,tcp,noresvport
provisioner: zk/nfs
reclaimPolicy: Retain

Create and verify

[root@k8s-master01 zk]# kubectl apply -f zk-nfs-class.yaml
storageclass.storage.k8s.io/zk-nfs-storage created

[root@k8s-master01 zk]# kubectl get sc -n wiseco
NAME             PROVISIONER   RECLAIMPOLICY   VOLUMEBINDINGMODE   ALLOWVOLUMEEXPANSION   AGE
zk-nfs-storage   zk/nfs        Retain          Immediate           false                  28s

4) Create the StorageClass for the Kafka cluster

[root@k8s-master01 zk]# mkdir ../kafka
[root@k8s-master01 zk]# cd ../kafka
[root@k8s-master01 kafka]# pwd
/opt/k8s/k8s_project/kafka_zk/kafka

[root@k8s-master01 kafka]# cat kafka-nfs-class.yaml
apiVersion: storage.k8s.io/v1beta1
kind: StorageClass
metadata:
  name: kafka-nfs-storage
  namespace: wiseco
mountOptions:
- vers=4.0
- nolock,tcp,noresvport
provisioner: kafka/nfs
reclaimPolicy: Retain

Create and verify

[root@k8s-master01 kafka]# kubectl apply -f kafka-nfs-class.yaml
storageclass.storage.k8s.io/kafka-nfs-storage created
[root@k8s-master01 kafka]# kubectl get sc -n wiseco
NAME                PROVISIONER   RECLAIMPOLICY   VOLUMEBINDINGMODE   ALLOWVOLUMEEXPANSION   AGE
kafka-nfs-storage   kafka/nfs     Retain          Immediate           false                  4s
zk-nfs-storage      zk/nfs        Retain          Immediate           false                  12m

5) Create the nfs-client-provisioner for the zk cluster

[root@k8s-master01 zk]# pwd
/opt/k8s/k8s_project/kafka_zk/zk
[root@k8s-master01 zk]# cat zk-nfs.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zk-nfs-client-provisioner
  namespace: wiseco
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zk-nfs-client-provisioner
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: zk-nfs-client-provisioner
    spec:
      serviceAccount: nfs-provisioner
      containers:
        - name: zk-nfs-client-provisioner
          image: registry.cn-hangzhou.aliyuncs.com/open-ali/nfs-client-provisioner
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: nfs-client-root
              mountPath:  /persistentvolumes
          env:
            - name: PROVISIONER_NAME
              value: zk/nfs
            - name: NFS_SERVER
              value: 1*********-beijing.nas.aliyuncs.com
            - name: NFS_PATH
              value: /zk
      volumes:
        - name: nfs-client-root
          nfs:
            server: 1*********-beijing.nas.aliyuncs.com
            path: /zk

Create and verify

[root@k8s-master01 zk]# kubectl apply -f zk-nfs.yml
deployment.apps/zk-nfs-client-provisioner created

[root@k8s-master01 zk]# kubectl get pods -n wiseco|grep nfs
zk-nfs-client-provisioner-bd8d65798-qrz87   1/1     Running   0          39s

6) Create the nfs-client-provisioner for the Kafka cluster

[root@k8s-master01 kafka]# pwd
/opt/k8s/k8s_project/kafka_zk/kafka
[root@k8s-master01 kafka]# cat kafka-nfs.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-nfs-client-provisioner
  namespace: wiseco
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-nfs-client-provisioner
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: kafka-nfs-client-provisioner
    spec:
      serviceAccount: nfs-provisioner
      containers:
        - name: kafka-nfs-client-provisioner
          image: registry.cn-hangzhou.aliyuncs.com/open-ali/nfs-client-provisioner
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: nfs-client-root
              mountPath:  /persistentvolumes
          env:
            - name: PROVISIONER_NAME
              value: kafka/nfs
            - name: NFS_SERVER
              value: 1*********-beijing.nas.aliyuncs.com
            - name: NFS_PATH
              value: /kafka
      volumes:
        - name: nfs-client-root
          nfs:
            server: 1*********-beijing.nas.aliyuncs.com
            path: /kafka

Create and verify

[root@k8s-master01 kafka]# kubectl apply -f kafka-nfs.yml
deployment.apps/kafka-nfs-client-provisioner created

[root@k8s-master01 kafka]# kubectl get pods -n wiseco|grep nfs
kafka-nfs-client-provisioner-6747dc587c-8qjn7   1/1     Running   0          16s
zk-nfs-client-provisioner-bd8d65798-cdf6h       1/1     Running   0          47s

2. Create the ZK cluster

[root@k8s-master01 zk]# pwd
/opt/k8s/k8s_project/kafka_zk/zk
[root@k8s-master01 zk]# ls
zk-nfs-class.yaml  zk-nfs.yml

[root@k8s-master01 zk]# vim zk.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: zk-hs
  labels:
    app: zk
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: zk-cs
  labels:
    app: zk
spec:
  type: NodePort
  ports:
  - port: 2181
    targetPort: 2181
    name: client
    nodePort: 32181
  selector:
    app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  namespace: wiseco
  name: zk-pdb
spec:
  selector:
    matchLabels:
      app: zk
  maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: zok
spec:
  serviceName: zk-hs
  replicas: 3
  selector:
    matchLabels:
      app: zk
  template:
    metadata:
      labels:
        app: zk
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - zk
              topologyKey: "kubernetes.io/hostname"
      containers:
      - name: kubernetes-zookeeper
        imagePullPolicy: Always
        image: leolee32/kubernetes-library:kubernetes-zookeeper1.0-3.4.10
        resources:
          requests:
            memory: "1024Mi"
            cpu: "500m"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        command:
        - sh
        - -c
        - "start-zookeeper 
          --servers=3 
          --data_dir=/var/lib/zookeeper/data 
          --data_log_dir=/var/lib/zookeeper/data/log 
          --conf_dir=/opt/zookeeper/conf 
          --client_port=2181 
          --election_port=3888 
          --server_port=2888 
          --tick_time=2000 
          --init_limit=10 
          --sync_limit=5 
          --heap=512M 
          --max_client_cnxns=60 
          --snap_retain_count=3 
          --purge_interval=12 
          --max_session_timeout=40000 
          --min_session_timeout=4000 
          --log_level=INFO"
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        livenessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
  - metadata:
      name: datadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "zk-nfs-storage"
    spec:
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 10Gi

Create and verify

[root@k8s-master01 zk]# kubectl apply -f zk.yaml
service/zk-hs created
service/zk-cs created
poddisruptionbudget.policy/zk-pdb created
statefulset.apps/zok created

Inspect the zk cluster that was created

Check the zk cluster Pods
[root@k8s-master01 zk]# kubectl get pods -n wiseco|grep zok
zok-0                                           1/1     Running   0          2m1s
zok-1                                           1/1     Running   0          88s
zok-2                                           1/1     Running   0          51s

Check the zk cluster Services
[root@k8s-master01 zk]# kubectl get svc -n wiseco|grep zk
zk-cs           NodePort    10.254.223.248   <none>        2181:32181/TCP               2m24s
zk-hs           ClusterIP   None             <none>        2888/TCP,3888/TCP            2m25s

Check the zk cluster PVs and PVCs
Note:
PVs are cluster-scoped, so the query does not need "-n <namespace>"
PVCs are namespace-scoped, so the query needs "-n <namespace>"
[root@k8s-master01 zk]# kubectl get pv -n wiseco|grep zok
pvc-2bace45f-4567-4751-a7ee-233c7034bb09   10Gi       RWX            Delete           Bound    wiseco/datadir-zok-0   zk-nfs-storage            2m59s
pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e   10Gi       RWX            Delete           Bound    wiseco/datadir-zok-1   zk-nfs-storage            2m24s
pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219   10Gi       RWX            Delete           Bound    wiseco/datadir-zok-2   zk-nfs-storage            107s

[root@k8s-master01 zk]# kubectl get pvc -n wiseco|grep zok
datadir-zok-0   Bound    pvc-2bace45f-4567-4751-a7ee-233c7034bb09   10Gi       RWX            zk-nfs-storage   3m2s
datadir-zok-1   Bound    pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e   10Gi       RWX            zk-nfs-storage   2m29s
datadir-zok-2   Bound    pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219   10Gi       RWX            zk-nfs-storage   112s

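On the NFS server, check the persistent shared directories of the zk cluster.

The name of each persistent directory on NFS is composed as: namespace name-PVC name-PV name

As long as the PVC and the PV are not deleted, this directory name does not change.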
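The naming rule can be sketched in shell as follows. The helper name nfs_dir_name is hypothetical; the sample values are taken from the PVC listing shown earlier in this section.

```shell
# Compose the directory name that appears on the NFS share:
# <namespace>-<PVC name>-<PV name>
# nfs_dir_name is a hypothetical helper for illustration only.
nfs_dir_name() {
  echo "$1-$2-$3"
}

nfs_dir_name wiseco datadir-zok-0 pvc-2bace45f-4567-4751-a7ee-233c7034bb09
```

On a live cluster, the PV name bound to a PVC can be read from the PVC's spec.volumeName field, for example with kubectl get pvc datadir-zok-0 -n wiseco -o jsonpath='{.spec.volumeName}'.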

[root@k8s-harbor01 ~]# cd /data/storage/k8s/zk/
[root@k8s-harbor01 zk]# ll
total 0
drwxrwxrwx 3 root root 18 Jan  4 22:22 wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09
drwxrwxrwx 3 root root 18 Jan  4 22:23 wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e
drwxrwxrwx 3 root root 18 Jan  4 22:23 wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219
[root@k8s-harbor01 zk]# ls *
wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09:
data

wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e:
data

wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219:
data
[root@k8s-harbor01 zk]# ls */*
wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09/data:
log  myid  version-2

wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e/data:
log  myid  version-2

wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219/data:
log  myid  version-2

[root@k8s-harbor01 zk]# cat wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09/data/myid
1
[root@k8s-harbor01 zk]# cat wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e/data/myid
2
[root@k8s-harbor01 zk]# cat wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219/data/myid
3
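The myid values above follow the Pod ordinals: zok-0 has myid 1, zok-1 has 2, and zok-2 has 3. A minimal sketch of that mapping, assuming myid is derived from the hostname as ordinal + 1 (the image's start-zookeeper script is not shown in these notes, so this is an inference from the output, and myid_for_host is a hypothetical name):

```shell
# Derive a ZooKeeper myid from a StatefulSet Pod hostname such as "zok-1".
# Assumption: myid = Pod ordinal + 1, which matches the values listed above.
# myid_for_host is a hypothetical helper for illustration only.
myid_for_host() {
  ordinal="${1##*-}"   # strip everything up to and including the last "-"
  echo $((ordinal + 1))
}

for h in zok-0 zok-1 zok-2; do
  echo "$h -> myid $(myid_for_host "$h")"
done
```

Because the hostname of a StatefulSet Pod is stable across rescheduling, an ID derived this way stays stable as well.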

Check the leader/follower roles in the zk cluster

[root@k8s-master01 kafka]# kubectl get pods -n wiseco|grep zok
zok-0                                           1/1     Running   0          17h
zok-1                                           1/1     Running   0          17h
zok-2                                           1/1     Running   0          17h

[root@k8s-master01 kafka]# kubectl exec -ti zok-0 -n wiseco -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: follower

[root@k8s-master01 kafka]# kubectl exec -ti zok-1 -n wiseco -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: leader

[root@k8s-master01 kafka]# kubectl exec -ti zok-2 -n wiseco -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: follower

Verify connectivity to the zk cluster

[root@k8s-master01 kafka]# kubectl get svc -n wiseco|grep zk
zk-cs           NodePort    10.254.223.248   <none>        2181:32181/TCP               17h
zk-hs           ClusterIP   None             <none>        2888/TCP,3888/TCP            17h

[root@k8s-master01 ~]# kubectl exec -ti zok-1 -n wiseco -- zkCli.sh -server zk-cs:2181
Connecting to zk-cs:2181
2021-01-05 08:16:10,169 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
2021-01-05 08:16:10,173 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=zok-1.zk-hs.wiseco.svc.cluster.local
2021-01-05 08:16:10,173 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.8.0_131
2021-01-05 08:16:10,175 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2021-01-05 08:16:10,175 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
2021-01-05 08:16:10,175 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/usr/bin/../build/classes:/usr/bin/../build/lib/*.jar:/usr/bin/../share/zookeeper/zookeeper-3.4.10.jar:/usr/bin/../share/zookeeper/slf4j-log4j12-1.6.1.jar:/usr/bin/../share/zookeeper/slf4j-api-1.6.1.jar:/usr/bin/../share/zookeeper/netty-3.10.5.Final.jar:/usr/bin/../share/zookeeper/log4j-1.2.16.jar:/usr/bin/../share/zookeeper/jline-0.9.94.jar:/usr/bin/../src/java/lib/*.jar:/usr/bin/../etc/zookeeper:
2021-01-05 08:16:10,175 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
2021-01-05 08:16:10,175 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2021-01-05 08:16:10,175 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
2021-01-05 08:16:10,175 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
2021-01-05 08:16:10,175 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
2021-01-05 08:16:10,176 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=4.4.243-1.el7.elrepo.x86_64
2021-01-05 08:16:10,176 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=root
2021-01-05 08:16:10,176 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/root
2021-01-05 08:16:10,176 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/
2021-01-05 08:16:10,177 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=zk-cs:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@22d8cfe0
Welcome to ZooKeeper!
JLine support is enabled
2021-01-05 08:16:10,201 [myid:] - INFO  [main-SendThread(zk-cs.wiseco.svc.cluster.local:2181):ClientCnxn$SendThread@1032] - Opening socket connection to server zk-cs.wiseco.svc.cluster.local/10.254.223.248:2181. Will not attempt to authenticate using SASL (unknown error)
2021-01-05 08:16:10,261 [myid:] - INFO  [main-SendThread(zk-cs.wiseco.svc.cluster.local:2181):ClientCnxn$SendThread@876] - Socket connection established to zk-cs.wiseco.svc.cluster.local/10.254.223.248:2181, initiating session
[zk: zk-cs:2181(CONNECTING) 0] 2021-01-05 08:16:10,316 [myid:] - INFO  [main-SendThread(zk-cs.wiseco.svc.cluster.local:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server zk-cs.wiseco.svc.cluster.local/10.254.223.248:2181, sessionid = 0x376cdc83254000a, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null       # press Enter here

[zk: zk-cs:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: zk-cs:2181(CONNECTED) 1]

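3. Create the Kafka cluster

The Kafka cluster must be reachable from outside the K8S cluster. That requires either three external addresses, or one external address with three ports, each proxying to one of the three backend Kafka Pod instances. For external clients to connect to Kafka, the advertised.listeners setting must be configured:

  • advertised.listeners is the address and port exposed to the outside; communication between Kafka components uses listeners.
  • The advertised.listeners value is registered in ZooKeeper.
  • When a client requests a connection to "<public IP>:port", the Kafka server looks up the listener registered in ZooKeeper, finds the public listener that was defined (the default listener is PLAINTEXT; a custom one can also be defined), and then uses listeners to find the corresponding internal "IP and port".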
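To make the "one external address, three ports" layout concrete, here is a minimal shell sketch that prints the advertised.listeners override for each broker. It assumes the pattern used by the manifests in these notes: the external address 156.56.46.37 and a NodePort of 32092 + broker.id (32092 and 32093 appear in kafka1.yaml and kafka2.yaml; 32094 for the third broker is extrapolated and is an assumption). The function name advertised_listener is hypothetical.

```shell
# Print the advertised.listeners override for a given broker.id.
# Assumptions: one shared external IP, and NodePort = base port + broker.id.
# advertised_listener is a hypothetical helper for illustration only.
EXTERNAL_IP="156.56.46.37"
BASE_NODE_PORT=32092

advertised_listener() {
  echo "--override advertised.listeners=PLAINTEXT://${EXTERNAL_IP}:$((BASE_NODE_PORT + $1))"
}

for id in 0 1 2; do
  advertised_listener "$id"
done
```

Each broker needs its own port because clients connect to the exact address a broker advertises, so the three NodePort Services must map one-to-one to the three Pods.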

1) Dockerfile for building the Kafka 2.12-2.5.0 image

FROM 192.168.10.10/wiseco/jdk1.8.0_192
RUN rm -f /etc/localtime \
    && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
    && echo "Asia/Shanghai" > /etc/timezone

ENV LANG en_US.UTF-8
ENV KAFKA_DATA_DIR /var/lib/kafka/data
ENV JAVA_HOME /usr/java/jdk1.8.0_192
ENV KAFKA_HOME /opt/kafka
ENV PATH /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/kafka/bin

WORKDIR /opt
RUN yum install -y wget \
    && wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz \
    && tar -zvxf kafka_2.12-2.5.0.tgz \
    && ln -s /opt/kafka_2.12-2.5.0 /opt/kafka \
    && rm -rf kafka_2.12-2.5.0.tgz \
    && mkdir -p /var/lib/kafka/data

CMD ["/bin/bash"]

Build the image and push it to Harbor, for reference:

# docker build -t 192.168.10.10/wiseco/kafka:v2.12-2.5.0 .
# docker push 192.168.10.10/wiseco/kafka:v2.12-2.5.0

2) Create the Kafka cluster

[root@k8s-master01 ~]# mkdir -p /opt/k8s/k8s_project/kafka/
[root@k8s-master01 ~]# cd /opt/k8s/k8s_project/kafka/

Contents of kafka1.yaml:

[root@k8s-master01 kafka]# vim kafka1.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-hs1
  labels:
    app: kafka1
spec:
  ports:
  - port: 1099
    name: jmx1
  clusterIP: None
  selector:
    app: kafka1
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-cs1
  labels:
    app: kafka1
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
    name: client1
    nodePort: 32092
  selector:
    app: kafka1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: kafoka1
spec:
  serviceName: kafka-hs1
  replicas: 1
  selector:
    matchLabels:
      app: kafka1
  template:
    metadata:
      labels:
        app: kafka1
    spec:
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: 192.168.10.10/wiseco/kafka:v2.12-2.5.0
        ports:
        - containerPort: 9092
          name: client1
        - containerPort: 1099
          name: jmx1
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=0 
          --override advertised.listeners=PLAINTEXT://156.56.46.37:32092 
          --override zookeeper.connect=zok-0.zk-hs.wiseco.svc.cluster.local:2181,zok-1.zk-hs.wiseco.svc.cluster.local:2181,zok-2.zk-hs.wiseco.svc.cluster.local:2181 
          --override log.dirs=/var/lib/kafka 
          --override auto.create.topics.enable=true 
          --override auto.leader.rebalance.enable=true 
          --override background.threads=10 
          --override compression.type=producer 
          --override delete.topic.enable=true 
          --override leader.imbalance.check.interval.seconds=300 
          --override leader.imbalance.per.broker.percentage=10 
          --override log.flush.interval.messages=9223372036854775807 
          --override log.flush.offset.checkpoint.interval.ms=60000 
          --override log.flush.scheduler.interval.ms=9223372036854775807 
          --override log.retention.bytes=-1 
          --override log.retention.hours=168 
          --override log.roll.hours=168 
          --override log.roll.jitter.hours=0 
          --override log.segment.bytes=1073741824 
          --override log.segment.delete.delay.ms=60000 
          --override message.max.bytes=1000012 
          --override min.insync.replicas=1 
          --override num.io.threads=8 
          --override num.network.threads=3 
          --override num.recovery.threads.per.data.dir=1 
          --override num.replica.fetchers=1 
          --override offset.metadata.max.bytes=4096 
          --override offsets.commit.required.acks=-1 
          --override offsets.commit.timeout.ms=5000 
          --override offsets.load.buffer.size=5242880 
          --override offsets.retention.check.interval.ms=600000 
          --override offsets.retention.minutes=1440 
          --override offsets.topic.compression.codec=0 
          --override offsets.topic.num.partitions=50 
          --override offsets.topic.replication.factor=3 
          --override offsets.topic.segment.bytes=104857600 
          --override queued.max.requests=500 
          --override quota.consumer.default=9223372036854775807 
          --override quota.producer.default=9223372036854775807 
          --override replica.fetch.min.bytes=1 
          --override replica.fetch.wait.max.ms=500 
          --override replica.high.watermark.checkpoint.interval.ms=5000 
          --override replica.lag.time.max.ms=10000 
          --override replica.socket.receive.buffer.bytes=65536 
          --override replica.socket.timeout.ms=30000 
          --override request.timeout.ms=30000 
          --override socket.receive.buffer.bytes=102400 
          --override socket.request.max.bytes=104857600 
          --override socket.send.buffer.bytes=102400 
          --override unclean.leader.election.enable=true 
          --override zookeeper.session.timeout.ms=6000 
          --override zookeeper.set.acl=false 
          --override broker.id.generation.enable=true 
          --override connections.max.idle.ms=600000 
          --override controlled.shutdown.enable=true 
          --override controlled.shutdown.max.retries=3 
          --override controlled.shutdown.retry.backoff.ms=5000 
          --override controller.socket.timeout.ms=30000 
          --override default.replication.factor=1 
          --override fetch.purgatory.purge.interval.requests=1000 
          --override group.max.session.timeout.ms=300000 
          --override group.min.session.timeout.ms=6000 
          --override log.cleaner.backoff.ms=15000 
          --override log.cleaner.dedupe.buffer.size=134217728 
          --override log.cleaner.delete.retention.ms=86400000 
          --override log.cleaner.enable=true 
          --override log.cleaner.io.buffer.load.factor=0.9 
          --override log.cleaner.io.buffer.size=524288 
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 
          --override log.cleaner.min.cleanable.ratio=0.5 
          --override log.cleaner.min.compaction.lag.ms=0 
          --override log.cleaner.threads=1 
          --override log.cleanup.policy=delete 
          --override log.index.interval.bytes=4096 
          --override log.index.size.max.bytes=10485760 
          --override log.message.timestamp.difference.max.ms=9223372036854775807 
          --override log.message.timestamp.type=CreateTime 
          --override log.preallocate=false 
          --override log.retention.check.interval.ms=300000 
          --override max.connections.per.ip=2147483647 
          --override num.partitions=1 
          --override producer.purgatory.purge.interval.requests=1000 
          --override replica.fetch.backoff.ms=1000 
          --override replica.fetch.max.bytes=1048576 
          --override replica.fetch.response.max.bytes=10485760 
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: kafkadatadir
          mountPath: /var/lib/kafka
        lifecycle:
          postStart:
            exec:
              command: ["/bin/sh","-c","touch /tmp/health"]
        livenessProbe:
          exec:
            command: ["test","-e","/tmp/health"]
          initialDelaySeconds: 5
          timeoutSeconds: 5
          periodSeconds: 10
        readinessProbe:
          tcpSocket:
            port: client1
          initialDelaySeconds: 15
          timeoutSeconds: 5
          periodSeconds: 20
  volumeClaimTemplates:
  - metadata:
      name: kafkadatadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
    spec:
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 10Gi

Contents of kafka2.yaml:

[root@k8s-master01 kafka]# vim kafka2.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-hs2
  labels:
    app: kafka2
spec:
  ports:
  - port: 1099
    name: jmx2
  clusterIP: None
  selector:
    app: kafka2
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-cs2
  labels:
    app: kafka2
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
    name: client2
    nodePort: 32093
  selector:
    app: kafka2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: kafoka2
spec:
  serviceName: kafka-hs2
  replicas: 1
  selector:
    matchLabels:
      app: kafka2
  template:
    metadata:
      labels:
        app: kafka2
    spec:
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: 192.168.10.10/wiseco/kafka:v2.12-2.5.0
        ports:
        - containerPort: 9092
          name: client2
        - containerPort: 1099
          name: jmx2
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=1 
          --override advertised.listeners=PLAINTEXT://156.56.46.37:32093 
          --override zookeeper.connect=zok-0.zk-hs.wiseco.svc.cluster.local:2181,zok-1.zk-hs.wiseco.svc.cluster.local:2181,zok-2.zk-hs.wiseco.svc.cluster.local:2181 
          --override log.dirs=/var/lib/kafka 
          --override auto.create.topics.enable=true 
          --override auto.leader.rebalance.enable=true 
          --override background.threads=10 
          --override compression.type=producer 
          --override delete.topic.enable=true 
          --override leader.imbalance.check.interval.seconds=300 
          --override leader.imbalance.per.broker.percentage=10 
          --override log.flush.interval.messages=9223372036854775807 
          --override log.flush.offset.checkpoint.interval.ms=60000 
          --override log.flush.scheduler.interval.ms=9223372036854775807 
          --override log.retention.bytes=-1 
          --override log.retention.hours=168 
          --override log.roll.hours=168 
          --override log.roll.jitter.hours=0 
          --override log.segment.bytes=1073741824 
          --override log.segment.delete.delay.ms=60000 
          --override message.max.bytes=1000012 
          --override min.insync.replicas=1 
          --override num.io.threads=8 
          --override num.network.threads=3 
          --override num.recovery.threads.per.data.dir=1 
          --override num.replica.fetchers=1 
          --override offset.metadata.max.bytes=4096 
          --override offsets.commit.required.acks=-1 
          --override offsets.commit.timeout.ms=5000 
          --override offsets.load.buffer.size=5242880 
          --override offsets.retention.check.interval.ms=600000 
          --override offsets.retention.minutes=1440 
          --override offsets.topic.compression.codec=0 
          --override offsets.topic.num.partitions=50 
          --override offsets.topic.replication.factor=3 
          --override offsets.topic.segment.bytes=104857600 
          --override queued.max.requests=500 
          --override quota.consumer.default=9223372036854775807 
          --override quota.producer.default=9223372036854775807 
          --override replica.fetch.min.bytes=1 
          --override replica.fetch.wait.max.ms=500 
          --override replica.high.watermark.checkpoint.interval.ms=5000 
          --override replica.lag.time.max.ms=10000 
          --override replica.socket.receive.buffer.bytes=65536 
          --override replica.socket.timeout.ms=30000 
          --override request.timeout.ms=30000 
          --override socket.receive.buffer.bytes=102400 
          --override socket.request.max.bytes=104857600 
          --override socket.send.buffer.bytes=102400 
          --override unclean.leader.election.enable=true 
          --override zookeeper.session.timeout.ms=6000 
          --override zookeeper.set.acl=false 
          --override broker.id.generation.enable=true 
          --override connections.max.idle.ms=600000 
          --override controlled.shutdown.enable=true 
          --override controlled.shutdown.max.retries=3 
          --override controlled.shutdown.retry.backoff.ms=5000 
          --override controller.socket.timeout.ms=30000 
          --override default.replication.factor=1 
          --override fetch.purgatory.purge.interval.requests=1000 
          --override group.max.session.timeout.ms=300000 
          --override group.min.session.timeout.ms=6000 
          --override log.cleaner.backoff.ms=15000 
          --override log.cleaner.dedupe.buffer.size=134217728 
          --override log.cleaner.delete.retention.ms=86400000 
          --override log.cleaner.enable=true 
          --override log.cleaner.io.buffer.load.factor=0.9 
          --override log.cleaner.io.buffer.size=524288 
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 
          --override log.cleaner.min.cleanable.ratio=0.5 
          --override log.cleaner.min.compaction.lag.ms=0 
          --override log.cleaner.threads=1 
          --override log.cleanup.policy=delete 
          --override log.index.interval.bytes=4096 
          --override log.index.size.max.bytes=10485760 
          --override log.message.timestamp.difference.max.ms=9223372036854775807 
          --override log.message.timestamp.type=CreateTime 
          --override log.preallocate=false 
          --override log.retention.check.interval.ms=300000 
          --override max.connections.per.ip=2147483647 
          --override num.partitions=1 
          --override producer.purgatory.purge.interval.requests=1000 
          --override replica.fetch.backoff.ms=1000 
          --override replica.fetch.max.bytes=1048576 
          --override replica.fetch.response.max.bytes=10485760 
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: kafkadatadir
          mountPath: /var/lib/kafka
        lifecycle:
          postStart:
            exec:
              command: ["/bin/sh","-c","touch /tmp/health"]
        livenessProbe:
          exec:
            command: ["test","-e","/tmp/health"]
          initialDelaySeconds: 5
          timeoutSeconds: 5
          periodSeconds: 10
        readinessProbe:
          tcpSocket:
            port: client2
          initialDelaySeconds: 15
          timeoutSeconds: 5
          periodSeconds: 20
  volumeClaimTemplates:
  - metadata:
      name: kafkadatadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
    spec:
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 10Gi

Contents of kafka3.yaml:

[root@k8s-master01 kafka]# vim kafka3.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-hs3
  labels:
    app: kafka3
spec:
  ports:
  - port: 1099
    name: jmx3
  clusterIP: None
  selector:
    app: kafka3
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-cs3
  labels:
    app: kafka3
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
    name: client3
    nodePort: 32094
  selector:
    app: kafka3
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: kafoka3
spec:
  serviceName: kafka-hs3
  replicas: 1
  selector:
    matchLabels:
      app: kafka3
  template:
    metadata:
      labels:
        app: kafka3
    spec:
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: 192.168.10.10/wiseco/kafka:v2.12-2.5.0
        ports:
        - containerPort: 9092
          name: client3
        - containerPort: 1099
          name: jmx3
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=2 
          --override advertised.listeners=PLAINTEXT://156.56.46.37:32094 
          --override zookeeper.connect=zok-0.zk-hs.wiseco.svc.cluster.local:2181,zok-1.zk-hs.wiseco.svc.cluster.local:2181,zok-2.zk-hs.wiseco.svc.cluster.local:2181 
          --override log.dirs=/var/lib/kafka 
          --override auto.create.topics.enable=true 
          --override auto.leader.rebalance.enable=true 
          --override background.threads=10 
          --override compression.type=producer 
          --override delete.topic.enable=true 
          --override leader.imbalance.check.interval.seconds=300 
          --override leader.imbalance.per.broker.percentage=10 
          --override log.flush.interval.messages=9223372036854775807 
          --override log.flush.offset.checkpoint.interval.ms=60000 
          --override log.flush.scheduler.interval.ms=9223372036854775807 
          --override log.retention.bytes=-1 
          --override log.retention.hours=168 
          --override log.roll.hours=168 
          --override log.roll.jitter.hours=0 
          --override log.segment.bytes=1073741824 
          --override log.segment.delete.delay.ms=60000 
          --override message.max.bytes=1000012 
          --override min.insync.replicas=1 
          --override num.io.threads=8 
          --override num.network.threads=3 
          --override num.recovery.threads.per.data.dir=1 
          --override num.replica.fetchers=1 
          --override offset.metadata.max.bytes=4096 
          --override offsets.commit.required.acks=-1 
          --override offsets.commit.timeout.ms=5000 
          --override offsets.load.buffer.size=5242880 
          --override offsets.retention.check.interval.ms=600000 
          --override offsets.retention.minutes=1440 
          --override offsets.topic.compression.codec=0 
          --override offsets.topic.num.partitions=50 
          --override offsets.topic.replication.factor=3 
          --override offsets.topic.segment.bytes=104857600 
          --override queued.max.requests=500 
          --override quota.consumer.default=9223372036854775807 
          --override quota.producer.default=9223372036854775807 
          --override replica.fetch.min.bytes=1 
          --override replica.fetch.wait.max.ms=500 
          --override replica.high.watermark.checkpoint.interval.ms=5000 
          --override replica.lag.time.max.ms=10000 
          --override replica.socket.receive.buffer.bytes=65536 
          --override replica.socket.timeout.ms=30000 
          --override request.timeout.ms=30000 
          --override socket.receive.buffer.bytes=102400 
          --override socket.request.max.bytes=104857600 
          --override socket.send.buffer.bytes=102400 
          --override unclean.leader.election.enable=true 
          --override zookeeper.session.timeout.ms=6000 
          --override zookeeper.set.acl=false 
          --override broker.id.generation.enable=true 
          --override connections.max.idle.ms=600000 
          --override controlled.shutdown.enable=true 
          --override controlled.shutdown.max.retries=3 
          --override controlled.shutdown.retry.backoff.ms=5000 
          --override controller.socket.timeout.ms=30000 
          --override default.replication.factor=1 
          --override fetch.purgatory.purge.interval.requests=1000 
          --override group.max.session.timeout.ms=300000 
          --override group.min.session.timeout.ms=6000 
          --override log.cleaner.backoff.ms=15000 
          --override log.cleaner.dedupe.buffer.size=134217728 
          --override log.cleaner.delete.retention.ms=86400000 
          --override log.cleaner.enable=true 
          --override log.cleaner.io.buffer.load.factor=0.9 
          --override log.cleaner.io.buffer.size=524288 
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 
          --override log.cleaner.min.cleanable.ratio=0.5 
          --override log.cleaner.min.compaction.lag.ms=0 
          --override log.cleaner.threads=1 
          --override log.cleanup.policy=delete 
          --override log.index.interval.bytes=4096 
          --override log.index.size.max.bytes=10485760 
          --override log.message.timestamp.difference.max.ms=9223372036854775807 
          --override log.message.timestamp.type=CreateTime 
          --override log.preallocate=false 
          --override log.retention.check.interval.ms=300000 
          --override max.connections.per.ip=2147483647 
          --override num.partitions=1 
          --override producer.purgatory.purge.interval.requests=1000 
          --override replica.fetch.backoff.ms=1000 
          --override replica.fetch.max.bytes=1048576 
          --override replica.fetch.response.max.bytes=10485760 
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: kafkadatadir
          mountPath: /var/lib/kafka
        lifecycle:
          postStart:
            exec:
              command: ["/bin/sh","-c","touch /tmp/health"]
        livenessProbe:
          exec:
            command: ["test","-e","/tmp/health"]
          initialDelaySeconds: 5
          timeoutSeconds: 5
          periodSeconds: 10
        readinessProbe:
          tcpSocket:
            port: client3
          initialDelaySeconds: 15
          timeoutSeconds: 5
          periodSeconds: 20
  volumeClaimTemplates:
  - metadata:
      name: kafkadatadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
    spec:
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 10Gi
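
Judging from kafka3.yaml above and the kubectl output recorded in these notes, the three manifests appear to follow one numbering pattern: manifest N uses the names kafoka N, kafka-hs N and kafka-cs N, broker.id N-1, and NodePort 32091+N. The sketch below only prints those values so they can be checked before editing a copy of the manifest. The function name `broker_params` is hypothetical, and the rule itself is an inference from the values shown here, not something the manifests enforce.

```shell
#!/bin/sh
# broker_params N: print the per-broker values for manifest number N (1, 2 or 3).
# Assumption: the numbering rule is inferred from kafka3.yaml and the
# service/pod listings in these notes.
broker_params() {
  idx="$1"
  node_port=$((32091 + idx))
  echo "statefulset=kafoka${idx}"
  echo "headless_service=kafka-hs${idx}"
  echo "nodeport_service=kafka-cs${idx}"
  echo "broker_id=$((idx - 1))"
  echo "node_port=${node_port}"
  echo "advertised_listener=PLAINTEXT://156.56.46.37:${node_port}"
}

for n in 1 2 3; do
  broker_params "$n"
  echo "---"
done
```

For N=3 the printed values match kafka3.yaml: broker_id=2, node_port=32094 and the advertised listener PLAINTEXT://156.56.46.37:32094.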

Create the resources and check them

[root@k8s-vm01 new_kafka]# kubectl apply -f kafka1.yaml
[root@k8s-vm01 new_kafka]# kubectl apply -f kafka2.yaml
[root@k8s-vm01 new_kafka]# kubectl apply -f kafka3.yaml

[root@k8s-vm01 new_kafka]# kubectl get pods -n wiseco|grep kafoka
kafoka1-0                                      1/1     Running   0          5d21h
kafoka2-0                                      1/1     Running   0          5d21h
kafoka3-0                                      1/1     Running   0          5d21h

[root@k8s-vm01 new_kafka]# kubectl get svc -n wiseco|grep kafka
kafka-cs1                   NodePort    10.254.66.1      <none>        9092:32092/TCP                   5d21h
kafka-cs2                   NodePort    10.254.177.227   <none>        9092:32093/TCP                   5d21h
kafka-cs3                   NodePort    10.254.136.242   <none>        9092:32094/TCP                   5d21h
kafka-hs1                   ClusterIP   None             <none>        1099/TCP                         5d21h
kafka-hs2                   ClusterIP   None             <none>        1099/TCP                         5d21h
kafka-hs3                   ClusterIP   None             <none>        1099/TCP                         5d21h

[root@k8s-vm01 new_kafka]# kubectl get statefulset -n wiseco|grep kafoka
kafoka1   1/1     5d21h
kafoka2   1/1     5d21h
kafoka3   1/1     5d21h

Check the PVs and PVCs of the Kafka cluster

[root@k8s-vm01 new_kafka]# kubectl get pv -n wiseco|grep kafoka
pvc-17aeea13-4f2a-4f10-8f63-67888c468831   10Gi       RWX            Delete           Bound    wiseco/kafkadatadir-kafoka1-0   kafka-nfs-storage            6d15h
pvc-447d344d-2eaa-4db5-baf8-473ce9811378   10Gi       RWX            Delete           Bound    wiseco/kafkadatadir-kafoka2-0   kafka-nfs-storage            6d15h
pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b   10Gi       RWX            Delete           Bound    wiseco/kafkadatadir-kafoka3-0   kafka-nfs-storage            6d15h

[root@k8s-vm01 new_kafka]# kubectl get pvc -n wiseco|grep kafoka
kafkadatadir-kafoka1-0   Bound    pvc-17aeea13-4f2a-4f10-8f63-67888c468831   10Gi       RWX            kafka-nfs-storage   6d15h
kafkadatadir-kafoka2-0   Bound    pvc-447d344d-2eaa-4db5-baf8-473ce9811378   10Gi       RWX            kafka-nfs-storage   6d15h
kafkadatadir-kafoka3-0   Bound    pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b   10Gi       RWX            kafka-nfs-storage   6d15h

Check the Kafka cluster's persistent storage on the NFS server

[root@k8s_storage ~]# ll /data/storage/kafka/
total 12
drwxrwxrwx 54 root root 4096 Jan 18 14:30 wiseco-kafkadatadir-kafoka1-0-pvc-17aeea13-4f2a-4f10-8f63-67888c468831
drwxrwxrwx 54 root root 4096 Jan 18 14:30 wiseco-kafkadatadir-kafoka2-0-pvc-447d344d-2eaa-4db5-baf8-473ce9811378
drwxrwxrwx 52 root root 4096 Jan 18 14:30 wiseco-kafkadatadir-kafoka3-0-pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b

[root@k8s_storage ~]# cat /data/storage/kafka/wiseco-kafkadatadir-kafoka1-0-pvc-17aeea13-4f2a-4f10-8f63-67888c468831/meta.properties
#
#Tue Jan 12 17:16:03 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=0

[root@k8s_storage ~]# cat /data/storage/kafka/wiseco-kafkadatadir-kafoka2-0-pvc-447d344d-2eaa-4db5-baf8-473ce9811378/meta.properties
#
#Tue Jan 12 17:17:11 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=1

[root@k8s_storage ~]# cat /data/storage/kafka/wiseco-kafkadatadir-kafoka3-0-pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b/meta.properties
#
#Tue Jan 12 17:17:29 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=2
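
The three meta.properties files above share one cluster.id and carry distinct broker.id values (0, 1, 2). A minimal sketch for checking this without reading each file by hand follows. The helper names `read_broker_id` and `broker_ids_unique` are hypothetical, and the demo writes throwaway sample files instead of touching the real NFS paths.

```shell
#!/bin/sh
# read_broker_id FILE: print the broker.id value stored in a meta.properties file.
read_broker_id() {
  sed -n 's/^broker\.id=//p' "$1"
}

# broker_ids_unique FILE...: succeed only if no broker.id occurs in more than one file.
broker_ids_unique() {
  dupes=$(for f in "$@"; do read_broker_id "$f"; done | sort | uniq -d)
  [ -z "$dupes" ]
}

# Demo with temporary copies of the values shown above (sample data only).
demo_dir=$(mktemp -d)
for id in 0 1 2; do
  printf 'cluster.id=8H9vYTT_RBOiwUKqFRvn0w\nversion=0\nbroker.id=%s\n' "$id" \
    > "$demo_dir/meta-$id.properties"
done

read_broker_id "$demo_dir/meta-2.properties"
if broker_ids_unique "$demo_dir"/meta-*.properties; then
  echo "broker ids are unique"
fi
rm -rf "$demo_dir"
```

On the NFS server, the same functions could be pointed at the meta.properties file inside each PVC directory under /data/storage/kafka/.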

3) Verify producing and consuming data on the Kafka cluster

[root@k8s-vm01 new_kafka]# kubectl get svc -n wiseco|grep kafka
kafka-cs1                   NodePort    10.254.66.1      <none>        9092:32092/TCP                   5d21h
kafka-cs2                   NodePort    10.254.177.227   <none>        9092:32093/TCP                   5d21h
kafka-cs3                   NodePort    10.254.136.242   <none>        9092:32094/TCP                   5d21h
kafka-hs1                   ClusterIP   None             <none>        1099/TCP                         5d21h
kafka-hs2                   ClusterIP   None             <none>        1099/TCP                         5d21h
kafka-hs3                   ClusterIP   None             <none>        1099/TCP                         5d21h

[root@k8s-vm01 new_kafka]# kubectl get svc -n wiseco|grep zk
zk-cs                       NodePort    10.254.209.162   <none>        2181:32181/TCP                   6d16h
zk-hs                       ClusterIP   None             <none>        2888/TCP,3888/TCP                6d16h

Log in to any one of the three Kafka pods in the cluster (for example, the kafka2 node) and produce some data:

[root@k8s-vm01 new_kafka]# kubectl get pods -n wiseco|grep kafoka
kafoka1-0                                      1/1     Running   0          5d21h
kafoka2-0                                      1/1     Running   0          5d21h
kafoka3-0                                      1/1     Running   0          5d21h

[root@k8s-vm01 new_kafka]# kubectl exec -ti kafoka2-0 -n wiseco -- /bin/bash
[root@kafoka2-0 opt]#

List the topics
[root@kafoka2-0 opt]# kafka-topics.sh --list --zookeeper zk-cs:2181

Create a topic, for example one named wisecotest
[root@kafoka2-0 opt]# kafka-topics.sh --create --topic wisecotest --zookeeper zk-cs:2181 --partitions 1 --replication-factor 1
Created topic wisecotest.

List the topics again
[root@kafoka2-0 opt]# kafka-topics.sh --list --zookeeper zk-cs:2181
wisecotest

Produce data to the wisecotest topic created above
[root@kafoka2-0 opt]# kafka-console-producer.sh --broker-list 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest
>test1
>test2
>test3
>

Then log in to another Kafka pod, for example kafka3, and verify that the data can be consumed

The data produced above can now be consumed
[root@k8s-vm01 ~]# kubectl exec -ti kafoka3-0 -n wiseco -- /bin/bash
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3

Produce more data in the kafka2 container
[root@kafoka2-0 opt]# kafka-console-producer.sh --broker-list 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest
>test1
>test2
>test3
>hahahaha
>heiheihei
>

Then check consumption again in the kafka3 container
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei

Consuming through just one external address also works
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei

Using the internal Kafka addresses, data can also be consumed normally from inside the K8S cluster
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server kafka-cs1:9092 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei

[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server kafka-cs2:9092 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei

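The warning below means that the bootstrap entry kafka-cs3:9093 was unreachable from inside the kafoka3 pod. The command gives port 9093 for kafka-cs3, but that Service only exposes 9092 (see the service list above), so this one connection fails while the other two bootstrap entries still let the consumer read the data.
Separately, on clusters where hairpin traffic is not enabled, a pod may be unable to reach itself through its own Service address. That limitation does not apply when the same address is used from a pod that is not one of the Kafka instances.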
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9093 --topic wisecotest --from-beginning
[2021-01-18 14:47:33,820] WARN [Consumer clientId=consumer-console-consumer-4149-1, groupId=console-consumer-4149] Connection to node -3 (kafka-cs3/10.254.136.242:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-18 14:47:33,821] WARN [Consumer clientId=consumer-console-consumer-4149-1, groupId=console-consumer-4149] Bootstrap broker kafka-cs3:9093 (id: -3 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
test1
test2
test3
hahahaha
heiheihei
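
In the command above, the third bootstrap entry uses port 9093, whereas the kafka-cs3 Service listed earlier exposes 9092. A small sketch for catching this kind of mismatch before running a client is shown here. The function name `check_bootstrap_ports` is hypothetical, and the expected port is passed in by hand and not read from the cluster.

```shell
#!/bin/sh
# check_bootstrap_ports LIST EXPECTED_PORT
# Print every host:port entry of the comma-separated LIST whose port
# differs from EXPECTED_PORT. Prints nothing when all entries match.
check_bootstrap_ports() {
  printf '%s\n' "$1" | tr ',' '\n' | while IFS= read -r entry; do
    port="${entry##*:}"
    [ "$port" = "$2" ] || echo "$entry"
  done
}

# The list used in the command that produced the warning:
check_bootstrap_ports "kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9093" 9092
# prints: kafka-cs3:9093
```

Running it against kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9092 prints nothing, which matches the internal address list given at the end of these notes.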

Now verify connecting to Kafka and consuming data from outside the K8S cluster through the external address

Log in to an external server that has a Kafka client installed:

[root@dev-env ~]# cd /usr/kafka/kafka_2.11-2.1.0/bin/
[root@dev-env bin]# 

As shown below, data produced inside the cluster can be consumed from outside the K8S cluster through the external address
[root@dev-env bin]# ./kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei


Produce more data from inside the Kafka pod
[root@kafoka2-0 opt]# kafka-console-producer.sh --broker-list 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest
>test1
>test2
>test3
>hahahaha
>heiheihei
>123456789
>abcdefghijk
>

Checking from outside, these Kafka messages are consumed normally
[root@dev-env bin]# ./kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei
123456789
abcdefghijk

Connection addresses of the Kafka cluster from inside the K8S cluster:

kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9092

Connection addresses of the Kafka cluster from outside the K8S cluster:

156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094
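
Both address lists follow from the Service definitions: internal clients use the kafka-cs N Service names on port 9092, and external clients use the node address 156.56.46.37 with NodePorts 32092 to 32094. A minimal sketch that assembles either list for use with `--bootstrap-server` is given here. The function name `bootstrap_list` is hypothetical, and the broker count of three is hard-coded to match this deployment.

```shell
#!/bin/sh
# bootstrap_list MODE: print the comma-separated broker list.
# MODE is "internal" (Service names, port 9092) or "external" (NodePorts).
bootstrap_list() {
  mode="$1"
  out=""
  for i in 1 2 3; do
    case "$mode" in
      internal) addr="kafka-cs${i}:9092" ;;
      external) addr="156.56.46.37:$((32091 + i))" ;;
      *) echo "unknown mode: $mode" >&2; return 1 ;;
    esac
    # Append with a comma separator except before the first entry.
    out="${out:+$out,}$addr"
  done
  echo "$out"
}

bootstrap_list internal
bootstrap_list external
```

The output can be substituted into the consumer commands used earlier, for example `--bootstrap-server "$(bootstrap_list external)"`.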
