FlinkX Data Synchronization

2021-11-22 10:42:03

This article walks through data synchronization for the following scenarios:

  1. mysql2mysql
  2. mysql2hive

FlinkX version: 1.12-SNAPSHOT

1. Clone the code

git clone https://github.com/DTStack/flinkx.git

2. Build

mvn clean package -DskipTests=true

Note: run the install_jars.sh script (sh install_jars.sh) before building; it installs driver jars that public Maven repositories do not host into your local repository.

Also run the following command to install the DM7 JDBC driver (Dm7JdbcDriver18.jar) into your local Maven repository:

mvn install:install-file -DgroupId=com.dm -DartifactId=Dm7JdbcDriver18 -Dversion=7.6.0.197 -Dpackaging=jar -Dfile=Dm7JdbcDriver18.jar
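To verify that the driver landed in your local repository, list the path derived from the coordinates above (standard ~/.m2 layout):

ls ~/.m2/repository/com/dm/Dm7JdbcDriver18/7.6.0.197/
# expected: Dm7JdbcDriver18-7.6.0.197.jar plus a generated .pom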

3. Run

Note: first delete all jars under the lib directory, otherwise you will hit the following error:

Error: Could not find or load main class .Users.wangkai.apps.src.github.flinkx.lib.flinkx-launcher-1.6.jar
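A minimal cleanup sketch; the path is the one shown in the error message, so adjust it to wherever you cloned FlinkX. Removing the stale jars keeps the launch script from picking up the old flinkx-launcher-1.6.jar:

rm /Users/wangkai/apps/src/github/flinkx/lib/*.jar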

JSON mode

Local mode test

1. Command

bin/flinkx -mode local \
  -jobType sync \
  -job /Users/wangkai/apps/install/flinkx/mysql2mysql.json \
  -flinkxDistDir flinkx-dist

2. mysql2mysql.json

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "birthday",
                "type": "TIMESTAMP"
              },
              {
                "name": "ts",
                "type": "TIMESTAMP"
              }
            ],
            "customSql": "",
            "where": "id < 1000",
            "splitPk": "id",
            "increColumn": "id",
            "startLocation": "2",
            "polling": true,
            "pollingInterval": 3000,
            "queryTimeOut": 1000,
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://localhost:3306/test?useSSL=false"
                ],
                "table": [
                  "users"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://localhost:3306/test?useSSL=false",
                "table": [
                  "test_users2"
                ]
              }
            ],
            "writeMode": "insert",
            "flushIntervalMills":"3000",
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "birthday",
                "type": "TIMESTAMP"
              },
              {
                "name": "ts",
                "type": "TIMESTAMP"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "restoreColumnName": "id"
      },
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}
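For reference, here is a minimal sketch of MySQL tables matching the column lists above (credentials and database name are taken from the job file; the concrete column types are assumptions, so adjust them to your real schema):

mysql -uroot -proot test <<'SQL'
CREATE TABLE IF NOT EXISTS users (
  id       INT PRIMARY KEY,
  name     VARCHAR(255),
  birthday TIMESTAMP NULL DEFAULT NULL,
  ts       TIMESTAMP NULL DEFAULT NULL
);
-- target table with the same layout as the source
CREATE TABLE IF NOT EXISTS test_users2 LIKE users;
SQL

With increColumn set to id, startLocation 2, and polling enabled, the reader first syncs the rows whose id is greater than 2 (further restricted by the where clause), then polls every pollingInterval milliseconds (3000 here, i.e. every 3 seconds) for rows beyond the last id it has read, so the job keeps running rather than exiting after one pass.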

Per-job mode

1. Command

bin/flinkx \
    -mode yarn-per-job \
    -jobType sync \
    -job /Users/wangkai/apps/install/flinkx/mysql2mysql.json \
    -flinkxDistDir flinkx-dist \
    -flinkConfDir /Users/wangkai/apps/install/flink-1.12.1/conf \
    -flinkLibDir /Users/wangkai/apps/install/flink-1.12.1/lib
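Submitting in yarn-per-job mode also requires the client to locate your Hadoop/YARN configuration; a typical setup looks like this (the path is an assumption, point it at your own Hadoop install):

# make the YARN cluster configuration visible to the submitting client
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop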

2. mysql2mysql.json

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "birthday",
                "type": "TIMESTAMP"
              },
              {
                "name": "ts",
                "type": "TIMESTAMP"
              }
            ],
            "customSql": "",
            "where": "id < 1000",
            "splitPk": "id",
            "increColumn": "id",
            "startLocation": "2",
            "polling": true,
            "pollingInterval": 3000,
            "queryTimeOut": 1000,
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://localhost:3306/test?useSSL=false"
                ],
                "table": [
                  "users"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://localhost:3306/test?useSSL=false",
                "table": [
                  "test_users3"
                ]
              }
            ],
            "writeMode": "insert",
            "flushIntervalMills":"3000",
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "birthday",
                "type": "TIMESTAMP"
              },
              {
                "name": "ts",
                "type": "TIMESTAMP"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "restoreColumnName": "id"
      },
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}

A few errors can show up here:

1. Hadoop-related errors

2021-11-09 14:57:37,102 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - Starting the SlotManager.
2021-11-09 14:57:37,109 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 2 ms
2021-11-09 14:57:37,111 ERROR org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - Exception on heartbeat
java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse.getAMRMToken()Lorg/apache/hadoop/yarn/api/records/Token;
    at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:307) ~[hadoop-yarn-client-2.7.5.jar:?]
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:237) [hadoop-yarn-client-2.7.5.jar:?]
2021-11-09 14:57:37,113 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - Interrupted while waiting for queue
java.lang.InterruptedException: null
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[?:1.8.0_291]
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) ~[?:1.8.0_291]
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) ~[?:1.8.0_291]
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287) [hadoop-yarn-client-2.7.5.jar:?]
2021-11-09 14:57:37,115 ERROR org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - Stopping callback due to:
java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse.getAMRMToken()Lorg/apache/hadoop/yarn/api/records/Token;
You may also see the same kind of missing-method signature for:
org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newBuilder()Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest$AllocateRequestBuilder;

You need to adjust FlinkX's Hadoop dependency so that the Hadoop version is unified with the one running on your cluster, then rebuild.
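One way to unify it, assuming the root pom exposes the Hadoop version as a hadoop.version property (check the pom of the branch you built), is to rebuild against the cluster's exact Hadoop release:

# substitute the Hadoop version actually running on your cluster
mvn clean package -DskipTests=true -Dhadoop.version=2.7.5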

2. Flink-related errors

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not deserialize inputs
    at org.apache.flink.streaming.api.graph.StreamConfig.getInputs(StreamConfig.java:275) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:281) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.wrapOperatorIntoOutput(OperatorChain.java:671) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:617) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:549) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:170) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:509) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_291]
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.api.graph.StreamConfig$NetworkInputConfig; local class incompatible: stream classdesc serialVersionUID = 3698633776553163849, local class serialVersionUID = -3137689219135046939

The Flink versions must be unified: the Flink that FlinkX was compiled against has to match the Flink release in -flinkLibDir (the serialVersionUID mismatch above is the symptom of running against a different build).
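A quick way to spot the mismatch (a sketch that assumes the FlinkX pom declares a flink.version property):

# Flink version on the cluster side (the directory passed via -flinkLibDir)
ls /Users/wangkai/apps/install/flink-1.12.1/lib | grep flink-dist
# Flink version FlinkX was compiled against
grep -m1 '<flink.version>' pom.xml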

mysql2hive

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "birthday",
                "type": "TIMESTAMP"
              },
              {
                "name": "ts",
                "type": "TIMESTAMP"
              }
            ],
            "customSql": "",
            "where": "id < 1000",
            "splitPk": "",
            "queryTimeOut": 1000,
            "username": "root",
            "password": "root",
            "requestAccumulatorInterval": 2,
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://localhost:3306/test?useSSL=false"
                ],
                "table": [
                  "users"
                ]
              }
            ]
          }
        },
        "writer": {
          "name" : "hivewriter",
          "parameter" : {
            "jdbcUrl" : "jdbc:hive2://localhost:10000/kudu",
            "username" : "wangkai",
            "password" : "wangkai",
            "fileType" : "text",
            "writeMode" : "overwrite",
            "compress" : "",
            "schema" : "kudu",
            "charsetName" : "UTF-8",
            "maxFileSize" : 1073741824,
            "tablesColumn" : "{"demonstrate_users": [{"key": "id","type": "BIGINT"}, {"key": "name","type": "string"}, {"key": "birthday","type": "TIMESTAMP"},{"key": "ts","type": "TIMESTAMP"}]}",
            "defaultFS" : "hdfs://localhost:9000"
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      },
      "speed": {
        "bytes": 1048576,
        "channel": 1
      }
    }
  }
}
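If you prefer to create the Hive target table up front instead of leaving it to the writer, a DDL matching tablesColumn would look roughly like this (a sketch: TEXTFILE matches the job's fileType, the row format is left to Hive's defaults, and the connection details are taken from the job file):

beeline -u jdbc:hive2://localhost:10000/kudu -n wangkai -e "
CREATE TABLE IF NOT EXISTS demonstrate_users (
  id       BIGINT,
  name     STRING,
  birthday TIMESTAMP,
  ts       TIMESTAMP
)
STORED AS TEXTFILE;
"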

Command

bin/flinkx \
    -mode yarn-per-job \
    -jobType sync \
    -job /Users/wangkai/apps/install/flinkx/mysql2hive.json \
    -flinkxDistDir flinkx-dist \
    -flinkConfDir /Users/wangkai/apps/install/flink-1.12.1/conf \
    -flinkLibDir /Users/wangkai/apps/install/flink-1.12.1/lib
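After submitting, you can confirm that the job actually reached the cluster:

# the sync job should appear among the running YARN applications
yarn application -list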
