Maxwell + Flume: Incremental Collection of MySQL Business Data into HDFS


Background

This article is based on the 尚硅谷 e-commerce data warehouse 6.0 course.

When collecting business data, the incremental tables need to be moved from MySQL into HDFS. The first day requires a full (bootstrap) load; the data is first written to Kafka (so it is also available for later real-time processing) and then collected from Kafka into HDFS by Flume. This is where the midnight-drift problem appears: an event generated shortly before 24:00 on day one may only reach Flume after midnight, and the timestamp Flume places into the event header records the current processing time rather than the business time, so it falls on day two. Because the time component of the HDFS Sink path is taken from that header timestamp, such events would land in the wrong date directory. We therefore build an interceptor that rewrites the header timestamp from the business timestamp, so the incremental data is partitioned under the correct date.
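
To make the drift concrete, here is a minimal illustration (the epoch values are made up for this example, and the output assumes the machines use the Asia/Shanghai timezone as in the course environment): an order written at 23:59:58 on 2022-06-08 may only be consumed by Flume a few seconds later, and by then the arrival time already belongs to 2022-06-09.

Code language: shell

# business time carried in the Maxwell record (ts, in seconds)
date -d @1654703998 '+%Y-%m-%d %H:%M:%S'   # 2022-06-08 23:59:58
# time at which the event reaches the Flume KafkaSource
date -d @1654704003 '+%Y-%m-%d %H:%M:%S'   # 2022-06-09 00:00:03
# without the interceptor, the HDFS Sink would use the second value
# and write the event under .../2022-06-09 instead of .../2022-06-08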

Create the Flume Agent Configuration

kafka_to_hdfs_db.conf

Code language: shell
vim kafka_to_hdfs_db.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
# custom interceptor (sets the timestamp and tableName headers)
a1.sources.r1.interceptors.i1.type = org.example.TimestampAndTableNameInterceptor$Builder

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
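
Before wiring the agent into a startup script, it can help to run it once in the foreground with console logging to confirm that events flow from Kafka to HDFS. This is only a debugging sketch, assuming Flume is installed under /opt/module/flume and the configuration file lives in /opt/module/flume/job as elsewhere in this article:

Code language: shell

# run the agent in the foreground and log to the console for debugging
/opt/module/flume/bin/flume-ng agent \
  -n a1 \
  -c /opt/module/flume/conf \
  -f /opt/module/flume/job/kafka_to_hdfs_db.conf \
  -Dflume.root.logger=INFO,console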

Flume Interceptor

pom file

If Maven fails to resolve the dependencies, you can force an update of the cached dependencies from the project root: mvn clean install -U

Code language: xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>gmall</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.10.1</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
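
With this pom, packaging the project produces both the plain jar and a fat jar built by the assembly plugin (bound to the package phase). A quick check, assuming the commands are run from the project root; the jar name follows from the artifactId, version and the jar-with-dependencies descriptor:

Code language: shell

# build; the assembly plugin runs during the package phase
mvn clean package
# the fat jar ends up under target/
ls target/gmall-1.0-SNAPSHOT-jar-with-dependencies.jar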

TimestampAndTableNameInterceptor

How the interceptor works:

Because of the midnight-drift problem, we add an interceptor that processes every Event. The events come from Kafka, and the Kafka data comes from Maxwell. From the event body we need the ts field and the table name, which the Flume agent uses to build its HDFS path (%{tableName}_inc/%Y-%m-%d). The interceptor extracts these two values and writes them into the event header.

The data Maxwell writes to Kafka has the following format:

Code language: json
{
  "database": "gmall",
  "table": "order_info",
  "type": "delete",
  "ts": 1722945195,
  "xid": 282971,
  "xoffset": 105,
  "data": {
    "id": 1235,
    "consignee": "费婷",
    "consignee_tel": "13136724768",
    "total_amount": 8197.00,
    "order_status": "1002",
    "user_id": 258,
    "payment_way": "3501",
    "delivery_address": null,
    "order_comment": null,
    "out_trade_no": "239576597685841",
    "trade_body": "Apple iPhone 12 (A2404) 64GB 红色 支持移动联通电信5G 双卡双待手机等1件商品",
    "create_time": "2022-06-08 18:16:59",
    "operate_time": "2022-06-08 18:17:33",
    "expire_time": null,
    "process_status": null,
    "tracking_no": null,
    "parent_order_id": null,
    "img_url": null,
    "province_id": 1,
    "activity_reduce_amount": 0.00,
    "coupon_reduce_amount": 0.00,
    "original_total_amount": 8197.00,
    "feight_fee": null,
    "feight_fee_reduce": null,
    "refundable_time": "2022-06-15 18:16:59"
  }
}
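
The interceptor only needs two pieces of this record: the table field for the directory name and the ts field for the date partition. For a quick sanity check on the command line (a sketch, assuming jq is installed and the record above is saved as a single line in maxwell_event.json, a file name chosen here purely for illustration):

Code language: shell

# pull out the two fields the interceptor uses
jq '{table: .table, ts: .ts}' maxwell_event.json
# convert the ts (seconds) to the partition date the HDFS Sink will use
date -d @1722945195 '+%Y-%m-%d'   # e.g. 2024-08-06 in the Asia/Shanghai timezone
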
Code language: java
package org.example;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        Long ts = jsonObject.getLong("ts");
        // the ts field emitted by Maxwell is in seconds, but the Flume HDFS Sink expects milliseconds
        String timeMills = String.valueOf(ts * 1000);

        String tableName = jsonObject.getString("table");

        headers.put("timestamp", timeMills);
        headers.put("tableName", tableName);
        return event;

    }

    @Override
    public List<Event> intercept(List<Event> events) {

        for (Event event : events) {
            intercept(event);
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {


        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor ();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
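
Once packaged, the fat jar has to be placed on Flume's classpath so that the interceptor type configured in kafka_to_hdfs_db.conf can be loaded. A minimal sketch, assuming Flume is installed under /opt/module/flume and that its lib directory is used for extra jars (adjust the path if your installation differs):

Code language: shell

# make the interceptor class visible to the Flume agent
cp target/gmall-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/module/flume/lib/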

Create the Flume Startup Script

Code language: shell
vim f3.sh

#!/bin/bash
echo " -------- starting business-data Flume on hadoop102 --------"
nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &

chmod 777 ./f3.sh
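
Because the agent runs in the background with its output discarded, a quick way to confirm it is alive is to look for the process. This check is not part of the original course scripts, just a convenience:

Code language: shell

# the agent process carries the config file name on its command line
ps -ef | grep kafka_to_hdfs_db.conf | grep -v grep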

Create the mysql_to_kafka_inc_init.sh Script

This script bootstraps all of the incremental tables (the first-day full load) and only needs to be run once.

Code language: shell
vim mysql_to_kafka_inc_init.sh

#!/bin/bash


MAXWELL_HOME=/opt/module/maxwell

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"cart_info")
  import_data cart_info
  ;;
"comment_info")
  import_data comment_info
  ;;
"coupon_use")
  import_data coupon_use
  ;;
"favor_info")
  import_data favor_info
  ;;
"order_detail")
  import_data order_detail
  ;;
"order_detail_activity")
  import_data order_detail_activity
  ;;
"order_detail_coupon")
  import_data order_detail_coupon
  ;;
"order_info")
  import_data order_info
  ;;
"order_refund_info")
  import_data order_refund_info
  ;;
"order_status_log")
  import_data order_status_log
  ;;
"payment_info")
  import_data payment_info
  ;;
"refund_payment")
  import_data refund_payment
  ;;
"user_info")
  import_data user_info
  ;;
"all")
  import_data cart_info
  import_data comment_info
  import_data coupon_use
  import_data favor_info
  import_data order_detail
  import_data order_detail_activity
  import_data order_detail_coupon
  import_data order_info
  import_data order_refund_info
  import_data order_status_log
  import_data payment_info
  import_data refund_payment
  import_data user_info
  ;;
esac
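
The script takes a single argument, either one table name or all. Note that maxwell-bootstrap only records the bootstrap request; in the usual setup the running Maxwell daemon performs the actual load, so Maxwell must already be up. Typical invocations:

Code language: shell

# bootstrap a single table
./mysql_to_kafka_inc_init.sh order_info
# bootstrap every incremental table (used for the first-day full load)
./mysql_to_kafka_inc_init.sh all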

Add Execute Permission and Run the Scripts

Code language: shell
# add execute permission
chmod 777 ./mysql_to_kafka_inc_init.sh

Run the scripts

Code language: shell
# remove historical data
hadoop fs -ls /origin_data/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f

# start up
# start Hadoop, ZooKeeper, Kafka and Maxwell first
# run the first-day full (bootstrap) load through Maxwell
mysql_to_kafka_inc_init.sh all
# start the Flume agent
f3.sh
# start the data generator

Check the Results
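
After the load finishes, each incremental table should appear as a date-partitioned directory under /origin_data/db. A quick check (the exact date directory depends on the day you run the load):

Code language: shell

# one *_inc directory per table
hadoop fs -ls /origin_data/db
# gzip-compressed files under the date partition, e.g. for order_info
hadoop fs -ls /origin_data/db/order_info_inc/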
