Flink官方主页:https://flink.apache.org/ 。
安装部署
不同版本的Flink对JDK版本要求不尽相同,需要根据具体的Flink版本要求先安装好JDK环境。
通常,在Flink的Release Notes
中有对应JDK版本的说明,如:Release Notes for Flink 1.15 ,要求安装JDK11。
下载JDK 并进行安装配置。
从国内镜像下载指定版本的安装包,如下以下载并安装flink-1.15.2为例进行说明。
代码语言:javascript复制$ tar -xzf flink-1.15.2-bin-scala_2.12.tgz
$ cd flink-1.15.2-bin-scala_2.12
启动/停止服务
如下操作均是在Flink安装目录下执行。 启动本地模式集群:
代码语言:javascript复制$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host xxx.
Starting taskexecutor daemon on host xxx.
停止本地模式集群:
代码语言:javascript复制$ ./bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 3900) on host xxx.
Stopping standalonesession daemon (pid: 3613) on host xxx.
Flink操作
如下操作均在Flink自带的SQL客户端中执行。 启动SQL客户端:
代码语言:javascript复制./bin/sql-client.sh
▒▓██▓██▒
▓████▒▒█▓▒▓███▓▒
▓███▓░░ ▒▒▒▓██▒ ▒
░██▒ ▒▒▓▓█▓▓▒░ ▒████
██▒ ░▒▓███▒ ▒█▒█▒
░▓█ ███ ▓░▒██
▓█ ▒▒▒▒▒▓██▓░▒░▓▓█
█░ █ ▒▒░ ███▓▓█ ▒█▒▒▒
████░ ▒▓█▓ ██▒▒▒ ▓███▒
░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░
▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒
███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒
░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒
███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░
██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓
▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒
▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒
▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█
██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █
▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓
█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓
██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓
▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒
██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒
▓█ ▒█▓ ░ █░ ▒█ █▓
█▓ ██ █░ ▓▓ ▒█▓▓▓▒█░
█▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█
██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓
▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██
░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓
░▓██▒ ▓░ ▒█▓█ ░░▒▒▒
▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ | | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ | |/ / ___ | | | | | | | | | |/ _ '_ | __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|_ |_____/ __________| _____|_|_|___|_| |_|__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /home/zhangsan/.flink-sql-history
Flink SQL>
注意:在Flink SQL客户端中执行SQL语句时都必须以分号(;)结束。
定义Source表
以从Kafka中消费数据为例:
代码语言:javascript复制CREATE TABLE UserBehaviorKafkaSource (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
定义Sink表
以将数据写入MySQL为例:
代码语言:javascript复制CREATE TABLE UserBehaviorMySQLSink (
`id` BIGINT,
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/user_behavior',
'table-name' = 'user_behavior'
'password' = 'user_behavior'
)
定义转换SQL
代码语言:javascript复制insert into UserBehaviorMySQLSink select user_id,item_id,behavior from UserBehaviorKafkaSource
在Flink SQL客户端中执行上述转换SQL成功之后,通过Flink Web管理后台即可查看相应任务信息。
详细信息参见官方文档,以Flink v1.15为例子,文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/ 。
写在最后
通常来讲,在安装Flink本地集群模式学习时,除了需要安装Flink本身以外,需要同时安装Kafka和MySQL作为数据输入源和数据输出目的地。