DataX 二次开发之HBase同步到HBase

2021-05-06 15:31:16 浏览数 (1)

需求要从一个HBase把数据同步到另外一个HBase库中,这个需求要怎么用DataX来实现了,首先阅读下官方文档

Reader 插件文档

Hbase11XReader 插件文档

代码语言:javascript复制
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hbase11xreader",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.quorum": "xxxf"
                        },
                        "table": "users",
                        "encoding": "utf-8",
                        "mode": "normal",
                        "column": [
                            {
                                "name": "rowkey",
                                "type": "string"
                            },
                            {
                                "name": "info: age",
                                "type": "string"
                            },
                            {
                                "name": "info: birthday",
                                "type": "date",
                                "format":"yyyy-MM-dd"
                            },
                            {
                                "name": "info: company",
                                "type": "string"
                            },
                            {
                                "name": "address: contry",
                                "type": "string"
                            },
                            {
                                "name": "address: province",
                                "type": "string"
                            },
                            {
                                "name": "address: city",
                                "type": "string"
                            }
                        ],
                        "range": {
                            "startRowkey": "",
                            "endRowkey": "",
                            "isBinaryRowkey": true
                        }
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/Users/shf/workplace/datax_test/hbase11xreader/result",
                        "fileName": "qiran",
                        "writeMode": "truncate"
                    }
                }
            }
        ]
    }
}

我们需要组装一个这样的结构让DataX执行,每个属性是什么意思请自行看文档。这里不做过多的讲解

reader

首先我们看下reader,reader就是我们需要把备份数据表数据组装到parameter里面。这里只讲重要的。

*注意

代码语言:javascript复制
 "hbaseConfig": {"hbase.zookeeper.quorum": "xxxf"},

在配置的时候我是把Hbase的地址加入到了/etc/hosts里面

代码语言:javascript复制
5x.8x.1xx.255 data-node01

然后使用的是映射名称

代码语言:javascript复制
 "hbaseConfig": {"hbase.zookeeper.quorum": "data-node01"},
"column": []

这个里面需要读取备份表的数据

简单介绍一下Java 访问Hbase

代码语言:javascript复制
 public void connectionHBase() throws Exception {
        Configuration config = HBaseConfiguration.create();// 配置
        config.set("hbase.zookeeper.quorum", "映射的Hbase地址");// zookeeper地址
        config.set("hbase.zookeeper.property.clientPort","端口号");
        config.set("zookeeper.znode.parent", "parent");
        connection = ConnectionFactory.createConnection(config);
    }

网上一般都没有这个配置,我是访问连接不上咨询Hbase同事,同事告诉我需要加上。所以这个加不加看Hbase的设置

代码语言:javascript复制
 config.set("zookeeper.znode.parent", "parent");
根据表名获取Hbase表的数据
代码语言:javascript复制
public List<ObjectNode> getColumns(String tableName) throws Exception {

        List<ObjectNode> list = new ArrayList<>();
        try {
            Table table= connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            scan.withStartRow(this.startRowKey.getBytes());
            scan.withStopRow(this.endRowKey.getBytes());

            ResultScanner resultScanner = table.getScanner(scan);
            for(Result result: resultScanner){
                List<Cell> cells =  result.listCells();
                for(Cell cell : cells){
                    String family = Bytes.toString(CellUtil.cloneFamily(cell));
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
                    String data = Bytes.toString(CellUtil.cloneValue(cell));

                    ObjectNode objectNode = JSONUtils.createObjectNode();
                    objectNode.put("rowKey", rowKey);
                    objectNode.put("type", "string");
                    objectNode.put("name", family ":" qualifier);
                    objectNode.put("data", data);
                    list.add(objectNode);
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }

        //关闭连接
        connection.close();

        return list;
    }

获取到表数据后我们就可以组装reader的column

name:就是取ObjectNode的name(这里的name是列簇 列名) type:我也没有找到获取Hbase的数据类型,所以我模式使用string

代码语言:javascript复制
 {
       "name": "rowkey",
       "type": "string"
  },

其他参数组装就很简单这里不做介绍了。

Hbase11XWriter 插件文档

首先看文档Hbase11XWriter 插件文档 需要看下面部分就可以了。 这里补充一下,最后完整的json串,reader部分就参考Hbase11XReader 插件文档中的reader ,wirter部分就参考Hbase11XWriter 插件文档中的writer部分就可以了

代码语言:javascript复制
 "writer":{
                    "name":"hbase11xwriter",
                    "parameter":{
                        "hbaseConfig":{
                            "hbase.zookeeper.quorum":"***"
                        },
                        "table":"writer",
                        "mode":"normal",
                        "rowkeyColumn":[
                            {
                                "index":0,
                                "type":"string"
                            },
                            {
                                "index":-1,
                                "type":"string",
                                "value":"_"
                            }
                        ],
                        "column":[
                            {
                                "index":1,
                                "name":"cf1:q1",
                                "type":"string"
                            },
                            {
                                "index":2,
                                "name":"cf1:q2",
                                "type":"string"
                            },
                            {
                                "index":3,
                                "name":"cf1:q3",
                                "type":"string"
                            },
                            {
                                "index":4,
                                "name":"cf2:q1",
                                "type":"string"
                            },
                            {
                                "index":5,
                                "name":"cf2:q2",
                                "type":"string"
                            },
                            {
                                "index":6,
                                "name":"cf2:q3",
                                "type":"string"
                            }
                        ],

这个部分只要注意rowkeyColumn部分就可以了

rowkeyColumn 描述:要写入的hbase的rowkey列。index:指定该列对应reader端column的索引,从0开始,若为常量index为-1;type:指定写入数据类型,用于转换HBase byte[];value:配置常量,常作为多个字段的拼接符。hbasewriter会将rowkeyColumn中所有列按照配置顺序进行拼接作为写入hbase的rowkey,不能全为常量。配置格式如下: "rowkeyColumn": [ { "index":0, "type":"string" }, { "index":-1, "type":"string", "value":"_" } ]

可能是理解能力不怎么好,这个部分采坑了,如你需要自定义rowKey 哪么就需要设置

代码语言:javascript复制
{
     "index":-1,
      "type":"string",
      "value":"custom"
 }

只设置一个常量是不行的,如果只设置一个常量那么全部colunm的rowKey就全部是一样的了,所以要组合。 "index":0 是什么意思呢?就是取我们刚刚reader部分从备份表获取数据的第0个数据的value来组合

代码语言:javascript复制
{
     "index":0,
      "type":"string",
 }

所以 最后组合的rowKey为:"index":-1部分的“custom”加上"index":0部分vaule 10 作为最终的rowkey值为:custom10

我这里处理很简单,就是每个表都加一个id,我取id作为rowKey,因为备份表的rowKey也是和id一样的,这样备份表和目标表的rowKey才能保存一致

为什么要保持rowKey一致,如果rowKey不一致,我们用备份表的rowKey在目标表中是扫描不到数据的。

按照这样组装完成后吧json串保存到一个文件夹就可以了。

测试的话就是下载DataX源码 cd 到/datax/bin 执行

代码语言:javascript复制
 python datax.py  /Users/xxxx/xxx/datax/job/45_job.json 

同步成功

截屏2021-04-30 15.59.41.png

其他类型的数据同步也是一样的套路,一个调试通了其他的也很简单了。

0 人点赞