在《0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统》一文中,我们将字数统计结果输出到终端。本文将模拟生产环境,将结果输出到Mysql数据库。
Mysql配置
假定本机已安装好Mysql Server和Client。
配置用户和密码
通过下面的配置,我们可以让Flink通过该用户名和密码访问Mysql数据库。
代码语言:javascript复制sudo mysql -u root
use mysql
CREATE USER 'admin'@'localhost' IDENTIFIED BY 'pwd123';
GRANT ALL PRIVILEGES ON *.* TO 'admin'@'localhost' WITH GRANT OPTION;
FLUSH PRIVILEGES;
quit
创建数据库和表
这个表只有两个字段,一个是用于表示字符的word,一个是其个数count。 需要注意的是,我们并没有设置主键。
代码语言:javascript复制create database words_count_db;
use words_count_db;
CREATE TABLE WordsCountTable (
word varchar(255) NOT NULL,
count BIGINT
);
PyFlink配置
因为我们要使用JDBC连接Mysql,于是需要引入相关的包
代码语言:javascript复制cd /home/fangliang/pyflink-test/.env/lib/python3.10/site-packages/pyflink/lib
下载jdbc和mysql-connector包
代码语言:javascript复制wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.6/flink-connector-jdbc_2.12-1.14.6.jar .
wget https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.9-rc/mysql-connector-java-8.0.9-rc.jar .
Sink
相较于《0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统》中输出到终端的Sink,我们只需要修改器with字段的连接器即可。
代码语言:javascript复制 my_sink_ddl = """
CREATE TABLE WordsCountTableSink (
`word` STRING,
`count` BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false',
'table-name' = 'WordsCountTable',
'driver'='com.mysql.jdbc.Driver',
'username'='admin',
'password'='pwd123'
);
"""
words_count_db是之前在Mysql中创建的数据库名;WordsCountTable是Mysql中表名;其他字段比较好理解,就不解释了。
完整代码
代码语言:javascript复制# sql.py
import argparse
import logging
import sys
from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)
def word_count(input_path):
config = Configuration()
# write all the data to one file
config.set_string('parallelism.default', '1')
env_settings = EnvironmentSettings
.new_instance()
.in_batch_mode()
.with_configuration(config)
.build()
t_env = TableEnvironment.create(env_settings)
# define the source
my_source_ddl = """
create table source (
word STRING
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '{}'
)
""".format(input_path)
t_env.execute_sql(my_source_ddl).print()
tab = t_env.from_path('source')
# define the sink
my_sink_ddl = """
CREATE TABLE WordsCountTableSink (
`word` STRING,
`count` BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false',
'table-name' = 'WordsCountTable',
'driver'='com.mysql.jdbc.Driver',
'username'='admin',
'password'='pwd123'
);
"""
t_env.execute_sql(my_sink_ddl).print()
# execute insert
my_select_ddl = """
insert into WordsCountTableSink
select word, count(1) as `count`
from source
group by word
"""
t_env.execute_sql(my_select_ddl).wait()
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
required=False,
help='Input file to process.')
argv = sys.argv[1:]
known_args, _ = parser.parse_known_args(argv)
word_count(known_args.input)
执行命令是
代码语言:javascript复制python sql.py --input input1.csv
Using Any for unsupported type: typing.Sequence[~T] No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform CANNOT be used with
method=DIRECT_READ
. OK OK
我们在Mysql Client端查询结果数据如下
代码语言:javascript复制select * from WordsCountTable;
代码语言:javascript复制 ------ -------
| word | count |
------ -------
| A | 3 |
| B | 1 |
| C | 2 |
| D | 2 |
| E | 1 |
------ -------
5 rows in set (0.00 sec)
附上input1.csv
代码语言:javascript复制"A",
"B",
"C",
"D",
"A",
"E",
"C",
"D",
"A",