Note: Many data migration tools wrap Kafka and Flink. Out of curiosity, I downloaded Kafka and Flink and tried deploying them myself, and this post is a simple record of that installation; I hit quite a few pitfalls along the way, and the more typical errors will be shared later once I have sorted them out. Everything here is single-machine, single-instance, single-node; no distributed or cluster configuration is involved.
This post is divided into four parts:
1. MySQL installation
2. Kafka installation and configuration
3. Kafka consumption and testing
4. Flink reading a MySQL table through the sql-client
==========Software versions:
Operating system: CentOS 7.4
1. mysql-5.7.22-linux-glibc2.12-x86_64
Link: https://pan.baidu.com/s/1KlR-rRoHC97aX2j2ha05ng
Extraction code: ksi9
=======================
2. apache-zookeeper-3.5.6-bin.tar.gz
Link: https://pan.baidu.com/s/1zOSeOK_ZiPmNzP8EuwwTBA
Extraction code: k1is
=====================
3. confluentinc-kafka-connect-jdbc-10.1.1.zip
Link: https://pan.baidu.com/s/1jTOUiXNdNOBQnTiuuiDcOA
Extraction code: spwr
====================
4. kafka_2.11-2.4.0.tgz
Link: https://pan.baidu.com/s/1u3Q_4F1nQSFWj7qG6ESDZA
Extraction code: x2oy
=================
5. flink-1.12.2-bin-scala_2.11.tgz
Link: https://pan.baidu.com/s/1tPhpAmLlEhbeV8y-hNnb_A
Extraction code: qswm
===========Java version and jar packages used:
mysql-connector-java-8.0.23.jar
Link: https://pan.baidu.com/s/1XDQkUMzJm7dRn-L74Ar-PQ
Extraction code: shfy
===================================
flink-sql-connector-mysql-cdc-1.0.0.jar
Link: https://pan.baidu.com/s/13z5ocJaebmOM71TXKOCnLQ
Extraction code: 2ine
=============================
[root@localhost ~]# java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-b12)
OpenJDK 64-Bit Server VM (build 25.131-b12, mixed mode)
====================================================================
1. MySQL installation and initialization
Here I will just paste the main steps quickly; all the software from here on is placed under /usr/local.
[root@localhost ~]# useradd mysql
[root@localhost ~]# passwd mysql
[root@localhost ~]# tar -xf mysql-5.7.22-linux-glibc2.12-x86_64.tar.gz
[root@localhost ~]# mv mysql-5.7.22-linux-glibc2.12-x86_64 /usr/local/mysql
[root@localhost ~]# mkdir -p /usr/local/mysql/data
[root@localhost ~]# chown mysql.mysql /usr/local/mysql/ -R
Configure the default MySQL parameter file:
[root@localhost ~]# vim /etc/my.cnf
[root@localhost ~]# cat /etc/my.cnf
[mysqld]
basedir=/usr/local/mysql
datadir=/usr/local/mysql/data
socket=/tmp/mysql.sock
port=3306
server-id=100
log-bin
[root@localhost ~]# chown mysql.mysql /etc/my.cnf
[root@localhost ~]# su - mysql
Last login: Thu Apr 29 16:41:25 CST 2021 on pts/1
[mysql@localhost ~]$
[mysql@localhost ~]$ vim .bash_profile
Add the MySQL path so the mysql client programs can be called directly:
export MYSQL_HOME=/usr/local/mysql
export PATH=$PATH:$MYSQL_HOME/bin
[mysql@localhost ~]$ source .bash_profile
Initialize MySQL and start the database:
[mysql@localhost ~]$ mysqld --initialize --basedir=/usr/local/mysql --datadir=/usr/local/mysql/data --socket=/tmp/mysql.sock --port=3306
2021-04-29T08:44:26.802747Z 0 [Warning] Gtid table is not ready to be used. Table 'mysql.gtid_executed' cannot be opened.
2021-04-29T08:44:26.805748Z 1 [Note] A temporary password is generated for root@localhost: FvfKr?zGg3B9
[mysql@localhost ~]$
[mysql@localhost ~]$ mysqld_safe --defaults-file=/etc/my.cnf &
[1] 4135
[mysql@localhost ~]$ jobs
[1] Running mysqld_safe --defaults-file=/etc/my.cnf &
Change the initial root password, create the test database db1, and create a remote root login account root@'%':
[mysql@localhost ~]$ mysql -uroot -p'FvfKr?zGg3B9'
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 2
Server version: 5.7.22-log
mysql> alter user root@localhost identified by '123123';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
mysql> create user root@'%' identified by '123123';
mysql> grant all privileges on *.* to root@'%';
mysql> create database db1; -------- the database Kafka will later be told to read from
mysql> create table db1.t1(id int primary key ,name varchar(20),time1 timestamp default now()); ------- the table Kafka will read and consume
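Since the Flink CDC connector in part 4 reads the MySQL binlog, this is a good point to confirm that the log-bin and server-id settings from /etc/my.cnf actually took effect. A quick check from the same mysql session (your file names and positions will differ):
mysql> show variables like 'log_bin';     -------- should report ON
mysql> show variables like 'server_id';   -------- should report 100, as set in /etc/my.cnf
mysql> show master status;                -------- shows the current binlog file and position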
==================
2. Quick Kafka configuration
Everything here is done as the root OS account.
Kafka needs ZooKeeper for broker registration and as its metadata store: consumer consumption state, group management, and offset values. So unpack and start ZooKeeper first.
[root@localhost ~]# tar -xf apache-zookeeper-3.5.6-bin.tar.gz
[root@localhost ~]# mv apache-zookeeper-3.5.6-bin /usr/local/zookeeper
[root@localhost ~]# cd /usr/local/zookeeper/
[root@localhost zookeeper]# ls
bin conf docs lib LICENSE.txt NOTICE.txt README.md README_packaging.txt
[root@localhost zookeeper]# cd conf/
[root@localhost conf]# ls
configuration.xsl log4j.properties zoo_sample.cfg
ZooKeeper reads the zoo.cfg configuration file by default. The distribution ships a zoo_sample.cfg, so we can simply cp it to that name.
All the other zk settings can be left at their defaults.
[root@localhost conf]# cp zoo_sample.cfg zoo.cfg
[root@localhost conf]#
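For reference, the shipped defaults in zoo_sample.cfg look roughly like this (values may vary slightly between releases); the clientPort of 2181 is what Kafka's server.properties points at later:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper      # in-box default; fine for a test, point it at a persistent path for anything longer-lived
clientPort=2181             # the port referenced by zookeeper.connect in server.properties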
Next, unpack Kafka and edit its configuration file.
[root@localhost ~]# tar -xf kafka_2.11-2.4.0.tgz
[root@localhost ~]# mv kafka_2.11-2.4.0 /usr/local/kafka
[root@localhost ~]# cd /usr/local/kafka/
[root@localhost kafka]# ls
bin config libs LICENSE NOTICE site-docs
[root@localhost kafka]# cd config/ ---------- switch to the config directory and find the configuration file whose name starts with server
[root@localhost config]# ls
connect-console-sink.properties connect-file-sink.properties connect-mirror-maker.properties log4j.properties tools-log4j.properties
connect-console-source.properties connect-file-source.properties connect-standalone.properties producer.properties trogdor.conf
connect-distributed.properties connect-log4j.properties consumer.properties server.properties zookeeper.properties
[root@localhost config]#
[root@localhost config]# vim server.properties
Configure the listener ports. With a single NIC, the internal and advertised listeners can use the same address, or the advertised one can be left unset.
################################################
listeners=PLAINTEXT://192.168.100.10:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://192.168.100.10:9092
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181 ------------ default zk address and port
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
Unpack the plugin and configure the Kafka Connect configuration file.
[root@localhost ~]# unzip confluentinc-kafka-connect-jdbc-10.1.1.zip
[root@localhost ~]# cd /usr/local/kafka/
Move the unpacked plugin directory into the kafka directory and rename it connect-jdbc:
[root@localhost kafka]# ls
bin config connect-jdbc libs LICENSE logs NOTICE site-docs
[root@localhost ~]# cd /usr/local/kafka/config
Edit the standalone connect configuration file:
[root@localhost config]# vim connect-standalone.properties
The changes are as follows:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=192.168.100.10:9092
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# directories containing the plugins to load; declare Kafka's own libs directory and the plugin lib directory we just moved in
plugin.path=/usr/local/kafka/libs,/usr/local/kafka/connect-jdbc/lib
[root@localhost lib]# pwd
/usr/local/kafka/connect-jdbc/lib
[root@localhost lib]#
This plugin directory ships with the JDBC driver jars the connector uses to talk to the databases:
[root@localhost lib]# ls
checker-qual-3.5.0.jar mssql-jdbc-8.4.1.jre8.jar oraclepki-19.7.0.0.jar postgresql-42.2.19.jar ucp-19.7.0.0.jar
common-utils-6.0.0.jar ojdbc8-19.7.0.0.jar orai18n-19.7.0.0.jar simplefan-19.7.0.0.jar xdb-19.7.0.0.jar
jtds-1.3.1.jar ojdbc8-production-19.7.0.0.pom osdt_cert-19.7.0.0.jar slf4j-api-1.7.30.jar xmlparserv2-19.7.0.0.jar
kafka-connect-jdbc-10.1.1.jar ons-19.7.0.0.jar osdt_core-19.7.0.0.jar sqlite-jdbc-3.25.2.jar
[root@localhost lib]#
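Note that the listing above only ships drivers for SQL Server, Oracle, PostgreSQL and SQLite; there is no MySQL driver. That is presumably what the mysql-connector-java-8.0.23.jar from the download list is for: drop it into this lib directory (anywhere on plugin.path works) so the JDBC source connector can open the jdbc:mysql:// URL. A sketch, assuming the jar was downloaded to root's home directory:
[root@localhost ~]# cp /root/mysql-connector-java-8.0.23.jar /usr/local/kafka/connect-jdbc/lib/
[root@localhost ~]# ls /usr/local/kafka/connect-jdbc/lib/ | grep mysql     ----confirm the driver is now on the plugin path
mysql-connector-java-8.0.23.jar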
Next, configure the connector's parameter file.
[root@localhost etc]# pwd
/usr/local/kafka/connect-jdbc/etc
[root@localhost etc]# ls
sink-quickstart-sqlite.properties source-quickstart-sqlite.properties
[root@localhost etc]# cp source-quickstart-sqlite.properties /usr/local/kafka/config/mysql.properties
[root@localhost config]# vim mysql.properties
The configuration is as follows:
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
#connection.url=jdbc:sqlite:test.db
connection.url=jdbc:mysql://192.168.100.10:3306/db1?user=root&password=123123 # -- note the format: db1? followed by user and password joined with an & sign
mode=incrementing
incrementing.column.name=id --- incremental reference column
topic.prefix=mysql- --- Kafka topic names will start with this prefix
table.whitelist=t1,t2,t3 --- whitelist of tables to load from the MySQL database
# Define when identifiers should be quoted in DDL and DML statements.
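Pieced together, the whole mysql.properties looks roughly like the sketch below. The name, connector.class and tasks.max lines come from the original source-quickstart-sqlite.properties; judging from the connector log further down (source-mysql-jdbc-autoincrement-0), the name was changed to source-mysql-jdbc-autoincrement, but any unique name will do:
name=source-mysql-jdbc-autoincrement        # unique connector name (assumed from the worker log below)
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://192.168.100.10:3306/db1?user=root&password=123123
mode=incrementing                           # only fetch rows whose incrementing column value has grown
incrementing.column.name=id
table.whitelist=t1,t2,t3
topic.prefix=mysql-                         # topics become mysql-t1, mysql-t2, ...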
==================
3. Kafka consumption and testing
Next, start the ZooKeeper and Kafka services.
Before starting them, we can set path variables for the components under root to make the commands easier to call. In production it is not recommended to configure these variables for the root superuser; a wrong path there affects the whole system.
[root@localhost ~]# vim .bash_profile
export ZK_HOME=/usr/local/zookeeper
export KAFKA_HOME=/usr/local/kafka
export FLINK_HOME=/usr/local/flink ------ set up flink in advance; it will be used later
export PATH=$PATH:$ZK_HOME/bin:$KAFKA_HOME/bin:$FLINK_HOME/bin
[root@localhost ~]#
[root@localhost ~]# source .bash_profile
Start the ZooKeeper service:
[root@localhost ~]# zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg ---- the zoo.cfg configuration file is loaded by default
Starting zookeeper ... STARTED
[root@localhost ~]# ps -ef|grep zookeeper #-- check the service process
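Besides grepping for the process, zkServer.sh can report its own status; on a single node it should answer with Mode: standalone:
[root@localhost ~]# zkServer.sh status           ----should print the config path and "Mode: standalone"
[root@localhost ~]# ss -lntp | grep 2181         ----confirm something is listening on the client port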
Start the Kafka service:
[root@localhost ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties &
[1] 19309
[root@localhost ~]#
[2021-04-29 19:49:54,740] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2021-04-29 19:49:55,351] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2021-04-29 19:49:55,352] INFO starting (kafka.server.KafkaServer)
[root@localhost ~]# jobs
[1] Running kafka-server-start.sh /usr/local/kafka/config/server.properties &
[root@localhost ~]#
Now start testing data loading and consumption.
Use Kafka's kafka-topics.sh to try creating a test topic first, to verify the service works. The command is as follows:
[root@localhost ~]# kafka-topics.sh --create --zookeeper 192.168.100.10:2181 --replication-factor 1 --partitions 3 --topic test-topic
Created topic test-topic.
List the created topics:
[root@localhost ~]# kafka-topics.sh --list --zookeeper 192.168.100.10:2181
test-topic
Start the Kafka-to-MySQL connector and confirm whether it can load table t1 from the db1 database in MySQL.
[root@localhost ~]#
[root@localhost ~]# connect-standalone.sh /usr/local/kafka/config/connect-standalone.properties /usr/local/kafka/config/mysql.properties &
If it successfully loads the MySQL table db1.t1, you should see messages like the following:
[2021-04-29 19:59:19,874] INFO WorkerSourceTask{id=source-mysql-jdbc-autoincrement-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:209)
[2021-04-29 19:59:19,877] INFO Begin using SQL query: SELECT * FROM `db1`.`t1` WHERE `db1`.`t1`.`id` > ? ORDER BY `db1`.`t1`.`id` ASC (io.confluent.connect.jdbc.source.TableQuerier:164)
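The standalone Connect worker also exposes a REST interface (port 8083 by default, unless rest.port was changed in connect-standalone.properties), which is a quick way to confirm the connector and its task are RUNNING. A sketch, assuming the connector name shown in the log above:
[root@localhost ~]# curl -s http://localhost:8083/connectors     ----should list the connector name, e.g. ["source-mysql-jdbc-autoincrement"]
[root@localhost ~]# curl -s http://localhost:8083/connectors/source-mysql-jdbc-autoincrement/status     ----connector and task states should both be RUNNING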
Read the MySQL table data loaded into Kafka
Next, start a consumer to consume the data Kafka has already loaded from the MySQL source. First check which topics exist now:
[root@localhost config]# kafka-topics.sh --list --zookeeper 192.168.100.10:2181
__consumer_offsets
mysql-t1 --- this topic name starts with the topic.prefix=mysql- prefix defined in mysql.properties
test-topic
==========================
Insert a few test rows into db1.t1 in MySQL and check them:
mysql> select * from db1.t1;
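The select output is omitted here; judging from the consumer payloads below, the test rows were inserted roughly like this (time1 is left to its DEFAULT now()):
mysql> insert into db1.t1(id,name) values (10,'test-kafka-consumer');
mysql> insert into db1.t1(id,name) values (20,'');
mysql> insert into db1.t1(id,name) values (30,'xxxxxxxxxxxx');
mysql> insert into db1.t1(id,name) values (40,'AAAAAAAAAAA');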
[root@localhost config]# kafka-console-consumer.sh --bootstrap-server 192.168.100.10:9092 --topic mysql-t1 --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":10,"name":"test-kafka-consumer","time1":1619719214000}} --timestap 这里转换为时间戳值
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":20,"name":"","time1":1619721861000}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":30,"name":"xxxxxxxxxxxx","time1":1619721869000}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":40,"name":"AAAAAAAAAAA","time1":1619721920000}}
4. Loading MySQL data with Flink
For Flink we only need to unpack it and then use the flink-sql-connector-mysql-cdc-1.0.0.jar connector to capture MySQL binlog changes and refresh the data dynamically.
As follows:
[root@localhost flink]# tar -xf flink-1.12.2-bin-scala_2.11.tgz
[root@localhost ~]# mv flink-1.12.2 /usr/local/flink
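One step the transcript skips: the flink-sql-connector-mysql-cdc-1.0.0.jar from the download list has to be on Flink's classpath before the sql-client can use connector = 'mysql-cdc'. The usual place is the lib directory of the Flink installation (restart the cluster afterwards if it was already running). A sketch, assuming the jar sits in root's home directory:
[root@localhost ~]# cp /root/flink-sql-connector-mysql-cdc-1.0.0.jar /usr/local/flink/lib/
[root@localhost ~]# ls /usr/local/flink/lib/ | grep cdc          ----confirm the connector jar is in place
flink-sql-connector-mysql-cdc-1.0.0.jar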
[root@localhost ~]#
[root@localhost ~]# start-cluster.sh ---- start the Flink service directly
Starting cluster.
Starting standalonesession daemon on host localhost.
Starting taskexecutor daemon on host localhost.
[root@localhost ~]#
[root@localhost ~]# sql-client.sh embedded --- launch Flink's SQL client
No default environment specified.
Searching for '/usr/local/flink/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/usr/local/flink/conf/sql-client-defaults.yaml
No session environment specified.
Command history file path: /root/.flink-sql-history
Flink SQL> CREATE TABLE flink_tab( id int primary key, name string,time1 string) -- the column names must match table t1 in the MySQL database
> WITH (
> 'connector' = 'mysql-cdc', -- connector
> 'hostname' = '192.168.100.10', -- mysql host
> 'port' = '3306', -- mysql port
> 'username' = 'root', -- mysql username
> 'password' = '123123', -- mysql password
> 'database-name' = 'db1', -- database name
> 'table-name' = 't1' -- table name
> );
[INFO] Table has been created.
Flink SQL>
Flink SQL> select * from flink_tab; ------- query the table
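To see the CDC behaviour, keep the result view of this query open and change the source table from another session; the change is picked up from the binlog and reflected in the Flink result without re-running anything. A minimal check, continuing the row ids used earlier:
mysql> insert into db1.t1(id,name) values (50,'flink-cdc-test');   -- a new row should appear in the Flink result view
mysql> update db1.t1 set name='updated-by-cdc' where id=50;        -- the same row should change in place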
The errors I ran into will be covered in a later update. The software packages used are shared above; if you are interested, give it a try yourself, and the underlying principles can be learned from the official docs and other write-ups online.
Flink official documentation on the sql-client configuration: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html