HBase 数据迁移到 Kafka 实战

2020-06-09 14:29:47 浏览数 (1)

文章作者:哥不是小萝莉

编辑整理:Hoh Xil

内容来源:

https://www.cnblogs.com/smartloli/p/11521659.html

1. 概述

在实际的应用场景中,数据存储在 HBase 集群中,但是由于一些特殊的原因,需要将数据从 HBase 迁移到 Kafka。正常情况下,一般都是源数据到 Kafka,再有消费者处理数据,将数据写入 HBase。但是,如果逆向处理,如何将 HBase 的数据迁移到 Kafka 呢?今天笔者就给大家来分享一下具体的实现流程。

2. 内容

一般业务场景如下,数据源头产生数据,进入 Kafka,然后由消费者 ( 如 Flink、Spark、Kafka API ) 处理数据后进入到 HBase。这是一个很典型的实时处理流程。流程图如下:

上述这类实时处理流程,处理数据都比较容易,毕竟数据流向是顺序处理的。但是,如果将这个流程逆向,那么就会遇到一些问题。

2.1 海量数据

HBase 的分布式特性,集群的横向拓展,HBase 中的数据往往都是百亿、千亿级别,或者数量级更大。这类级别的数据,对于这类逆向数据流的场景,会有个很麻烦的问题,那就是取数问题。如何将这海量数据从 HBase 中取出来?

2.2 没有数据分区

我们知道 HBase 做数据 Get 或者 List<Get> 很快,也比较容易。而它又没有类似 Hive 这类数据仓库分区的概念,不能提供某段时间内的数据。如果要提取最近一周的数据,可能全表扫描,通过过滤时间戳来获取一周的数据。数量小的时候,可能问题不大,而数据量很大的时候,全表去扫描 HBase 很困难。

3. 解决思路

对于这类逆向数据流程,如何处理。其实,我们可以利用 HBase Get 和 List<Get> 的特性来实现。因为 HBase 通过 RowKey 来构建了一级索引,对于 RowKey 级别的取数,速度是很快的。实现流程细节如下:

数据流程如上图所示,下面笔者为大家来剖析每个流程的实现细节,以及注意事项。

3.1 Rowkey 抽取

我们知道 HBase 针对 Rowkey 取数做了一级索引,所以我们可以利用这个特性来展开。我们可以将海量数据中的 Rowkey 从 HBase 表中抽取,然后按照我们制定的抽取规则和存储规则将抽取的 Rowkey 存储到 HDFS 上。

这里需要注意一个问题,那就是关于 HBase Rowkey 的抽取,海量数据级别的 Rowkey 抽取,建议采用 MapReduce 来实现。这个得益于 HBase 提供了 TableMapReduceUtil 类来实现,通过 MapReduce 任务,将 HBase 中的 Rowkey 在 map 阶段按照指定的时间范围进行过滤,在 reduce 阶段将 rowkey 拆分为多个文件,最后存储到 HDFS 上。

这里可能会有同学有疑问,都用 MapReduce 抽取 Rowkey 了,为啥不直接在扫描处理列簇下的列数据呢?这里,我们在启动 MapReduce 任务的时候,Scan HBase 的数据时只过滤 Rowkey ( 利用 FirstKeyOnlyFilter 来实现 ),不对列簇数据做处理,这样会快很多。对 HBase RegionServer 的压力也会小很多。

这里举个例子,比如上表中的数据,其实我们只需要取出 Rowkey ( row001 )。但是,实际业务数据中,HBase 表描述一条数据可能有很多特征属性 ( 例如姓名、性别、年龄、身份证等等 ),可能有些业务数据一个列簇下有十几个特征,但是他们却只有一个 Rowkey,我们也只需要这一个 Rowkey。那么,我们使用 FirstKeyOnlyFilter 来实现就很合适了。

代码语言:javascript复制
/**
 * A filter that will only return the first KV from each row.
 * <p>
 * This filter can be used to more efficiently perform row count operations.
 */

这个是 FirstKeyOnlyFilter 的一段功能描述,它用于返回第一条 KV 数据,官方其实用它来做计数使用,这里我们稍加改进,把 FirstKeyOnlyFilter 用来做抽取 Rowkey。

3.2 Rowkey 生成

抽取的 Rowkey 如何生成,这里可能根据实际的数量级来确认 Reduce 个数。建议生成 Rowkey 文件时,切合实际的数据量来算 Reduce 的个数。尽量不用为了使用方便就一个 HDFS 文件,这样后面不好维护。举个例子,比如 HBase 表有 100GB,我们可以拆分为100个文件。

3.3 数据处理

在步骤1中,按照抽取规则和存储规则,将数据从 HBase 中通过 MapReduce 抽取 Rowkey 并存储到 HDFS 上。然后,我们在通过 MapReduce 任务读取 HDFS 上的 Rowkey 文件,通过 List<Get> 的方式去 HBase 中获取数据。拆解细节如下:

Map 阶段,我们从 HDFS 读取 Rowkey 的数据文件,然后通过批量 Get 的方式从 HBase 取数,然后组装数据发送到 Reduce 阶段。在 Reduce 阶段,获取来自 Map 阶段的数据,写数据到 Kafka,通过 Kafka 生产者回调函数,获取写入 Kafka 状态信息,根据状态信息判断数据是否写入成功。如果成功,记录成功的 Rowkey 到 HDFS,便于统计成功的进度;如果失败,记录失败的 Rowkey 到 HDFS,便于统计失败的进度。

3.4 失败重跑

通过 MapReduce 任务写数据到 Kafka 中,可能会有失败的情况,对于失败的情况,我们只需要记录 Rowkey 到 HDFS 上,当任务执行完成后,再去程序检查 HDFS 上是否存在失败的 Rowkey 文件,如果存在,那么再次启动步骤10,即读取 HDFS 上失败的 Rowkey 文件,然后再 List<Get> HBase 中的数据,进行数据处理后,最后再写 Kafka,以此类推,直到 HDFS 上失败的 Rowkey 处理完成为止。

4. 实现代码

这里实现的代码量也并不复杂,下面提供一个伪代码,可以在此基础上进行改造 ( 例如 Rowkey 的抽取、MapReduce 读取 Rowkey 并批量 Get HBase 表,然后在写入 Kafka 等 )。示例代码如下:

代码语言:javascript复制
public class MRROW2HDFS {

public static void main(String[] args) throws Exception {

        Configuration config = HBaseConfiguration.create(); // HBase Config info
        Job job = Job.getInstance(config, "MRROW2HDFS");
        job.setJarByClass(MRROW2HDFS.class);
        job.setReducerClass(ROWReducer.class);

        String hbaseTableName = "hbase_tbl_name";

        Scan scan = new Scan();
        scan.setCaching(1000);
        scan.setCacheBlocks(false);
        scan.setFilter(new FirstKeyOnlyFilter());

        TableMapReduceUtil.initTableMapperJob(hbaseTableName, scan, ROWMapper.class, Text.class, Text.class, job);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/rowkey.list")); // input you storage rowkey hdfs path
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

public static class ROWMapper extends TableMapper<Text, Text> {

@Override
protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
throws IOException, InterruptedException {

for (Cell cell : value.rawCells()) {
// Filter date range
// context.write(...);
            }
        }
    }

public static class ROWReducer extends Reducer<Text,Text,Text,Text>{
private Text result = new Text();

@Override
protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
for(Text val:values){
                result.set(val);
                context.write(key, result);
            }
        }
    }
}

5. 总结

整个逆向数据处理流程,并不算复杂,实现也是很基本的 MapReduce 逻辑,没有太复杂的逻辑处理。在处理的过程中,需要几个细节问题,Rowkey 生成到 HDFS 上时,可能存在行位空格的情况,在读取 HDFS 上 Rowkey 文件去 List<Get> 时,最好对每条数据做个过滤空格处理。另外,就是对于成功处理 Rowkey 和失败处理 Rowkey 的记录,这样便于任务失败重跑和数据对账。可以知晓数据迁移进度和完成情况。同时,我们可以使用 Kafka Eagle 监控工具来查看 Kafka 写入进度。

6. 结束语

本文就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

0 人点赞