Integrating Sqoop with Spring Boot


Sqoop is an open-source tool used mainly to move data between Hadoop (Hive) and traditional databases (MySQL, PostgreSQL, ...): it can import data from a relational database (e.g. MySQL, Oracle, Postgres) into HDFS, and it can also export data from HDFS back into a relational database. (from Baidu Baike)

That is a short description of Sqoop, so I will not belabor the basics. In day-to-day enterprise development most of what we face may be CRUD work, but as a programmer I think you should not stop there; the business scenarios you happen to work on naturally limit your view of the wider technology stack. Today I want to share how to integrate Sqoop with Spring Boot. Perhaps it will add a highlight to your résumé, and with luck to your salary as well.

To use Sqoop we first need to understand what it is for: transferring and migrating data between an RDBMS and Hadoop (HDFS / Hive / HBase). In our case it mainly synchronizes archived data that feeds intelligent recommendations and visualization dashboards. We chose Sqoop because it solves exactly the data-transfer problem between relational databases and Hadoop, and since it is built on MapReduce underneath, it is performant, easy to use, and flexible.

Below, a concrete example walks through the whole integration in detail.

1. First, create a simple Spring Boot project. I will not go over the details here; see, for example, the step-by-step guide “springboot超详细搭建过程” on 简书 (Jianshu).
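For completeness, the entry point is the usual Spring Boot boilerplate. A minimal sketch (the class name SqoopApplication is hypothetical; the package matches the classes shown later):

package com.ienglish.batch.data.sqoop;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SqoopApplication {
    public static void main(String[] args) {
        // Boots the embedded container and scans com.ienglish.batch.data.sqoop.*
        SpringApplication.run(SqoopApplication.class, args);
    }
}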

2. Add the dependencies to the pom

    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.sqoop/sqoop -->
        <dependency>
            <groupId>org.apache.sqoop</groupId>
            <artifactId>sqoop</artifactId>
            <version>1.4.7</version>
            <classifier>hadoop260</classifier>
        </dependency>


        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.0</version>
        </dependency>

        <!--hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.8.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-mapred</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-common</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>
    </dependencies>

Note: the Hadoop and Hive artifacts ship their own logging and servlet jars, which can conflict with the ones Spring Boot brings in. If the application fails to start (typically with slf4j or servlet-api related errors), exclude the offending transitive dependencies, for example as shown below.
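A hedged example of such an exclusion on hadoop-common (which artifacts actually conflict depends on your Spring Boot version, so treat this as a template rather than a fixed recipe):

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.4</version>
            <exclusions>
                <!-- hadoop-common pulls in a log4j slf4j binding that clashes with Spring Boot's logback -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <!-- the old servlet-api clashes with the embedded Tomcat's servlet classes -->
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>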

3. Create the controller class

package com.ienglish.batch.data.sqoop.controller;

/**
 * @program: english-batch-data-sqoop
 *
 * @description:
 *
 * @author: ShiY.WANG
 *
 * @create: 2021-10-15 16:53
 **/

import com.ienglish.batch.data.sqoop.entity.sqoopBean;
import com.ienglish.batch.data.sqoop.service.SqoopService;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/data")
public class SqoopController {
    @Autowired
    private SqoopService sqoopService;

    /**
     * Import MySQL data into HDFS.
     * @param connect   JDBC URL, e.g. jdbc:mysql://127.0.0.1:3306/test
     * @param driver    JDBC driver class, e.g. com.mysql.cj.jdbc.Driver
     * @param username  database username
     * @param password  database password
     * @param table     source table name
     * @param m         degree of MapReduce parallelism (number of map tasks)
     * @param targetdir target HDFS directory
     * @param hdfsAddr  HDFS address (the fs.defaultFS URI)
     * @return a sqoopBean carrying the Sqoop exit code and a completion timestamp
     * @throws Exception
     */
    @PostMapping("/mysqlTohdfs")
    @ResponseBody
    public sqoopBean mysqlTohdfs(String connect, String driver, String username, String password, String table, int m, String targetdir, String hdfsAddr) throws Exception {
        return sqoopService.mysqlTohdfs(connect, driver, username, password, table, m, targetdir, hdfsAddr);
    }

    /**
     * Import MySQL data into HBase.
     * @param jdbc           JDBC URL
     * @param driver         JDBC driver class
     * @param username       database username
     * @param password       database password
     * @param mysqlTable     source MySQL table
     * @param hbaseTableName target HBase table
     * @param columnFamily   HBase column family to write into
     * @param rowkey         source column used as the HBase row key
     * @param m              degree of MapReduce parallelism (number of map tasks)
     * @return a sqoopBean carrying the Sqoop exit code and a completion timestamp
     * @throws Exception
     */
    @PostMapping("/mysql2hbase")
    @ResponseBody
    public sqoopBean transformMysql2Hbase(String jdbc, String driver, String username, String password, String mysqlTable, String hbaseTableName, String columnFamily, String rowkey, int m) throws Exception {
        return sqoopService.mysql2Hbase(jdbc, driver, username, password, mysqlTable, hbaseTableName, columnFamily, rowkey, m);
    }


}
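The sqoopBean return type is never shown in the original listing. Judging from the setI/setTs calls in the implementation further below, a minimal sketch would be (field names are inferred from usage; the real class may differ):

package com.ienglish.batch.data.sqoop.entity;

import java.sql.Timestamp;

public class sqoopBean {
    private int i;        // exit code returned by Sqoop.runSqoop (0 = success)
    private Timestamp ts; // time the job finished

    public int getI() { return i; }
    public void setI(int i) { this.i = i; }
    public Timestamp getTs() { return ts; }
    public void setTs(Timestamp ts) { this.ts = ts; }
}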

4. Create the service interface

package com.ienglish.batch.data.sqoop.service;

import com.ienglish.batch.data.sqoop.entity.sqoopBean;

/**
 * @program: english-batch-data-sqoop
 *
 * @description:
 *
 * @author: ShiY.WANG
 *
 * @create: 2021-10-15 16:56
 **/


public interface SqoopService {
    /**
     * Import from MySQL into HDFS.
     * @param connect   JDBC URL
     * @param driver    JDBC driver class
     * @param username  database username
     * @param password  database password
     * @param table     source table name
     * @param m         degree of MapReduce parallelism (number of map tasks)
     * @param targetdir target HDFS directory
     * @param hdfsAddr  HDFS address (the fs.defaultFS URI)
     * @return a sqoopBean carrying the Sqoop exit code and a completion timestamp
     * @throws Exception
     */
    public sqoopBean mysqlTohdfs(String connect, String driver, String username, String password, String table, int m, String targetdir, String hdfsAddr) throws Exception;

    /**
     * Import from MySQL into HBase.
     * @param jdbc           JDBC URL
     * @param driver         JDBC driver class
     * @param username       database username
     * @param password       database password
     * @param mysqlTable     source MySQL table
     * @param hbaseTableName target HBase table
     * @param columnFamily   HBase column family to write into
     * @param rowkey         source column used as the HBase row key
     * @param m              degree of MapReduce parallelism (number of map tasks)
     * @return a sqoopBean carrying the Sqoop exit code and a completion timestamp
     * @throws Exception
     */
    public sqoopBean mysql2Hbase(String jdbc, String driver, String username, String password, String mysqlTable, String hbaseTableName, String columnFamily, String rowkey, int m) throws Exception;


}

5. Create the implementation class

package com.ienglish.batch.data.sqoop.service.serviceimpl;

/**
 * @program: english-batch-data-sqoop
 *
 * @description:
 *
 * @author: ShiY.WANG
 *
 * @create: 2021-10-15 16:57
 **/

import com.ienglish.batch.data.sqoop.entity.sqoopBean;
import com.ienglish.batch.data.sqoop.service.SqoopService;
import org.apache.hadoop.conf.Configuration;
import org.springframework.stereotype.Service;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.tool.SqoopTool;
import org.apache.sqoop.util.OptionsFileUtil;
import java.sql.Timestamp;

@Service
public class SqoopServiceImpl implements SqoopService {

    @Override
    public sqoopBean mysqlTohdfs(String connect, String driver, String username,
                                 String password, String table, int m, String targetdir,
                                 String hdfsAddr) throws Exception {
        // Argument vector mirroring the sqoop CLI flags
        String[] args = new String[]{
                "--connect", connect,
                "--driver", driver,
                "--username", username,
                "--password", password,
                "--table", table,
                "-m", String.valueOf(m),
                "--target-dir", targetdir,
        };
        sqoopBean sqoopBean = new sqoopBean();
        // Expand any @options-file references into plain arguments
        String[] expandArguments = OptionsFileUtil.expandArguments(args);
        // "import" selects the RDBMS -> Hadoop direction
        SqoopTool tool = SqoopTool.getTool("import");
        Configuration conf = new Configuration();
        // fs.defaultFS supersedes the deprecated fs.default.name key
        conf.set("fs.defaultFS", hdfsAddr);
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        // Sqoop's constructor still declares the legacy com.cloudera type, hence the cast
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);
        // runSqoop returns a process-style exit code: 0 means success
        sqoopBean.setI(Sqoop.runSqoop(sqoop, expandArguments));
        sqoopBean.setTs(new Timestamp(System.currentTimeMillis()));
        return sqoopBean;
    }

    @Override
    public sqoopBean mysql2Hbase(String jdbc, String driver, String username, String password, String mysqlTable, String hbaseTableName, String columnFamily, String rowkey, int m) throws Exception {
        // Argument vector mirroring the sqoop CLI flags for an HBase import
        String[] args = new String[]{
                "--connect", jdbc,
                "--driver", driver,
                "--username", username,
                "--password", password,
                "--table", mysqlTable,
                "--hbase-table", hbaseTableName,
                "--column-family", columnFamily,
                "--hbase-create-table", // create the HBase table if it does not yet exist
                "--hbase-row-key", rowkey,
                "-m", String.valueOf(m),
        };
        sqoopBean sqoopBean = new sqoopBean();
        String[] expandArguments = OptionsFileUtil.expandArguments(args);
        SqoopTool tool = SqoopTool.getTool("import");
        // HBase connection settings (zookeeper quorum, etc.) come from the
        // hbase-site.xml found on the classpath
        Configuration conf = new Configuration();
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);
        sqoopBean.setI(Sqoop.runSqoop(sqoop, expandArguments));
        sqoopBean.setTs(new Timestamp(System.currentTimeMillis()));
        return sqoopBean;
    }
}
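For reference, the argument array assembled in mysqlTohdfs corresponds one-to-one to a plain command-line invocation like this (all values are placeholders):

sqoop import \
  --connect jdbc:mysql://127.0.0.1:3306/test \
  --driver com.mysql.cj.jdbc.Driver \
  --username <user> --password <password> \
  --table <table> \
  -m 1 \
  --target-dir <hdfs-target-dir>

Driving Sqoop through Sqoop.runSqoop simply feeds it the same argument vector programmatically instead of via the shell.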
That is the whole integration. This is only a demo; wrap and extend it according to your actual needs.
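For a quick smoke test you can POST to the endpoint, for example with curl (host, port, and all parameter values are placeholders for your own environment):

curl -X POST "http://localhost:8080/data/mysqlTohdfs" \
  -d "connect=jdbc:mysql://127.0.0.1:3306/test" \
  -d "driver=com.mysql.cj.jdbc.Driver" \
  -d "username=<user>" -d "password=<password>" \
  -d "table=<table>" -d "m=1" \
  -d "targetdir=<hdfs-target-dir>" \
  -d "hdfsAddr=hdfs://127.0.0.1:9000"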

The execution result:

The file transfer completed successfully.
