Flink本地模式安装和使用

2022-10-05 17:02:19 浏览数 (1)

Flink官方主页:https://flink.apache.org/ 。

安装部署

不同版本的Flink对JDK版本要求不尽相同,需要根据具体的Flink版本要求先安装好JDK环境。 通常,在Flink的Release Notes中有对应JDK版本的说明,如:Release Notes for Flink 1.15 ,要求安装JDK11。 下载JDK 并进行安装配置。

从国内镜像下载指定版本的安装包,如下以下载并安装flink-1.15.2为例进行说明。

代码语言:javascript复制
$ tar -xzf flink-1.15.2-bin-scala_2.12.tgz
$ cd flink-1.15.2-bin-scala_2.12

启动/停止服务

如下操作均是在Flink安装目录下执行。 启动本地模式集群:

代码语言:javascript复制
$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host xxx.
Starting taskexecutor daemon on host xxx.

停止本地模式集群:

代码语言:javascript复制
$ ./bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 3900) on host xxx.
Stopping standalonesession daemon (pid: 3613) on host xxx.

Flink操作

如下操作均在Flink自带的SQL客户端中执行。 启动SQL客户端:

代码语言:javascript复制
./bin/sql-client.sh 

                                   ▒▓██▓██▒
                               ▓████▒▒█▓▒▓███▓▒
                            ▓███▓░░        ▒▒▒▓██▒  ▒
                          ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                          ██▒         ░▒▓███▒    ▒█▒█▒
                            ░▓█            ███   ▓░▒██
                              ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                            █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                            ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                         ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                   ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                  ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
               ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
              ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
           ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
           ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
           ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
           ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
          ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
          █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
          ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
          ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
           ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
           ▓█   ▒█▓   ░     █░                ▒█              █▓
            █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
             █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
              ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
               ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                  ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                      ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░
          
    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ | |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ | |/ /  ___ | |  | | |      | |    | | |/ _  '_ | __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|_ |_____/ __________|  _____|_|_|___|_| |_|__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /home/zhangsan/.flink-sql-history

Flink SQL> 

注意:在Flink SQL客户端中执行SQL语句时都必须以分号(;)结束。

定义Source表

以从Kafka中消费数据为例:

代码语言:javascript复制
CREATE TABLE UserBehaviorKafkaSource (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
)

定义Sink表

以将数据写入MySQL为例:

代码语言:javascript复制
CREATE TABLE UserBehaviorMySQLSink (
  `id` BIGINT,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/user_behavior',
   'table-name' = 'user_behavior'
   'password' = 'user_behavior'
)

定义转换SQL

代码语言:javascript复制
insert into UserBehaviorMySQLSink select user_id,item_id,behavior from UserBehaviorKafkaSource

在Flink SQL客户端中执行上述转换SQL成功之后,通过Flink Web管理后台即可查看相应任务信息。

详细信息参见官方文档,以Flink v1.15为例子,文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/ 。

写在最后

通常来讲,在安装Flink本地集群模式学习时,除了需要安装Flink本身以外,需要同时安装Kafka和MySQL作为数据输入源和数据输出目的地。

0 人点赞