Quick installation and configuration: consuming MySQL table data via Kafka/Flink

2021-04-29 20:42:16

Note: Many data-migration tools wrap Kafka and Flink, so out of curiosity I downloaded both and tried deploying them myself. This post is a quick record of the installation process; there were quite a few pitfalls, and I will share the more typical errors in a follow-up once they are written up. Everything here runs on a single machine with a single instance and a single node; no distributed or cluster configuration is involved.

This post is divided into four parts:

Part 1: MySQL installation

Part 2: Kafka installation and configuration

Part 3: Kafka consumption and testing

Part 4: Flink reading the MySQL table through the sql-client

==========Software versions:

OS: CentOS 7.4

1、mysql-5.7.22-linux-glibc2.12-x86_64

Link: https://pan.baidu.com/s/1KlR-rRoHC97aX2j2ha05ng

Extraction code: ksi9

=======================

2、apache-zookeeper-3.5.6-bin.tar.gz

Link: https://pan.baidu.com/s/1zOSeOK_ZiPmNzP8EuwwTBA

Extraction code: k1is

=====================

3、confluentinc-kafka-connect-jdbc-10.1.1.zip

Link: https://pan.baidu.com/s/1jTOUiXNdNOBQnTiuuiDcOA

Extraction code: spwr

====================

4、kafka_2.11-2.4.0.tgz

Link: https://pan.baidu.com/s/1u3Q_4F1nQSFWj7qG6ESDZA

Extraction code: x2oy

=================

5、flink-1.12.2-bin-scala_2.11.tgz

Link: https://pan.baidu.com/s/1tPhpAmLlEhbeV8y-hNnb_A

Extraction code: qswm

===========Java version and JAR packages used:

mysql-connector-java-8.0.23.jar

Link: https://pan.baidu.com/s/1XDQkUMzJm7dRn-L74Ar-PQ

Extraction code: shfy

===================================

flink-sql-connector-mysql-cdc-1.0.0.jar

Link: https://pan.baidu.com/s/13z5ocJaebmOM71TXKOCnLQ

Extraction code: 2ine

=============================

[root@localhost ~]# java -version

openjdk version "1.8.0_131"

OpenJDK Runtime Environment (build 1.8.0_131-b12)

OpenJDK 64-Bit Server VM (build 25.131-b12, mixed mode)

====================================================================

Part 1: MySQL installation and initialization

Here I will just paste the main steps quickly; all the software directories below are placed under /usr/local.

[root@localhost ~]# useradd mysql

[root@localhost ~]# passwd mysql

[root@localhost ~]# tar -xf mysql-5.7.22-linux-glibc2.12-x86_64.tar.gz

[root@localhost ~]# mv mysql-5.7.22-linux-glibc2.12-x86_64 /usr/local/mysql

[root@localhost ~]# mkdir -p /usr/local/mysql/data

[root@localhost ~]# chown mysql.mysql /usr/local/mysql/ -R

Configure MySQL's default parameter file:

[root@localhost ~]# vim /etc/my.cnf

[root@localhost ~]# cat /etc/my.cnf

[mysqld]

basedir=/usr/local/mysql

datadir=/usr/local/mysql/data

socket=/tmp/mysql.sock

port=3306

server-id=100

log-bin

[root@localhost ~]# chown mysql.mysql /etc/my.cnf

[root@localhost ~]# su - mysql

Last login: Thu Apr 29 16:41:25 CST 2021 on pts/1

[mysql@localhost ~]$

[mysql@localhost ~]$ vim .bash_profile

Add the MySQL path to the environment so the mysql client programs can be invoked directly:

export MYSQL_HOME=/usr/local/mysql

export PATH=$PATH:$MYSQL_HOME/bin

[mysql@localhost ~]$ source .bash_profile

Initialize MySQL and start the database:

[mysql@localhost ~]$ mysqld --initialize --basedir=/usr/local/mysql --datadir=/usr/local/mysql/data --socket=/tmp/mysql.sock --port=3306

2021-04-29T08:44:26.802747Z 0 [Warning] Gtid table is not ready to be used. Table 'mysql.gtid_executed' cannot be opened.

2021-04-29T08:44:26.805748Z 1 [Note] A temporary password is generated for root@localhost: FvfKr?zGg3B9

[mysql@localhost ~]$

[mysql@localhost ~]$ mysqld_safe --defaults-file=/etc/my.cnf &

[1] 4135

[mysql@localhost ~]$ jobs

[1] Running mysqld_safe --defaults-file=/etc/my.cnf &

Change the initial root password, create a test database db1, and create a remote login account root@'%':

[mysql@localhost ~]$ mysql -uroot -p'FvfKr?zGg3B9'

mysql: [Warning] Using a password on the command line interface can be insecure.

Welcome to the MySQL monitor. Commands end with ; or \g.

Your MySQL connection id is 2

Server version: 5.7.22-log

mysql> alter user root@localhost identified by '123123';

Query OK, 0 rows affected (0.00 sec)

mysql> flush privileges;

Query OK, 0 rows affected (0.00 sec)

mysql> create user root@'%' identified by '123123';

mysql> grant all privileges on *.* to root@'%';

mysql> create database db1; -------- the database that Kafka will later be pointed at to read and consume

mysql> create table db1.t1(id int primary key, name varchar(20), time1 timestamp default now()); ------- the table Kafka will read and consume
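Since the Flink CDC connector in Part 4 reads the binlog, it is worth a quick sanity check here (not in the original transcript) that the log-bin setting from /etc/my.cnf took effect; the variable should report ON:

mysql> show variables like 'log_bin';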

==================

Part 2: Kafka quick configuration

These steps are done as the root OS user.

Kafka relies on ZooKeeper for broker registration and for storing meta information such as consumer state, group management, and offsets, so we first extract and start ZooKeeper.

[root@localhost ~]# tar -xf apache-zookeeper-3.5.6-bin.tar.gz

[root@localhost ~]# mv apache-zookeeper-3.5.6-bin /usr/local/zookeeper

[root@localhost ~]# cd /usr/local/zookeeper/

[root@localhost zookeeper]# ls

bin conf docs lib LICENSE.txt NOTICE.txt README.md README_packaging.txt

[root@localhost zookeeper]# cd conf/

[root@localhost conf]# ls

configuration.xsl log4j.properties zoo_sample.cfg

ZooKeeper reads the zoo.cfg configuration file by default. The distribution ships a zoo_sample.cfg, so we can simply cp it to that name.

The other ZooKeeper settings can stay at their defaults here.

[root@localhost conf]# cp zoo_sample.cfg zoo.cfg

[root@localhost conf]#
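For reference, the defaults carried over unchanged from zoo_sample.cfg are roughly the following (single-node values; nothing needs editing here):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181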

Next, extract Kafka and edit its configuration files:

[root@localhost ~]# tar -xf kafka_2.11-2.4.0.tgz

[root@localhost ~]# mv kafka_2.11-2.4.0 /usr/local/kafka

[root@localhost ~]# cd /usr/local/kafka/

[root@localhost kafka]# ls

bin config libs LICENSE NOTICE site-docs

[root@localhost kafka]# cd config/ ---------- switch to the config directory and find the server.properties file

[root@localhost config]# ls

connect-console-sink.properties connect-file-sink.properties connect-mirror-maker.properties log4j.properties tools-log4j.properties

connect-console-source.properties connect-file-source.properties connect-standalone.properties producer.properties trogdor.conf

connect-distributed.properties connect-log4j.properties consumer.properties server.properties zookeeper.properties

[root@localhost config]#

[root@localhost config]# vim server.properties

Configure the listener address and port. With a single NIC the internal and advertised listeners can use the same address, or the advertised one can be left unset.

################################################

listeners=PLAINTEXT://192.168.100.10:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,

# it uses the value for "listeners" if configured. Otherwise, it will use the value

# returned from java.net.InetAddress.getCanonicalHostName().

#advertised.listeners=PLAINTEXT://your.host.name:9092

advertised.listeners=PLAINTEXT://192.168.100.10:9092

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# You can also append an optional chroot string to the urls to specify the

# root directory for all kafka znodes.

zookeeper.connect=localhost:2181 ------------ default ZooKeeper address and port

# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=6000

Extract the connector plugin and configure Kafka Connect:

[root@localhost ~]# unzip confluentinc-kafka-connect-jdbc-10.1.1.zip

[root@localhost ~]# cd /usr/local/kafka/

It can be moved into the Kafka directory and renamed connect-jdbc, as sketched below.
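The move itself is not in the original transcript; assuming the zip extracted into confluentinc-kafka-connect-jdbc-10.1.1 under /root, it would look roughly like this:

[root@localhost ~]# mv /root/confluentinc-kafka-connect-jdbc-10.1.1 /usr/local/kafka/connect-jdbc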

[root@localhost kafka]# ls

bin config connect-jdbc libs LICENSE logs NOTICE site-docs

[root@localhost ~]# cd /usr/local/kafka/config

Edit the standalone-mode Connect configuration file:

[root@localhost config]# vim connect-standalone.properties

The changes are as follows:

# These are defaults. This file just demonstrates how to override some settings.

bootstrap.servers=192.168.100.10:9092

# Examples:

# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,

# Directories where plugins are loaded from; declare Kafka's libs directory and the connector lib directory we just moved in

plugin.path=/usr/local/kafka/libs,/usr/local/kafka/connect-jdbc/lib

[root@localhost lib]# pwd

/usr/local/kafka/connect-jdbc/lib

[root@localhost lib]#

This plugin directory contains the JDBC driver JARs that the connector uses to reach the database:

[root@localhost lib]# ls

checker-qual-3.5.0.jar mssql-jdbc-8.4.1.jre8.jar oraclepki-19.7.0.0.jar postgresql-42.2.19.jar ucp-19.7.0.0.jar

common-utils-6.0.0.jar ojdbc8-19.7.0.0.jar orai18n-19.7.0.0.jar simplefan-19.7.0.0.jar xdb-19.7.0.0.jar

jtds-1.3.1.jar ojdbc8-production-19.7.0.0.pom osdt_cert-19.7.0.0.jar slf4j-api-1.7.30.jar xmlparserv2-19.7.0.0.jar

kafka-connect-jdbc-10.1.1.jar ons-19.7.0.0.jar osdt_core-19.7.0.0.jar sqlite-jdbc-3.25.2.jar

[root@localhost lib]#
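Note that the listing above has no MySQL driver in it; the separately downloaded mysql-connector-java-8.0.23.jar presumably has to be copied into this lib directory so the JDBC source connector can reach MySQL (an assumption based on the download list at the top, not shown in the transcript):

[root@localhost ~]# cp mysql-connector-java-8.0.23.jar /usr/local/kafka/connect-jdbc/lib/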

Next, configure the connector's parameter file:

[root@localhost etc]# pwd

/usr/local/kafka/connect-jdbc/etc

[root@localhost etc]# ls

sink-quickstart-sqlite.properties source-quickstart-sqlite.properties

[root@localhost etc]# cp source-quickstart-sqlite.properties /usr/local/kafka/config/mysql.properties

[root@localhost config]# vim mysql.properties

The configuration is as follows:

# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.

#connection.url=jdbc:sqlite:test.db

connection.url=jdbc:mysql://192.168.100.10:3306/db1?user=root&password=123123 # -- note the format: database db1 followed by ?, with user and password joined by &

mode=incrementing

incrementing.column.name=id -- column used as the incrementing reference

topic.prefix=mysql- -- Kafka topic names will start with this prefix

table.whitelist=t1,t2,t3 -- whitelist of MySQL tables to load and read

# Define when identifiers should be quoted in DDL and DML statements.
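Pulled together, a minimal mysql.properties sketch would look roughly like the following; the name, connector.class, and tasks.max entries are not shown in the excerpt above and are inferred (the connector name matches the task id that appears in the startup log further below):

name=source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://192.168.100.10:3306/db1?user=root&password=123123
mode=incrementing
incrementing.column.name=id
topic.prefix=mysql-
table.whitelist=t1,t2,t3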

==================

Part 3: Kafka consumption and testing

Next, start the ZooKeeper and Kafka services.

Before starting the services, we can add each component's path to root's environment variables to make the commands easier to call. In production it is not recommended to configure variables for the root superuser this way; a wrong path can affect the whole system.

[root@localhost ~]# vim .bash_profile

export ZK_HOME=/usr/local/zookeeper

export KAFKA_HOME=/usr/local/kafka

export FLINK_HOME=/usr/local/flink ------ set Flink in advance; it will be used later

export PATH=$PATH:$ZK_HOME/bin:$KAFKA_HOME/bin:$FLINK_HOME/bin

[root@localhost ~]#

[root@localhost ~]# source .bash_profile

Start the ZooKeeper service:

[root@localhost ~]# zkServer.sh start

/usr/bin/java

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg ---- zoo.cfg is loaded by default

Starting zookeeper ... STARTED

[root@localhost ~]# ps -ef|grep zookeeper # -- check the service process
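ZooKeeper itself can also report whether it is serving (it should show Mode: standalone in this single-node setup):

[root@localhost ~]# zkServer.sh status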

Start the Kafka service:

[root@localhost ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties &

[1] 19309

[root@localhost ~]#

[2021-04-29 19:49:54,740] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)

[2021-04-29 19:49:55,351] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)

[2021-04-29 19:49:55,352] INFO starting (kafka.server.KafkaServer)

[root@localhost ~]# jobs

[1] Running kafka-server-start.sh /usr/local/kafka/config/server.properties &

[root@localhost ~]#

Now start testing data loading and consumption.

Use Kafka's kafka-topics.sh to create a test topic first and confirm the service is working; the command is as follows:

[root@localhost ~]# kafka-topics.sh --create --zookeeper 192.168.100.10:2181 --replication-factor 1 --partitions 3 --topic test-topic

Created topic test-topic.

List the created topics:

[root@localhost ~]# kafka-topics.sh --list --zookeeper 192.168.100.10:2181

test-topic
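To check partition and replica placement, the same tool can describe the topic:

[root@localhost ~]# kafka-topics.sh --describe --zookeeper 192.168.100.10:2181 --topic test-topic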

Start the Kafka-to-MySQL source connector and confirm that it can load table t1 in database db1:

[root@localhost ~]#

[root@localhost ~]# connect-standalone.sh /usr/local/kafka/config/connect-standalone.properties /usr/local/kafka/config/mysql.properties &

If the db1.t1 table is loaded successfully, you should see output similar to the following:

[2021-04-29 19:59:19,874] INFO WorkerSourceTask{id=source-mysql-jdbc-autoincrement-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:209)

[2021-04-29 19:59:19,877] INFO Begin using SQL query: SELECT * FROM `db1`.`t1` WHERE `db1`.`t1`.`id` > ? ORDER BY `db1`.`t1`.`id` ASC (io.confluent.connect.jdbc.source.TableQuerier:164)
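Kafka Connect in standalone mode also exposes a REST interface on port 8083 by default, which gives a quick way to confirm the connector registered:

[root@localhost ~]# curl http://localhost:8083/connectors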

Reading the MySQL table data loaded into Kafka

Next, start a consumer to consume the data Kafka has already loaded from the MySQL source side. First check the topics that now exist:

[root@localhost config]# kafka-topics.sh --list --zookeeper 192.168.100.10:2181

__consumer_offsets

mysql-t1 --- this topic name comes from the topic.prefix=mysql- setting defined in mysql.properties

test-topic

==========================

Insert a few rows into MySQL (sketched below) and query the table:
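The INSERT statements themselves are not shown in the original (the table contents appear only as a screenshot); based on the JSON records consumed below, they would have looked roughly like this:

mysql> insert into db1.t1(id,name) values(10,'test-kafka-consumer');
mysql> insert into db1.t1(id,name) values(20,'');
mysql> insert into db1.t1(id,name) values(30,'xxxxxxxxxxxx');
mysql> insert into db1.t1(id,name) values(40,'AAAAAAAAAAA');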

mysql> select * from db1.t1;

[Screenshot: contents of MySQL table t1]

[root@localhost config]# kafka-console-consumer.sh --bootstrap-server 192.168.100.10:9092 --topic mysql-t1 --from-beginning

The JSON-formatted data consumed from Kafka:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":10,"name":"test-kafka-consumer","time1":1619719214000}} --timestap 这里转换为时间戳值

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":20,"name":"","time1":1619721861000}}

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":30,"name":"xxxxxxxxxxxx","time1":1619721869000}}

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":40,"name":"AAAAAAAAAAA","time1":1619721920000}}

Part 4: Loading MySQL data with Flink

For Flink we only need to extract the archive and use the flink-sql-connector-mysql-cdc-1.0.0.jar connector to capture MySQL binlog changes and refresh the data dynamically.

The steps are as follows:

[root@localhost flink]# tar -xf flink-1.12.2-bin-scala_2.11.tgz

[root@localhost ~]# mv flink-1.12.2 /usr/local/flink

[root@localhost ~]#
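One step not visible in the transcript: the CDC connector JAR must be on Flink's classpath before the cluster starts, normally by dropping it into the lib directory (assuming the downloaded jar sits in the current directory):

[root@localhost ~]# cp flink-sql-connector-mysql-cdc-1.0.0.jar /usr/local/flink/lib/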

[root@localhost ~]# start-cluster.sh ---- start the Flink services directly

Starting cluster.

Starting standalonesession daemon on host localhost.

Starting taskexecutor daemon on host localhost.

[root@localhost ~]#

[root@localhost ~]# sql-client.sh embedded --- launch the Flink SQL client

No default environment specified.

Searching for '/usr/local/flink/conf/sql-client-defaults.yaml'...found.

Reading default environment from: file:/usr/local/flink/conf/sql-client-defaults.yaml

No session environment specified.

Command history file path: /root/.flink-sql-history

Flink SQL> CREATE TABLE flink_tab( id int primary key, name string, time1 string) -- column names must match those of t1 in the MySQL database

> WITH (

> 'connector' = 'mysql-cdc', -- connector type

> 'hostname' = '192.168.100.10', -- MySQL host

> 'port' = '3306', -- MySQL port

> 'username' = 'root', -- MySQL user

> 'password' = '123123', -- MySQL password

> 'database-name' = 'db1', -- database name

> 'table-name' = 't1' -- table name

> );

[INFO] Table has been created.

Flink SQL>

Flink SQL> select * from flink_tab; ------- query the table

[Screenshot: the MySQL t1 table mapped and loaded in the Flink SQL client]
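To see the CDC behaviour, insert another row on the MySQL side while the query is open; the new row should show up in the Flink result view (a hypothetical row, purely for illustration):

mysql> insert into db1.t1(id,name) values(50,'flink-cdc-test');
Flink SQL> select * from flink_tab;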

The errors I ran into will be covered in a follow-up post. The packages used here are shared above; if you are interested, feel free to try it yourself, and the underlying principles can be picked up from the official docs and other write-ups.

Flink official documentation on the SQL Client configuration: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html
