本文会描述如下几部分的数据同步
- mysql2mysql
- mysql2hive
flinkx的版本1.12-SNAPSHOT
1.拉取代码
git clone https://github.com/DTStack/flinkx.git
2.编译
mvn clean package -DskipTests=true
注:这里需要提前运行sh install_jars.sh脚本
另外再执行如下命令
mvn install:install-file -DgroupId=com.dm -DartifactId=Dm7JdbcDriver18 -Dversion=7.6.0.197 -Dpackaging=jar -Dfile=Dm7JdbcDriver18.jar
3.运行
注:这里要先删除掉lib目录下面的所有jar,不然会出现如下错误
错误: 找不到或无法加载主类 .Users.wangkai.apps.src.github.flinkx.lib.flinkx-launcher-1.6.jar
json模式
local模式测试
1.命令
bin/flinkx -mode local
-jobType sync
-job /Users/wangkai/apps/install/flinkx/mysql2mysql.json
-flinkxDistDir flinkx-dist
2.mysql2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "birthday",
"type": "TIMESTAMP"
},
{
"name": "ts",
"type": "TIMESTAMP"
}
],
"customSql": "",
"where": "id < 1000",
"splitPk": "id",
"increColumn": "id",
"startLocation": "2",
"polling": true,
"pollingInterval": 3000,
"queryTimeOut": 1000,
"username": "root",
"password": "root",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://localhost:3306/test?useSSL=false"
],
"table": [
"users"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "root",
"connection": [
{
"jdbcUrl": "jdbc:mysql://localhost:3306/test?useSSL=false",
"table": [
"test_users2"
]
}
],
"writeMode": "insert",
"flushIntervalMills": 3000,
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "birthday",
"type": "TIMESTAMP"
},
{
"name": "ts",
"type": "TIMESTAMP"
}
]
}
}
}
],
"setting": {
"restore": {
"restoreColumnName": "id"
},
"speed": {
"channel": 1,
"bytes": 0
}
}
}
}
per-job模式
1.命令
bin/flinkx
-mode yarn-per-job
-jobType sync
-job /Users/wangkai/apps/install/flinkx/mysql2mysql.json
-flinkxDistDir flinkx-dist
-flinkConfDir /Users/wangkai/apps/install/flink-1.12.1/conf
-flinkLibDir /Users/wangkai/apps/install/flink-1.12.1/lib
2.mysql2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "birthday",
"type": "TIMESTAMP"
},
{
"name": "ts",
"type": "TIMESTAMP"
}
],
"customSql": "",
"where": "id < 1000",
"splitPk": "id",
"increColumn": "id",
"startLocation": "2",
"polling": true,
"pollingInterval": 3000,
"queryTimeOut": 1000,
"username": "root",
"password": "root",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://localhost:3306/test?useSSL=false"
],
"table": [
"users"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "root",
"connection": [
{
"jdbcUrl": "jdbc:mysql://localhost:3306/test?useSSL=false",
"table": [
"test_users3"
]
}
],
"writeMode": "insert",
"flushIntervalMills": 3000,
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "birthday",
"type": "TIMESTAMP"
},
{
"name": "ts",
"type": "TIMESTAMP"
}
]
}
}
}
],
"setting": {
"restore": {
"restoreColumnName": "id"
},
"speed": {
"channel": 1,
"bytes": 0
}
}
}
}
这里会有些许错误:
1.hadoop相关的错误
]
2021-11-09 14:57:37,102 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - Starting the SlotManager.
2021-11-09 14:57:37,109 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 2 ms
2021-11-09 14:57:37,111 ERROR org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - Exception on heartbeat
java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse.getAMRMToken()Lorg/apache/hadoop/yarn/api/records/Token;
at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:307) ~[hadoop-yarn-client-2.7.5.jar:?]
at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:237) [hadoop-yarn-client-2.7.5.jar:?]
2021-11-09 14:57:37,113 INFO org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - Interrupted while waiting for queue
java.lang.InterruptedException: null
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[?:1.8.0_291]
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) ~[?:1.8.0_291]
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) ~[?:1.8.0_291]
at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287) [hadoop-yarn-client-2.7.5.jar:?]
2021-11-09 14:57:37,115 ERROR org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - Stopping callback due to:
java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse.getAMRMToken()Lorg/apache/hadoop/yarn/api/records/Token;
org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newBuilder()Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest$AllocateRequestBuilder;
需要修改flinkx依赖的hadoop的代码,统一hadoop version
2.flink相关的错误
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not deserialize inputs
at org.apache.flink.streaming.api.graph.StreamConfig.getInputs(StreamConfig.java:275) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:281) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.wrapOperatorIntoOutput(OperatorChain.java:671) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:617) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:549) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:170) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:509) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_291]
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.api.graph.StreamConfig$NetworkInputConfig; local class incompatible: stream classdesc serialVersionUID = 3698633776553163849, local class serialVersionUID = -3137689219135046939
flink版本要统一
mysql2hive
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "birthday",
"type": "TIMESTAMP"
},
{
"name": "ts",
"type": "TIMESTAMP"
}
],
"customSql": "",
"where": "id < 1000",
"splitPk": "",
"queryTimeOut": 1000,
"username": "root",
"password": "root",
"requestAccumulatorInterval": 2,
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://localhost:3306/test?useSSL=false"
],
"table": [
"users"
]
}
]
}
},
"writer": {
"name" : "hivewriter",
"parameter" : {
"jdbcUrl" : "jdbc:hive2://localhost:10000/kudu",
"username" : "wangkai",
"password" : "wangkai",
"fileType" : "text",
"writeMode" : "overwrite",
"compress" : "",
"schema" : "kudu",
"charsetName" : "UTF-8",
"maxFileSize" : 1073741824,
"tablesColumn" : "{\"demonstrate_users\": [{\"key\": \"id\",\"type\": \"BIGINT\"}, {\"key\": \"name\",\"type\": \"string\"}, {\"key\": \"birthday\",\"type\": \"TIMESTAMP\"},{\"key\": \"ts\",\"type\": \"TIMESTAMP\"}]}",
"defaultFS" : "hdfs://localhost:9000"
}
}
}
],
"setting": {
"restore": {
"maxRowNumForCheckpoint": 0,
"isRestore": false,
"restoreColumnName": "",
"restoreColumnIndex": 0
},
"errorLimit": {
"record": 0,
"percentage": 0
},
"speed": {
"bytes": 1048576,
"channel": 1
}
}
}
}
命令
bin/flinkx
-mode yarn-per-job
-jobType sync
-job /Users/wangkai/apps/install/flinkx/mysql2hive.json
-flinkxDistDir flinkx-dist
-flinkConfDir /Users/wangkai/apps/install/flink-1.12.1/conf
-flinkLibDir /Users/wangkai/apps/install/flink-1.12.1/lib