基于streamx闲聊flink在k8s上的实践

2022-02-23 14:45:35 浏览数 (1)

  • 自定义构造镜像
  • 使用streamx开发flink任务
    • 自定义docker namespace
    • 基本的配置
    • flink sql的编写
    • k8s 模板的配置
    • 其他参数
  • 查看日志
  • 后续

自定义构造镜像

由于容器化易管理、易扩容等优点,越来越多的组件都开始迁移到容器上,k8s作为容器化的事实标准,受到了越来越多的人的青睐,由于我们目前很多web开发的组件也是部署到k8s上的,为了后续运维更加方便,我把我们用到的一些大数据组件都迁移到了k8s,包括hive、trino、flink、clickhouse等等。

我们要做的第一步就是针对这些我们的需求构建docker镜像,hdfs是作为公共的存储,是直接部署到物理机的,没有部署到k8s。

对于最主要的flink镜像,由于我们的source和sink 比较固定,为了方便,我这里是把我们常用的jar包都集成了到了flink的镜像里 ,包括mysql、iceberg、hive、clickhouse。

因为考虑到会引用一些公共的配置,比如trino会查询iceberg,flink需要写入iceberg,这个时候需要用到hive和hdfs的配置文件,所以我决定把这些配置文件都放到一个公共的地方,所以建立了对应的k8s configmap,以便能统一管理和使用。

命令如下:

代码语言:javascript复制

kubectl create configmap hive-configmap -n flink-cluster  --from-file=hive-site.xml

kubectl create configmap core-configmap -n flink-cluster  --from-file=core-site.xml

kubectl create configmap hdfs-configmap -n flink-cluster  --from-file=hdfs-site.xml

使用streamx开发flink任务

为了更加方便的管理flink任务,我们引入了开源的streamx( https://www.streamxhub.com/ )来进行flink任务的开发和管理。flink任务目前采用的是k8s application方式来运行,这个是为了更好的任务隔离性。

对于streamx上flink k8s模式的部署,建议大家先看下官方文档( https://www.streamxhub.com/zh-CN/docs/flink-k8s/k8s-dev )基本的概念和操作,文档中有非常详细的说明,我这里不在赘述。

flink在k8s native方式部署的一些操作,我之前写过一个文章,大家可以看看 Flink 1.13 在Native k8s的部署实践

我这里主要介绍下在实践过程中可能会踩到的一些坑,希望对大家有所帮助。

自定义docker namespace

我这里是用公司内部的docker环境,所以对docker操作的权限有一些限制,我这里配置了我自己的docker namespace,主要是在$STREAMX_HOME/conf/applicaton.yml下添加streamx.docker.register.image-namespace的配置

比如

代码语言:javascript复制

streamx.docker.register.image-namespace: my-namespace

更多的系统级别的参数参考 https://www.streamxhub.com/zh-CN/docs/flink-k8s/k8s-dev#相关参数配置

基本的配置

上面是一个streamx开发flink sql任务的截图,这些参数大家一看就应该懂了,我这里稍微强调一下Kubernetes ClusterId , 这个参数可以带中划线但是不能带下划线,此外我这里测试的时候,如果包含大写字母也是不行的。

flink sql的编写

我这里主要讲下flink sql的编写,因为sql任务已经能满足绝大部分的功能需求了,我们这里主要是消费kafka,然后写入iceberg和clickhouse.

代码语言:javascript复制

CREATE CATALOG iceberg WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://hive-service.flink-cluster.svc.cluster.local:9083',
  'warehouse' = 'hdfs://mycluser/user/hive3/warehouse'
);

CREATE CATALOG hive WITH (
  'type' = 'hive',
  'default-database' = 'common',
  'hive-version' = '3.1.2',
  'hive-uris' = 'thrift://hive-service.flink-cluster.svc.cluster.local:9083',
  'hive-warehouse' = 'hdfs://mycluser/user/hive3/warehouse'
);


iceberg的catalog没有什么疑问,直接用官方的就行。

我这里使用了hive的catalog,主要是因为我把clickhouse的表信息存储到了hive里面,使用的时候引用一下就行了,这样做是为了避免每次都新建clickhouse的表信息,因为我这里有一个需求是多个任务写到同一个clickhouse表。

此外,做一个说明,这个hive catalog 是我做了修改的,flink的官方并没有提供hive-uris和hive-warehouse的方式来构建hive catalog,之所以这么改,是因为flink的官方用来构建hive catalog的时候需要提供给一个本地的hive conf dir,但是application 模式没法提供这个文件夹,对于这种情况,flink提供了去classpath查找hive-site.xml的方法,但是我这里调试了一下,没有调通(可能是我比较笨),所以我就参考iceberg的,自己做了一个通过hive-uris和hive-warehouse来构建hive catalog的方法。

具体 的flink任务的sql示例如下:

代码语言:javascript复制

create table kafka_source (
  ..............
  .............
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxxxxx',
  'properties.bootstrap.servers' = 'kafka-service.k8s-namespace.svc.cluster.local:9092',
  'properties.group.id' = 'test_group',
  'json.ignore-parse-errors' = 'true',
  'json.timestamp-format.standard' = 'ISO-8601',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);


insert into iceberg.db.iceberg_table select * from kafka_source;


insert into hive.db.clickhouse_table select * from kafka_source;

k8s 模板的配置

在这里我们自定义了一些k8s的配置文件,大概的模板如下,然后我做一下简单说明。

代码语言:javascript复制

apiVersion: v1
kind: Pod
metadata:
  name: pod-template
spec:
  hostAliases:
    - ip: "192.168.0.1"
      hostnames:
      - "node1"
    - ip: "192.168.0.2"
      hostnames:
      - "node2"
    - ip: "192.168.0.3"
      hostnames:
      - "node3"
  containers:
    - name: flink-main-container
      image: myflink:1.0
      volumeMounts:
      - name: config-core
        mountPath: /opt/flink/hadoopconf/core-site.xml
        subPath: core-site.xml
      - name: config-hdfs
        mountPath: /opt/flink/hadoopconf/hdfs-site.xml
        subPath: hdfs-site.xml
      - name: config-hive
        mountPath: /opt/flink/hadoopconf/hive-site.xml
        subPath: hive-site.xml
      - name: host-time
        mountPath: /etc/localtime
        readOnly: true
  volumes:
    - name: host-time
      hostPath:
        path: /etc/localtime
    - name: config-core
      configMap:
        name: core-configmap
        items:
          - key: core-site.xml
            path: core-site.xml
    - name: config-hdfs
      configMap:
        name: hdfs-configmap
        items:
          - key: hdfs-site.xml
            path: hdfs-site.xml
    - name: config-hive
      configMap:
        name: hive-configmap
        items:
          - key: hive-site.xml
            path: hive-site.xml
  • hostAliases :这个主要是想要在容器里访问部署在物理机上的hdfs,但是hdfs-site.xml里面配置的地址是host,我们需要配置相应的hosts和ip的映射,所以使用hostAliases来修改flink容器的hosts配置。
  • 首段我们提到了我把公共的配置文件都放到了k8s的configmap里,所以这里就是让flink引用了hdfs-site.xml等配置文件.
  • 通过挂载宿主机的/etc/localtime来修改容器的时区。

其他参数

在这个动态参数配置里,我添加了两个额外的配置

  • -Dkubernetes.jobmanager.service-account=flink 是在k8s这个namespace下面操作的账号
  • -Dkubernetes.container.image.pull-policy=Always :streamx提交k8s任务的流程是先在本地基于你提供的基础镜像和你当前任务的一些配置,生成一个新的镜像,然后推送到docker仓库,然后启动flink任务,添加这个参数是为了防止你修改了程序,但是由于未及时更新镜像,而部署了旧的程序。

查看日志

对于flink来说,还有一个非常重要的功能就是查看日志,对于部署到yarn的flink来说,如果flink程序挂了,我们可以去yarn上看历史日志,但是对于k8s来说,如果程序挂了,那么k8s的pod就消失了,没法查日志了。所以我们需要把k8s上容器的日志给持久化,以便出问题的时候能够排查。

我们这里使用Grafana Loki Promtail构建了轻量级的日志采集系统,用户可以在Grafana上查看对应的k8s容器的日志,包括已经挂掉的flink程序和其他k8s 容器的日志。

一个简单的查看日志的示例如下:

后续

目前我简单介绍了我们的大数据架构中关于flink在k8s部署的一些踩坑和实践,希望对大家有所帮助,后续有时间再介绍一些其他的组件的实践和整体的架构。

比如

  • 其他的大数据组件的k8s实践,比如hive、trino等在k8s的部署。
  • 基于apache ranger做细粒度的权限控制,ranger只提供了presto 的集成,通过修改ranger的源码可以适配我们的trino版本,做到行级别和列级别的权限控制。
  • 鉴于我们整体上使用了flink来做我们的大数据任务计算,为了更小的学习成本,我们尝试使用阿里的alink来做机器学习相关的尝试,目前我已经通过修改iceberg和alink的源码打通了alink和iceberg的读写功能,可以使用alink读取iceberg的数据,进行机器学习训练,然后将结果写入iceberg,并同时可以使用python和java开发。

0 人点赞