Flink 实践教程:入门(11):MongoDB Sink

2022-05-20 16:08:02 浏览数 (2)

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将您详细介绍如何将数据写入 MongoDB。

视频内容

前置准备

创建流计算 Oceanus 集群

进入 Oceanus 控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群。

创建 MongoDB 实例

进入 MongoDB 控制台,点击左上角【新建实例】创建实例,具体参考 创建 MongoDB 实例。作者这里使用 shell 的方式,下载 MongoDB 客户端的方式连接数据库,更多连接信息请参考 连接 MongoDB 实例

代码语言:shell复制
## 安装 MongoDB 客户端
wget https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel70-XX.XX.XX.tgz
## 解压
tar zxvf mongodb-linux-x86_64-rhel70-XX.XX.XX.tgz
## 进入目录
cd mongodb-linux-x86_64-rhel70-XX.XX.XX
## 连接客户端
./bin/mongo -umongouser -p***** 172.xx.xx.xx:27017/admin

请注意选择与云数据库 MongoDB 服务并与 CVM 操作系统相匹配的版本

流计算 Oceanus 作业

1. 创建 Source
代码语言:sql复制
CREATE TABLE datagen_source_table ( 
    id INT, 
    name STRING 
) WITH ( 
    'connector' = 'datagen',
    'rows-per-second'='1'  -- 每秒产生的数据条数
);
2. 创建 Sink
代码语言:sql复制
CREATE TABLE mongodb (
    id INT,
    name STRING
) WITH (
  'connector' = 'mongodb',   -- 固定值 'mongodb'
  'database' = 'testdb',     --数据库名
  'collection' = 'test1',    --数据集合
  'uri' = 'mongodb://mongouser:******@xx.xx.xx.xx:27017/admin', -- MongoDB连接串
  'batchSize' = '5'          -- 每次批量写入的条数
);
3. 编写业务 SQL
代码语言:sql复制
INSERT INTO mongodb
SELECT * from datagen_source_table;

总结

本实例演示如何使用 Datagen 生成随机数据,然后使用 MongoDB Sink 连接器将数据写入 MongoDB。

目前仅 Flink 1.13 支持 Sink 端写入,其他版本暂不支持。undefinedMongoDB Sink 暂不支持 Upsert。undefinedMongoDB 的 User 必须拥有 database 的写权限。

0 人点赞