Flink 实践教程:入门(2):写入 Elasticsearch

2021-11-01 10:10:18 浏览数 (1)

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何使用 datagen 连接器生成随机数据,经过流计算 Oceanus,最终将计算数据存入 Elasticsearch 。

前置准备
创建 流计算 Oceanus 集群

进入流计算 Oceanus 控制台(https://console.cloud.tencent.com/oceanus/overview),点击左侧【集群管理】,点击左上方【创建集群】,具体可参考流计算 Oceanus 官方文档创建独享集群(https://cloud.tencent.com/document/product/849/48298)。

创建 Elasticsearch 集群

进入Elasticsearch 控制台(https://console.cloud.tencent.com/es),点击左上方【新建】,创建 Elasticsearch 实例,具体操作请访问创建 Elasticsearch 集群(https://cloud.tencent.com/document/product/845/19536)

!创建流计算 Oceanus 集群和 Elasticsearch 集群时所选 VPC 必须是同一 VPC。

流计算 Oceanus 作业
1. 创建 Source
代码语言:javascript复制
-- Datagen Connector 可以随机生成一些数据用于测试
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html

CREATE TABLE random_source ( 
  f_sequence INT, 
  f_random INT, 
  f_random_str VARCHAR 
  ) WITH ( 
  'connector' = 'datagen', 
  'rows-per-second'='1',  -- 每秒产生的数据条数
      
  'fields.f_sequence.kind'='sequence',   -- 有界序列(结束后自动停止输出)
  'fields.f_sequence.start'='1',         -- 序列的起始值
  'fields.f_sequence.end'='10000',       -- 序列的终止值
      
  'fields.f_random.kind'='random',       -- 无界的随机数
  'fields.f_random.min'='1',             -- 随机数的最小值
  'fields.f_random.max'='1000',          -- 随机数的最大值
      
  'fields.f_random_str.length'='10'      -- 随机字符串的长度
);
2. 创建 Sink
代码语言:javascript复制
-- Elasticsearch 只能作为数据目的表(Sink)写入
-- 注意! 如果您启用了 Elasticsearch 的用户名密码鉴权功能, 目前只能使用 Flink 1.10 的旧语法。若无需鉴权, 则可以使用 Flink 1.11 的新语法。
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector

CREATE TABLE Student (
    `user_id`   INT,
    `user_name` VARCHAR
) WITH (
    'connector.type' = 'elasticsearch', -- 输出到 Elasticsearch

    'connector.version' = '6',            -- 指定 Elasticsearch 的版本, 例如 '6', '7'. 注意务必要和所选的内置 Connector 版本一致
    'connector.hosts' = 'http://10.0.0.175:9200',  -- Elasticsearch 的连接地址
    'connector.index' = 'Student',        -- Elasticsearch 的 Index 名
    'connector.document-type' = 'stu',    -- Elasticsearch 的 Document 类型
    'connector.username' = 'elastic',     -- 可选参数: 请替换为实际 Elasticsearch 用户名
    'connector.password' = 'xxxxxxxxxx',  -- 可选参数: 请替换为实际 Elasticsearch 密码

    'update-mode' = 'append',             -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式     
    'connector.key-delimiter' = '$',      -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)
    'connector.key-null-literal' = 'n/a',  -- 主键为 null 时的替代字符串,默认是 'null'
    'connector.failure-handler' = 'retry-rejected',   -- 可选的错误处理。可选择 'fail' (抛出异常)、'ignore'(忽略任何错误)、'retry-rejected'(重试)

    'connector.flush-on-checkpoint' = 'true',   -- 可选参数, 快照时不允许批量写入(flush), 默认为 true
    'connector.bulk-flush.max-actions' = '42',  -- 可选参数, 每批次最多的条数
    'connector.bulk-flush.max-size' = '42 mb',  -- 可选参数, 每批次的累计最大大小 (只支持 mb)
    'connector.bulk-flush.interval' = '60000',  -- 可选参数, 批量写入的间隔 (ms)
    'connector.connection-max-retry-timeout' = '300',     -- 每次请求的最大超时时间 (ms)

    'format.type' = 'json'        -- 输出数据格式, 目前只支持 'json'
);
3. 编写业务 SQL
代码语言:javascript复制
INSERT INTO Student
SELECT
f_sequence   AS user_id,
f_random_str AS user_name
FROM random_source;
4. 选择 Connector

点击【作业参数】,在【内置 Connector】选择 flink-connector-elasticsearch6,点击【保存】>【发布草稿】运行作业。

?新版 Flink 1.13 集群不需要用户选择内置 Connector。其他版本集群请根据实际购买的 Elasticsearch 版本选择对应的 Connector。

5. 数据查询

进入Elasticsearch 控制台(https://console.cloud.tencent.com/es),点击之前购买的 Elasticsearch 实例,点击右上角【Kibana】,进入 Kibana 查询数据。具体查询方法请参考通过 Kibana 访问集群(https://cloud.tencent.com/document/product/845/19541)

总结

本示例用 Datagen 连接器随机生成数据,经过 流计算 Oceanus 实现最基础的数据转换功能,最后 Sink 到Elasticsearch 中,用户无需提前在 Elasticsearch 中创建索引。

点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码 关注我们

0 人点赞