需求要从一个HBase把数据同步到另外一个HBase库中,这个需求要怎么用DataX来实现了,首先阅读下官方文档
Reader 插件文档
Hbase11XReader 插件文档
代码语言:javascript复制{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "hbase11xreader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "xxxf"
},
"table": "users",
"encoding": "utf-8",
"mode": "normal",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "info: age",
"type": "string"
},
{
"name": "info: birthday",
"type": "date",
"format":"yyyy-MM-dd"
},
{
"name": "info: company",
"type": "string"
},
{
"name": "address: contry",
"type": "string"
},
{
"name": "address: province",
"type": "string"
},
{
"name": "address: city",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": "",
"isBinaryRowkey": true
}
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "/Users/shf/workplace/datax_test/hbase11xreader/result",
"fileName": "qiran",
"writeMode": "truncate"
}
}
}
]
}
}
我们需要组装一个这样的结构让DataX执行,每个属性是什么意思请自行看文档。这里不做过多的讲解
reader
首先我们看下reader,reader就是我们需要把备份数据表数据组装到parameter里面。这里只讲重要的。
*注意
代码语言:javascript复制 "hbaseConfig": {"hbase.zookeeper.quorum": "xxxf"},
在配置的时候我是把Hbase的地址加入到了/etc/hosts里面
代码语言:javascript复制5x.8x.1xx.255 data-node01
然后使用的是映射名称
代码语言:javascript复制 "hbaseConfig": {"hbase.zookeeper.quorum": "data-node01"},
"column": []
这个里面需要读取备份表的数据
简单介绍一下Java 访问Hbase
代码语言:javascript复制 public void connectionHBase() throws Exception {
Configuration config = HBaseConfiguration.create();// 配置
config.set("hbase.zookeeper.quorum", "映射的Hbase地址");// zookeeper地址
config.set("hbase.zookeeper.property.clientPort","端口号");
config.set("zookeeper.znode.parent", "parent");
connection = ConnectionFactory.createConnection(config);
}
网上一般都没有这个配置,我是访问连接不上咨询Hbase同事,同事告诉我需要加上。所以这个加不加看Hbase的设置
代码语言:javascript复制 config.set("zookeeper.znode.parent", "parent");
根据表名获取Hbase表的数据
代码语言:javascript复制public List<ObjectNode> getColumns(String tableName) throws Exception {
List<ObjectNode> list = new ArrayList<>();
try {
Table table= connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.withStartRow(this.startRowKey.getBytes());
scan.withStopRow(this.endRowKey.getBytes());
ResultScanner resultScanner = table.getScanner(scan);
for(Result result: resultScanner){
List<Cell> cells = result.listCells();
for(Cell cell : cells){
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
String data = Bytes.toString(CellUtil.cloneValue(cell));
ObjectNode objectNode = JSONUtils.createObjectNode();
objectNode.put("rowKey", rowKey);
objectNode.put("type", "string");
objectNode.put("name", family ":" qualifier);
objectNode.put("data", data);
list.add(objectNode);
}
}
} catch (IOException e) {
e.printStackTrace();
}
//关闭连接
connection.close();
return list;
}
获取到表数据后我们就可以组装reader的column
name:就是取ObjectNode的name(这里的name是列簇 列名) type:我也没有找到获取Hbase的数据类型,所以我模式使用string
代码语言:javascript复制 {
"name": "rowkey",
"type": "string"
},
其他参数组装就很简单这里不做介绍了。
Hbase11XWriter 插件文档
首先看文档Hbase11XWriter 插件文档 需要看下面部分就可以了。 这里补充一下,最后完整的json串,reader部分就参考Hbase11XReader 插件文档中的reader ,wirter部分就参考Hbase11XWriter 插件文档中的writer部分就可以了
代码语言:javascript复制 "writer":{
"name":"hbase11xwriter",
"parameter":{
"hbaseConfig":{
"hbase.zookeeper.quorum":"***"
},
"table":"writer",
"mode":"normal",
"rowkeyColumn":[
{
"index":0,
"type":"string"
},
{
"index":-1,
"type":"string",
"value":"_"
}
],
"column":[
{
"index":1,
"name":"cf1:q1",
"type":"string"
},
{
"index":2,
"name":"cf1:q2",
"type":"string"
},
{
"index":3,
"name":"cf1:q3",
"type":"string"
},
{
"index":4,
"name":"cf2:q1",
"type":"string"
},
{
"index":5,
"name":"cf2:q2",
"type":"string"
},
{
"index":6,
"name":"cf2:q3",
"type":"string"
}
],
这个部分只要注意rowkeyColumn部分就可以了
rowkeyColumn 描述:要写入的hbase的rowkey列。index:指定该列对应reader端column的索引,从0开始,若为常量index为-1;type:指定写入数据类型,用于转换HBase byte[];value:配置常量,常作为多个字段的拼接符。hbasewriter会将rowkeyColumn中所有列按照配置顺序进行拼接作为写入hbase的rowkey,不能全为常量。配置格式如下: "rowkeyColumn": [ { "index":0, "type":"string" }, { "index":-1, "type":"string", "value":"_" } ]
可能是理解能力不怎么好,这个部分采坑了,如你需要自定义rowKey 哪么就需要设置
代码语言:javascript复制{
"index":-1,
"type":"string",
"value":"custom"
}
只设置一个常量是不行的,如果只设置一个常量那么全部colunm的rowKey就全部是一样的了,所以要组合。 "index":0 是什么意思呢?就是取我们刚刚reader部分从备份表获取数据的第0个数据的value来组合
代码语言:javascript复制{
"index":0,
"type":"string",
}
所以 最后组合的rowKey为:"index":-1部分的“custom”加上"index":0部分vaule 10 作为最终的rowkey值为:custom10
我这里处理很简单,就是每个表都加一个id,我取id作为rowKey,因为备份表的rowKey也是和id一样的,这样备份表和目标表的rowKey才能保存一致
为什么要保持rowKey一致,如果rowKey不一致,我们用备份表的rowKey在目标表中是扫描不到数据的。
按照这样组装完成后吧json串保存到一个文件夹就可以了。
测试的话就是下载DataX源码 cd 到/datax/bin 执行
代码语言:javascript复制 python datax.py /Users/xxxx/xxx/datax/job/45_job.json
同步成功
截屏2021-04-30 15.59.41.png
其他类型的数据同步也是一样的套路,一个调试通了其他的也很简单了。