Flink SQL Gateway提交任务到flink on k8s集群

2023-10-18 14:48:02 浏览数 (2)

Flink SQL Gateway是一项允许多个客户端从远程并发执行 SQL 的服务。 它提供了一种简单的方法来提交 Flink 作业、查找元数据并在线分析数据。在Flink 1.16版本,官方即将SQL Gateway其合入Flink主线。可正式通过Flink官方包安装与启动Flink SQL Gateway。本为以Flink 1.17版本为例,介绍一种在K8s中启动Flink SQL Gateway,并连接到通过Flink kubernetes operator启动的Flink Session集群上。

当然,在开始一切之前,你需要一些必要的环境:

  1. 一个K8s集群
  2. K8s集群安装有kubectl、helm3工具
  3. k8s集群可以访问docker hub/有一个自建docker仓库,可以将必要的镜像推送到此仓库

基于上述环境,接下来将从如下方面讲述本文内容:

  1. 部署Flink Kubernetes Operator
  2. 使用Flink Kubernetes Operator部署Flink session集群
  3. 使用Flink镜像部署Flink SQL Gateway Deployment,并连接到Flink session集群
  4. Flink SQL Gateway基本使用

部署Flink Kubernetes Operator

详细资料可参考:https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/,下面简要介绍相关部署步骤:

代码语言:javascript复制
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.5.0/

helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --set webhook.create=false

查看相关pod以及helm项目可以看到部署成功:

部署Flink Session集群

基于Flink Kubernetes Operator,要部署Flink Session集群还是相对简单的,根据官网介绍,只是不用定义job即可:

下面是一个创建Flink Session集群的例子:

代码语言:javascript复制
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: default
  name: basic-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  mode: standalone
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink-service-account
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1

将上述文件保存为flink-sessioncluster.yaml,执行如下命令开始创建:

代码语言:javascript复制
kubectl apply -f flink-sessioncluster.yaml

创建完,即可得到flink session集群:

可以看到该集群的headless service为:basic-example-rest,通过连接至该service的8081端口即可连接到该Flink session集群。

创建Flink SQL Gateway Deployment

有了上述Flink Session集群的连接端口,那么即可通过如下描述文件启动Flink SQL Gateway Deployment

代码语言:javascript复制
---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    jkube.io/scm-tag: HEAD
    jkube.io/git-branch: master
  labels:
    app: flink-sql-gateway
    provider: jkube
    version: 0.0.1
    group: com.iwhalecloud.wdp
  name: flink-sql-gateway
spec:
  replicas: 1
  revisionHistoryLimit: 1
  selector:
    matchLabels:
      app: flink-sql-gateway
      provider: jkube
      group: com.zh.ch.bigdata
  template:
    metadata:
      annotations:
        jkube.io/scm-tag: HEAD
        jkube.io/git-branch: master
      labels:
        app: flink-sql-gateway
        provider: jkube
        version: 0.0.1
        group: com.zh.ch.bigdata
    spec:
      containers:
        - name: flink-sql-gateway
          env:
            - name: KUBERNETES_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: HOSTNAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          image: flink:1.17
          command:
          - /bin/bash
          - -c  
          args:
          - /opt/flink/bin/sql-gateway.sh start-foreground -Drest.address=basic-example-rest -Dsql-gateway.endpoint.rest.address=localhost
          imagePullPolicy: Always
          ports:
            - containerPort: 8083
              name: http-1
              protocol: TCP
          securityContext:
            privileged: false

将上述文件保存为flink-sql-gateway-deployment.yaml,执行如下命令启动

代码语言:javascript复制
kubectl apply -f flink-sql-gateway-deployment.yaml

得到:

查看日志:

现在创建一个NodePort,将该Deployment的8083端口映射到主机端口:

代码语言:javascript复制
apiVersion: v1
kind: Service
metadata:
  labels:
    expose: "true"
    app: flink-sql-gateway
  name: flink-sql-gateway
  namespace: default
spec:
  type: NodePort
  ports:
    - name: http1
      port: 8083
      protocol: TCP
      nodePort: 30083
  selector:
    app: flink-sql-gateway

将如上内容保存为flink-sql-gateway-service.yaml,执行如下命令创建

代码语言:javascript复制
kubectl apply -f flink-sql-gateway-service.yaml

得到:

Flink SQL Gateway基本使用

接下来便可通过30083主机端口连接到Flink SQL Gateway,以下是使用Postman的例子:

  1. 获取Flink版本信息
  1. 打开一个Session
  1. 记下上述sessionHandle并添加到查询SQL的URL中,如下所示,该SQL为"SELECT 1":

查看flink sql gateway日志:

查看Flink session集群日志:

  1. 记下上述operationHandle,还可以查询上述SQL的执行结果:

当然,上面只是一个简单的示例,更多Flink SQL Gateway的用法可参考官网:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql-gateway/overview/

与此同时,还可以将Flink SQL Gateway Deployment封装为Helm项目,简化部署。

0 0 投票数

文章评分

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/2345731

0 人点赞