java整合datax 全网最详细的教程

2022-08-05 20:15:45 浏览数 (1)

目录:

  • 一、去官网下载datax
  • 二、依赖
  • 三、测试类
  • 四、json传参

Part2今日主题:java整合datax

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。 由于官网的例子是基于python的例子,网上也很少可以找到java版本的,然后自己刚好做过,记录一下,我搜了一下,我应该是全网第一篇写的datax最详细的文章。

如果对人工智能感兴趣的可以去这个网站看看,受益匪浅点击跳转

1一、去官网下载

https://github.com/alibaba/Data

点击下载就好了

2二、依赖

下载的压缩文件解压,在lib目录下将这两个依赖安装到本地

将这个两个依赖安装到本地maven仓库

在项目引入这两个依赖

代码语言:javascript复制
       <dependency>
            <groupId>com.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>0.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>0.0.1</version>
        </dependency>

同时也需要引入下面这几个依赖,否则会报错

代码语言:javascript复制
 <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-io</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>

在resource目录下新建一个datax目录,在datax目录下新建test.json文件。

test.json:

代码语言:javascript复制
{
  "job": {
    "setting": {
      "speed": {
        "channel": 4
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test"],
                "querySql": ["select t.id,t.name,t.status from users t"]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "writeMode": "insert",
            "column": ["id","name","status"],
            "connection": [
              {
                "table": [
                  "temp_users"
                ],
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test"
              }
            ]
          }
        }
      }
    ]
  }
}

我这是自己本地的mysql数据库进行数据同步的测试

3三、测试类

代码语言:javascript复制
public class TestMain {
    public static String getCurrentClasspath(){
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        String currentClasspath = classLoader.getResource("").getPath();
        // 当前操作系统
        String osName = System.getProperty("os.name");
        if (osName.startsWith("Win")) {
            // 删除path中最前面的/
            currentClasspath = currentClasspath.substring(1, currentClasspath.length()-1);
        }
        return currentClasspath;
    }
    public static void main(String[] args) {

        System.setProperty("datax.home","D:\datax\datax");
        String[] datxArgs2 = {"-job", getCurrentClasspath() "/datax/test.json", "-mode", "standalone", "-jobid", "-1"};
        try {
            Engine.entry(datxArgs2);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}

运行结果:

数据同步成功。

4四、json传参

相信大家在做数据同步的时候,肯定不是简单的sql,一般还有条件的,也就是参数,那参数要怎么传进去呢?

test.json: 改成一个接收参数的方式

我是将id为多少的数据同步过去select t.id,t.name,t.status from users t where t.id=${id}

代码语言:javascript复制
{
  "job": {
    "setting": {
      "speed": {
        "channel": 4
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test"],
                "querySql": ["select t.id,t.name,t.status from users t where t.id=${id}"]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "writeMode": "insert",
            "column": ["id","name","status"],
            "connection": [
              {
                "table": [
                  "temp_users"
                ],
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test"
              }
            ]
          }
        }
      }
    ]
  }
}

测试类就应该这么写:

参数值已经成功的注入进来了

0 人点赞