Collection background
This article is based on the 尚硅谷 (Atguigu) e-commerce data warehouse 6.0 course.
When collecting business data, the incremental tables have to be moved from MySQL to HDFS. On the first day this requires a full (bootstrap) load: the data is first collected into Kafka (which also makes it available for downstream real-time processing), and then collected from Kafka into HDFS. This is where the zero-point drift problem shows up: for records produced shortly before midnight, the timestamp Flume places in the event header records the current processing time rather than the business time, so a small delay pushes it into the next day. Because the date in the HDFS Sink path is taken from that header timestamp, those records would land in the wrong day's directory. We therefore build an interceptor that replaces the header timestamp with the business timestamp, so the incremental data is collected into the correct partitions.
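As a quick illustration of the drift (reusing the ts value from the sample Maxwell record shown later in this article, and assuming GNU date), the date derived from the business timestamp and the date derived from the processing time are computed differently and can disagree around midnight:

# business date carried in Maxwell's ts field (seconds since the epoch)
date -d @1722945195 +%F
# date derived from the current processing time -- what the sink would use without the interceptor
date +%F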
Create the Flume agent
kafka_to_hdfs_db.conf (create it under /opt/module/flume/job/, the path the startup script below points to)
vim kafka_to_hdfs_db.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
# custom interceptor: copies Maxwell's ts and table name into the event headers
a1.sources.r1.interceptors.i1.type = org.example.TimestampAndTableNameInterceptor$Builder
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
## bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
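Before starting the agent it helps to confirm that Maxwell is actually producing records into topic_db. A minimal check with the Kafka console consumer (the /opt/module/kafka install path is an assumption; adjust it to your environment):

# consume a few records from topic_db to verify data is flowing
/opt/module/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server hadoop102:9092,hadoop103:9092 \
  --topic topic_db \
  --max-messages 5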
Flume interceptor
pom.xml
If Maven fails to resolve the dependencies, force an update of the cached artifacts from the project root: mvn clean install -U
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>gmall</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.10.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
TimestampAndTableNameInterceptor
How the interceptor works:
Because of the zero-point drift problem, we add an interceptor that processes every Event. The event body is the Kafka record, and the Kafka data comes from Maxwell. What we need from the body are the ts field and the table name, which the Flume agent's HDFS path configuration relies on (%{tableName}_inc/%Y-%m-%d). The interceptor therefore extracts these two values, converts them as needed, and puts them into the event header.
The data Maxwell writes to Kafka has the following format:
{
    "database": "gmall",
    "table": "order_info",
    "type": "delete",
    "ts": 1722945195,
    "xid": 282971,
    "xoffset": 105,
    "data": {
        "id": 1235,
        "consignee": "费婷",
        "consignee_tel": "13136724768",
        "total_amount": 8197.00,
        "order_status": "1002",
        "user_id": 258,
        "payment_way": "3501",
        "delivery_address": null,
        "order_comment": null,
        "out_trade_no": "239576597685841",
        "trade_body": "Apple iPhone 12 (A2404) 64GB 红色 支持移动联通电信5G 双卡双待手机等1件商品",
        "create_time": "2022-06-08 18:16:59",
        "operate_time": "2022-06-08 18:17:33",
        "expire_time": null,
        "process_status": null,
        "tracking_no": null,
        "parent_order_id": null,
        "img_url": null,
        "province_id": 1,
        "activity_reduce_amount": 0.00,
        "coupon_reduce_amount": 0.00,
        "original_total_amount": 8197.00,
        "feight_fee": null,
        "feight_fee_reduce": null,
        "refundable_time": "2022-06-15 18:16:59"
    }
}
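The interceptor only cares about two of these fields: table and ts. As a quick sanity check (assuming jq is available and the sample record above has been saved to a local file, here called order_info_sample.json, a name chosen only for this example):

# print the table name and the business timestamp the interceptor will copy into the header
jq -r '.table, .ts' order_info_sample.json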
The interceptor implementation:
package org.example;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        Long ts = jsonObject.getLong("ts");
        // Maxwell's ts field is in seconds; the Flume HDFS Sink expects milliseconds
        String timeMills = String.valueOf(ts * 1000);

        String tableName = jsonObject.getString("table");

        // the HDFS Sink resolves %Y-%m-%d and %{tableName} in its path from these headers
        headers.put("timestamp", timeMills);
        headers.put("tableName", tableName);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
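For the a1.sources.r1.interceptors.i1.type setting above to resolve, the compiled interceptor must be on Flume's classpath. A sketch of the usual build-and-deploy steps, assuming the /opt/module/flume install path used elsewhere in this article:

# build the fat jar defined by the maven-assembly-plugin in the pom above
mvn clean package
# copy it into Flume's lib directory so the agent can load the interceptor class
cp target/gmall-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/module/flume/lib/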
Create the Flume startup script
vim f3.sh
#!/bin/bash
echo " -------- starting hadoop102 business-data Flume --------"
nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &
chmod 777 ./f3.sh
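f3.sh only starts the agent. If a matching stop command is needed, one common approach is to match the process by its config file name (a sketch; adjust the pattern if you rename the file):

# find the Flume agent that was started with kafka_to_hdfs_db.conf and kill it
ps -ef | grep kafka_to_hdfs_db | grep -v grep | awk '{print $2}' | xargs -r kill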
Create the mysql_to_kafka_inc_init.sh script
This script bootstraps all of the incremental tables (first-day full load) and only needs to be executed once.
vim mysql_to_kafka_inc_init.sh
#!/bin/bash
MAXWELL_HOME=/opt/module/maxwell
import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"cart_info")
    import_data cart_info
    ;;
"comment_info")
    import_data comment_info
    ;;
"coupon_use")
    import_data coupon_use
    ;;
"favor_info")
    import_data favor_info
    ;;
"order_detail")
    import_data order_detail
    ;;
"order_detail_activity")
    import_data order_detail_activity
    ;;
"order_detail_coupon")
    import_data order_detail_coupon
    ;;
"order_info")
    import_data order_info
    ;;
"order_refund_info")
    import_data order_refund_info
    ;;
"order_status_log")
    import_data order_status_log
    ;;
"payment_info")
    import_data payment_info
    ;;
"refund_payment")
    import_data refund_payment
    ;;
"user_info")
    import_data user_info
    ;;
"all")
    import_data cart_info
    import_data comment_info
    import_data coupon_use
    import_data favor_info
    import_data order_detail
    import_data order_detail_activity
    import_data order_detail_coupon
    import_data order_info
    import_data order_refund_info
    import_data order_status_log
    import_data payment_info
    import_data refund_payment
    import_data user_info
    ;;
esac
Grant execute permission and run the scripts
# grant execute permission
chmod 777 ./mysql_to_kafka_inc_init.sh
Run the scripts
# remove historical data
hadoop fs -ls /origin_data/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f
# start up
# first start Hadoop, ZooKeeper, Kafka, and Maxwell
# start the Flume agent before the bootstrap, so the Kafka source (which defaults to the latest offset for a new consumer group) does not skip the bootstrap records
f3.sh
# run the Maxwell bootstrap (first-day full load of all incremental tables)
mysql_to_kafka_inc_init.sh all
# start the data generator
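Once the bootstrap has finished, the first-day data should be visible under the path configured in the HDFS Sink. A quick check (the path comes from the sink configuration above; order_info is just one example table):

# list the per-table output directories
hadoop fs -ls /origin_data/db
# inspect one table's dated directory
hadoop fs -ls /origin_data/db/order_info_inc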