Flink 实践教程:入门2-写入 Elasticsearch

2021-12-08 16:05:27 浏览数 (3)

Oceanus简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何使用 datagen 连接器生成随机数据,经过流计算 Oceanus,最终将计算数据存入 Elasticsearch 。

通过 Flink 生成数据写入到 Elasticsearch

前置准备

创建 Oceanus 集群

活动购买链接 1 元购买 Oceanus 集群。

进入 Oceanus 控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群。

创建 Elasticsearch 集群

进入 Elasticsearch 控制台,点击左上方【新建】,创建 Elasticsearch 实例,具体操作请访问 创建 Elasticsearch 集群

!创建 Oceanus 集群和 Elasticsearch 集群时所选 VPC 必须是同一 VPC。

Oceanus 作业

1. 创建 Source
代码语言:txt复制
-- Datagen Connector 可以随机生成一些数据用于测试
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html

CREATE TABLE random_source ( 
  f_sequence INT, 
  f_random INT, 
  f_random_str VARCHAR 
  ) WITH ( 
  'connector' = 'datagen', 
  'rows-per-second'='1',  -- 每秒产生的数据条数
  
  'fields.f_sequence.kind'='sequence',   -- 有界序列(结束后自动停止输出)
  'fields.f_sequence.start'='1',         -- 序列的起始值
  'fields.f_sequence.end'='10000',       -- 序列的终止值
  
  'fields.f_random.kind'='random',       -- 无界的随机数
  'fields.f_random.min'='1',             -- 随机数的最小值
  'fields.f_random.max'='1000',          -- 随机数的最大值
      
  'fields.f_random_str.length'='10'      -- 随机字符串的长度
);
2. 创建 Sink
代码语言:txt复制
-- Elasticsearch 只能作为数据目的表(Sink)写入
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector

CREATE TABLE Student (
    `user_id`   INT,
    `user_name` VARCHAR
) WITH (
    'connector.type' = 'elasticsearch', -- 输出到 Elasticsearch

    'connector.version' = '6',            -- 指定 Elasticsearch 的版本, 例如 '6', '7'. 注意务必要和所选的内置 Connector 版本一致
    'connector.hosts' = 'http://10.0.0.175:9200',  -- Elasticsearch 的连接地址
    'connector.index' = 'Student',        -- Elasticsearch 的 Index 名
    'connector.document-type' = 'stu',    -- Elasticsearch 的 Document 类型
    'connector.username' = 'elastic',     -- 可选参数: 请替换为实际 Elasticsearch 用户名
    'connector.password' = 'xxxxxxxxxx',  -- 可选参数: 请替换为实际 Elasticsearch 密码

    'update-mode' = 'append',             -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式     
    'connector.key-delimiter' = '$',      -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)
    'connector.key-null-literal' = 'n/a',  -- 主键为 null 时的替代字符串,默认是 'null'
    'connector.failure-handler' = 'retry-rejected',   -- 可选的错误处理。可选择 'fail' (抛出异常)、'ignore'(忽略任何错误)、'retry-rejected'(重试)
    'connector.connection-max-retry-timeout' = '300',     -- 每次请求的最大超时时间 (ms)

    'format.type' = 'json'        -- 输出数据格式, 目前只支持 'json'
);
3. 编写业务 SQL
代码语言:txt复制
INSERT INTO Student
SELECT
    f_sequence   AS user_id,
    f_random_str AS user_name
FROM random_source;
4. 选择 Connector

点击【作业参数】,在【内置 Connector】选择 flink-connector-elasticsearch6,点击【保存】>【发布草稿】运行作业。

?新版 Flink 1.13 集群不需要用户选择内置 Connector。其他版本集群请根据实际购买的 Elasticsearch 版本选择对应的 Connector。

5. 数据查询

进入 Elasticsearch 控制台,点击之前购买的 Elasticsearch 实例,点击右上角【Kibana】,进入 Kibana 查询数据。具体查询方法请参考 通过 Kibana 访问集群

总结

本示例用 Datagen 连接器随机生成数据,经过 Oceanus 实现最基础的数据转换功能,最后 Sink 到Elasticsearch 中,用户无需提前在 Elasticsearch 中创建索引。

0 人点赞