1. What is the wide table in this requirement doing?
Requirement architecture diagram:
The wide table in this real-time pipeline is essentially doing a dimension rollup (退维). A rollup is a data-processing operation that merges and consolidates fine-grained data into coarse-grained data. Raw data usually carries a lot of detail and fine-grained information, but sometimes we need to turn that fine-grained data into higher-level, more general summary information to support broader analysis and business needs; that process is the rollup.
Concretely, a rollup can be carried out through aggregation, merging, grouping, and so on. Its goal is to degrade data from fine granularity to coarse granularity so that the data is easier to understand and useful information is easier to extract. Typical application scenarios include, but are not limited to, the following:
- Data summarization: analyses and reports often need summary figures such as total sales, averages, and total counts. A rollup turns raw transaction or event data into representative summary data.
- Statistical analysis: data sometimes needs to be grouped by certain dimensions and statistics computed per group, such as counts, sums, and averages. A rollup aggregates fine-grained data along the specified dimensions to produce higher-level statistics.
- Data visualization: a rollup converts raw data into a form better suited for display, such as bar charts, line charts, and pie charts, which conveys the trends and characteristics of the data more clearly.
- Data optimization: in a data warehouse or data lake, data sometimes needs to be compacted or summarized to improve query performance and reduce storage cost; a rollup is one way to achieve this.
In short, the rollup is an important step in data processing. It helps us extract useful information from complex fine-grained data, simplifies analysis, saves storage space, and improves processing efficiency. Through rollups we can understand the data better, discover patterns and trends, and support a wider range of business applications and decisions.
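As a minimal sketch of that idea (not part of this project's pipeline; it only assumes the ods_event table defined in section 2.3 below), fine-grained events can be rolled up into coarse-grained per-channel counts with Flink SQL:
// Hypothetical roll-up example: aggregate the fine-grained event stream
// into coarse-grained counts per release channel and event type.
tenv.executeSql("select release_channel, eventId, count(*) as event_cnt " +
        "from ods_event " +
        "group by release_channel, eventId").print();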
2. Kafka producer message events
2.1 Create the Kafka topic
[root@hadoop10 ~]# kafka-topics.sh --zookeeper hadoop10:2181 --create --topic ods_event --partitions 1 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "ods_event".
2.2 Send messages with the producer
[root@hadoop10 kafka0.11]# bin/kafka-console-producer.sh --broker-list hadoop10:9092 --topic ods_event
>{"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","lat":38.089969323508726,"lng":114.35731900345093,"username":"tiger","eventId":"add_cart","eventTime":1670583693000,"properties":{"url":"/content/article/2354.html?a=3","itemId":"item002"}}
2.3 Write CommonDimensionOdsDwd in IDEA
package demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CommonDimensionOdsDwd {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Kafka source table for the raw event stream; proc is a processing-time attribute
        // used later by the dimension-table lookup joins.
        tenv.executeSql("create table ods_event(\n" +
                " release_channel string,\n" +
                " device_type string,\n" +
                " session_id string,\n" +
                " lat double,\n" +
                " lng double,\n" +
                " username string,\n" +
                " eventId string,\n" +
                " eventTime bigint,\n" +
                " properties Map<String,String>,\n" +
                " proc as proctime()\n" +
                ")with(\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'ods_event',\n" +
                " 'properties.bootstrap.servers' = 'hadoop10:9092',\n" +
                " 'properties.group.id' = 'x1',\n" +
                " 'scan.startup.mode' = 'group-offsets',\n" +
                " 'format' = 'json'\n" +
                ")").print();

        tenv.executeSql("select * from ods_event").print();
    }
}
Test data
Two sample records, pasted into the ods_event console-producer window:
{"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","lat":38.089969323508726,"lng":114.35731900345093,"username":"guoyachao","eventId":"add_cart","eventTime":1670583693000,"properties":{"url":"/content/article/2354.html?a=3","itemId":"item002"}}
{"release_channel":"360应用市场","device_type":"mi6","session_id":"s02","lat":38.089969323508726,"lng":114.35731900345093,"username":"arthas","eventId":"add_cart","eventTime":1670583694000,"properties":{"url":"/content/article/2354.html?a=3","itemId":"item002"}}
After the producer sends the messages, they can be received in IDEA, and we can move on to the next step. Note the computed column proc as proctime() on ods_event: the dimension-table lookup joins later (FOR SYSTEM_TIME AS OF t1.proc) need this processing-time attribute.
3. Page info dimension table
We also need to join three more dimension tables: the page info dimension, the user info dimension, and the geo info dimension. The event messages received from Kafka are joined against these three dimensions to build a wide table that carries the information from all of these tables.
3.1 Create the page info dimension table in HBase
hbase:002:0> create 'dim_page_info','f'
Created table dim_page_info
Took 1.2378 seconds
=> Hbase::Table - dim_page_info
3.2 Write the HBase load script
[root@hadoop10 ~]# vim dim_page_info_loadhbase.sh
Contents of dim_page_info_loadhbase.sh:
put 'dim_page_info' , '/mall/' , 'f:pt', '商品详情页'
put 'dim_page_info' , '/mall/' , 'f:sv', '商城服务'
put 'dim_page_info' , '/cotent/article/' , 'f:pt', '文章页'
put 'dim_page_info' , '/content/article/' , 'f:sv', '内容服务'
put 'dim_page_info' , '/mall/promotion/' , 'f:pt', '活动页'
put 'dim_page_info' , '/mall/promotion/' , 'f:sv', '商城服务'
put 'dim_page_info' , '/mall/search/' , 'f:pt', '搜索结果页'
put 'dim_page_info' , '/mall/search/' , 'f:sv', '搜索服务'
Run the script with the hbase shell:
[root@hadoop10 ~]# hbase shell /root/dim_page_info_loadhbase.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/installs/hbase2.4/lib/client-facing-thirdparty/slf4j-reload4j-1.7.33.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/installs/hadoop-3.1.4/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
hbase:001:0> put 'dim_page_info' , '/mall/' , 'f:pt', '商品详情页'
Took 0.7823 seconds
hbase:002:0> put 'dim_page_info' , '/mall/' , 'f:sv', '商城服务'
Took 0.0391 seconds
hbase:003:0> put 'dim_page_info' , '/cotent/article/' , 'f:pt', '文章页'
Took 0.0230 seconds
hbase:004:0> put 'dim_page_info' , '/content/article/' , 'f:sv', '内容服务'
Took 0.0120 seconds
hbase:005:0> put 'dim_page_info' , '/mall/promotion/' , 'f:pt', '活动页'
Took 0.0061 seconds
hbase:006:0> put 'dim_page_info' , '/mall/promotion/' , 'f:sv', '商城服务'
Took 0.0046 seconds
hbase:007:0> put 'dim_page_info' , '/mall/search/' , 'f:pt', '搜索结果页'
Took 0.0164 seconds
hbase:008:0> put 'dim_page_info' , '/mall/search/' , 'f:sv', '搜索服务'
Took 0.0060 seconds
hbase:009:0>
Check that the data was loaded:
hbase:001:0> scan 'dim_page_info',{LIMIT=>10,FORMATTER => 'toString'}
ROW                              COLUMN+CELL
/content/article/ column=f:sv, timestamp=2023-07-16T12:29:01.720, value=内容服务
/cotent/article/ column=f:pt, timestamp=2023-07-16T12:29:01.669, value=文章页
/mall/ column=f:pt, timestamp=2023-07-16T12:29:01.542, value=商品详情页
/mall/ column=f:sv, timestamp=2023-07-16T12:29:01.619, value=商城服务
/mall/promotion/ column=f:pt, timestamp=2023-07-16T12:29:01.751, value=活动页
/mall/promotion/ column=f:sv, timestamp=2023-07-16T12:29:01.770, value=商城服务
/mall/search/ column=f:pt, timestamp=2023-07-16T12:29:01.798, value=搜索结果页
/mall/search/ column=f:sv, timestamp=2023-07-16T12:29:01.829, value=搜索服务
5 row(s)
Took 0.0726 seconds
If the table does not hold much data, you can collect the puts into a shell script like this and write them all into HBase in one go.
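If you would rather load the dimension rows from code, here is a minimal sketch using the HBase Java client (the class name DimPageInfoLoader is made up for illustration; it assumes the hbase-client dependency matching the HBase 2.4 install and the 'dim_page_info' table with column family 'f' created above):

package demo;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class DimPageInfoLoader {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop10");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("dim_page_info"))) {
            List<Put> puts = new ArrayList<>();
            // rowkey = url prefix, column family 'f', qualifiers 'pt' (page type) and 'sv' (service)
            Put put = new Put(Bytes.toBytes("/mall/"));
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("pt"), Bytes.toBytes("商品详情页"));
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("sv"), Bytes.toBytes("商城服务"));
            puts.add(put);
            // ...add the remaining url prefixes from the script above in the same way
            table.put(puts);
        }
    }
}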
4. User info dimension
This dimension table has already been created in HBase and is kept in sync with the MySQL business database in real time through Flink CDC. The way this HBase table is built and the reasoning behind it follow the previous requirement: http://t.csdn.cn/FVBO4.
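For reference only, a rough sketch of what that MySQL-to-HBase sync can look like in Flink SQL (the connector options, credentials, database/table names, and the reduced column list here are assumptions for illustration; the authoritative version is in the linked requirement):

// Hypothetical MySQL CDC source (assumed host, credentials, database and table names).
tenv.executeSql("create table user_info_mysql(\n" +
        " id int,\n" +
        " username string,\n" +
        " phone string,\n" +
        " status int,\n" +
        " primary key (id) not enforced\n" +
        ") with (\n" +
        " 'connector' = 'mysql-cdc',\n" +
        " 'hostname' = 'hadoop10',\n" +
        " 'port' = '3306',\n" +
        " 'username' = 'root',\n" +
        " 'password' = '******',\n" +
        " 'database-name' = 'mall',\n" +
        " 'table-name' = 'user_info'\n" +
        ")");
// HBase upsert sink writing the changelog into dim_user_info (rowkey = username, column family f).
tenv.executeSql("create table dim_user_info_sink(\n" +
        " username string,\n" +
        " f row<id int, phone string, status int>,\n" +
        " primary key (username) not enforced\n" +
        ") with (\n" +
        " 'connector' = 'hbase-2.2',\n" +
        " 'table-name' = 'dim_user_info',\n" +
        " 'zookeeper.quorum' = 'hadoop10:2181'\n" +
        ")");
tenv.executeSql("insert into dim_user_info_sink select username, row(id, phone, status) from user_info_mysql");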
tenv.executeSql(" CREATE TABLE dim_user_info (\n" +
        " username String,\n" +
        " f ROW<id INT,phone String,status INT,create_time timestamp(3),gender int,birthday date,city String,job String,source_type int>,\n" +
        " PRIMARY KEY (username) NOT ENFORCED\n" +
        ") WITH (\n" +
        " 'connector' = 'hbase-2.2',\n" +
        " 'table-name' = 'dim_user_info',\n" +
        " 'zookeeper.quorum' = 'hadoop10:2181',\n" +
        " 'lookup.cache.max-rows' = '1000',\n" + // maximum number of cached lookup rows
        " 'lookup.cache.ttl' = '1min'\n" +       // maximum time a cached row is kept
        ")");
ChatGPT's explanation of this Flink SQL statement:
CREATE TABLE dim_user_info: this part creates a Flink table named dim_user_info.
username String: the first field of the table, of type String, representing the RowKey of the HBase table.
f ROW<id INT,phone String,status INT,create_time timestamp(3),gender int,birthday date,city String,job String,source_type int>: the second field, f, is a composite field. In the HBase table it is represented as a column family containing multiple columns.
PRIMARY KEY (username) NOT ENFORCED: this declares username as the primary key; NOT ENFORCED means Flink does not enforce the constraint, i.e. it does not validate uniqueness when data is written.
WITH (...): the connector options of the table, which tell Flink how to connect to the HBase table. The options are:
'connector' = 'hbase-2.2': use the HBase 2.2 connector.
'table-name' = 'dim_user_info': the name of the HBase table to connect to is dim_user_info.
'zookeeper.quorum' = 'hadoop10:2181': the address of the ZooKeeper quorum used by HBase.
'lookup.cache.max-rows' = '1000': the lookup cache holds at most 1000 rows. While the Flink query runs, lookup results are cached in Flink to improve performance.
'lookup.cache.ttl' = '1min': cached lookup results expire after 1 minute, i.e. an entry that has not been used within 1 minute is removed from the cache.
In summary, this statement creates a Flink table connected to the HBase table dim_user_info, with username as the primary key and the composite field f holding the HBase column family and its columns. Through this table you can conveniently query and work with the HBase data from Flink.
In this step we only need to run the code and print this table to the console, as shown in the figure below. That proves the data in HBase can be read and is ready for the wide-table join on user info in the next step. If there is no data, troubleshoot it following requirement 1 of this project.
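The verification itself is a single call; the same line appears commented out in the full program in section 6:

        // print the HBase-backed user dimension table to confirm that the lookup source is readable
        tenv.executeSql("select * from dim_user_info").print();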
5. Geo info dimension
This dimension table likewise has to be written into HBase ahead of time. The idea and method for parsing the geo info were already given in my Spark data warehouse project and are not repeated here.
The code to verify it is the same as for the user info dimension:
tenv.executeSql(" CREATE TABLE dim_geo_area (\n" +
        " geohash String,\n" +
        " f ROW<prov String,city String,area String>,\n" +
        " PRIMARY KEY (geohash) NOT ENFORCED\n" +
        ") WITH (\n" +
        " 'connector' = 'hbase-2.2',\n" +
        " 'table-name' = 'dim_geo_area',\n" +
        " 'zookeeper.quorum' = 'hadoop10:2181',\n" +
        " 'lookup.cache.max-rows' = '1000',\n" + // maximum number of cached lookup rows
        " 'lookup.cache.ttl' = '1min'\n" +       // maximum time a cached row is kept
        ")");
The figure below shows that the geo info table data has been read from HBase and printed successfully.
Verify the join against the two dimension tables
tenv.executeSql("select t1.*,t2.f.phone,t2.f.status,t3.f.prov,t3.f.city,t3.f.area \n" +
        "from ods_event t1 \n" +
        "left join dim_user_info for system_time as of t1.proc t2 on t1.username = t2.username " +
        "left join dim_geo_area for system_time as of t1.proc t3 on geohash(t1.lat,t1.lng) = t3.geohash ").print();
By sending producer messages through Kafka, the event info is joined with the user and geo dimension tables, and we can read the detailed event info enriched with user info and geo location; the page info table is not joined yet. The user info supports updates, which are synced to HBase through Flink CDC. The geo info in the figure is too long to fit in a single screenshot. For the experiment in this screenshot, the phone of user guoyachao in the MySQL business table was changed from 15516000447 to 13253161303, and the event message was then sent again through Kafka, producing a new dimension-join result.
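The join key geohash(t1.lat, t1.lng) comes from a user-defined scalar function that is registered as GeoHashUDF in the full program below. Its actual implementation was given in the Spark warehouse project; as a minimal sketch of what such a UDF can look like, assuming the ch.hsr.geohash library is on the classpath and that 5-character precision matches the rowkeys stored in dim_geo_area:

package demo;

import ch.hsr.geohash.GeoHash;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical sketch of the geohash UDF used in the temporal lookup joins.
public class GeoHashUDF extends ScalarFunction {
    public String eval(Double lat, Double lng) {
        if (lat == null || lng == null) {
            return "";
        }
        // Encode (lat, lng) into a base32 geohash string; the precision must match
        // the geohash length used as rowkey when dim_geo_area was loaded.
        return GeoHash.withCharacterPrecision(lat, lng, 5).toBase32();
    }
}

It is registered with tenv.createTemporaryFunction("geohash", GeoHashUDF.class) before the final insert statement.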
6. Assembling the wide table
The complete code for this requirement is shown below.
It covers the queries and joins of the three tables discussed above: the page info table, the user info table, and the geo info table. Parsing the geo location requires a user-defined function, which was already given in the Spark data warehouse project.
package demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CommonDimensionOdsDwd {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        /**
         * {"release_channel":"华为应用市场",
         * "device_type":"mi8",
         * "session_id":"s09",
         * "lat":38.089969323508726,
         * "lng":114.35731900345093,
         * "username":"windy",
         * "eventId":"add_cart",
         * "eventTime":1670586083000,
         * "properties":{"url":"/content/article/2354.html?a=3","itemId":"item003"}}
         *
         * create table ods_event(
         *   release_channel string,
         *   device_type string,
         *   session_id string,
         *   lat double,
         *   lng double,
         *   username string,
         *   eventId string,
         *   eventTime bigint,
         *   properties Map<String,String>
         * )with(
         *   'connector' = 'kafka',
         *   'topic' = 'ods_event',
         *   'properties.bootstrap.servers' = 'hadoop10:9092',
         *   'properties.group.id' = 'x1',
         *   'scan.startup.mode' = 'group-offsets',
         *   'format' = 'json'
         * )
         */
        // Kafka source table for the raw event stream (ODS layer).
        tenv.executeSql("create table ods_event(\n" +
                " release_channel string,\n" +
                " device_type string,\n" +
                " session_id string,\n" +
                " lat double,\n" +
                " lng double,\n" +
                " username string,\n" +
                " eventId string,\n" +
                " eventTime bigint,\n" +
                " properties Map<String,String>,\n" +
                " proc as proctime()\n" +
                ")with(\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'ods_event',\n" +
                " 'properties.bootstrap.servers' = 'hadoop10:9092',\n" +
                " 'properties.group.id' = 'x1',\n" +
                " 'scan.startup.mode' = 'group-offsets',\n" +
                " 'format' = 'json'\n" +
                ")").print();
        //tenv.executeSql("select * from ods_event").print();

        /**
         * CREATE TABLE dim_user_info (
         *   username String,
         *   f ROW<id INT,phone String,status String,create_time String,gender String,birthday String,city String,job String,source_type String>
         *   PRIMARY KEY (username) NOT ENFORCED
         * ) WITH (
         *   'connector' = 'hbase-2.2',
         *   'table-name' = 'dim_user_info',
         *   'zookeeper.quorum' = 'hadoop10:2181'
         * )
         */
        // User dimension table backed by HBase (lookup join source).
        tenv.executeSql(" CREATE TABLE dim_user_info (\n" +
                " username String,\n" +
                " f ROW<id INT,phone String,status INT,create_time timestamp(3),gender int,birthday date,city String,job String,source_type int>,\n" +
                " PRIMARY KEY (username) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'hbase-2.2',\n" +
                " 'table-name' = 'dim_user_info',\n" +
                " 'zookeeper.quorum' = 'hadoop10:2181',\n" +
                " 'lookup.cache.max-rows' = '1000',\n" + // maximum number of cached lookup rows
                " 'lookup.cache.ttl' = '1min'\n" +       // maximum time a cached row is kept
                ")");
        // tenv.executeSql("select * from dim_user_info").print();

        // Geo dimension table backed by HBase (rowkey is a geohash string).
        tenv.executeSql(" CREATE TABLE dim_geo_area (\n" +
                " geohash String,\n" +
                " f ROW<prov String,city String,area String>,\n" +
                " PRIMARY KEY (geohash) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'hbase-2.2',\n" +
                " 'table-name' = 'dim_geo_area',\n" +
                " 'zookeeper.quorum' = 'hadoop10:2181',\n" +
                " 'lookup.cache.max-rows' = '1000',\n" + // maximum number of cached lookup rows
                " 'lookup.cache.ttl' = '1min'\n" +       // maximum time a cached row is kept
                ")");
        // tenv.executeSql("select * from dim_geo_area").print();

        /**
         * select t1.*,t2.f.phone,t2.f.status,t3.f.prov,t3.f.city,t3.f.area
         * from ods_event t1
         * left join dim_user_info for system_time as of t1.proc t2 on t1.username = t2.username
         * left join dim_geo_area for system_time as of t1.proc t3 on geohash(t1.lat,t1.lng) = t3.geohash
         */
        // tenv.executeSql("select t1.*,t2.f.phone,t2.f.status,t3.f.prov,t3.f.city,t3.f.area \n" +
        //         "from ods_event t1 \n" +
        //         "left join dim_user_info for system_time as of t1.proc t2 on t1.username = t2.username " +
        //         "left join dim_geo_area for system_time as of t1.proc t3 on geohash(t1.lat,t1.lng) = t3.geohash ").print();

        // Wide-table assembly code below:
        // Page info dimension table backed by HBase (rowkey is a url prefix).
        tenv.executeSql(
                " create table page_hbase( " +
                " url_prefix STRING, " +
                " f ROW< " +
                "   sv STRING, " +
                "   pt STRING> " +
                " ) WITH( " +
                " 'connector' = 'hbase-2.2', " +
                " 'table-name' = 'dim_page_info', " +
                " 'zookeeper.quorum' = 'hadoop10:2181' " +
                " ) "
        );
        // Kafka sink table for the DWD-layer wide table.
        tenv.executeSql(
                " CREATE TABLE dwd_kafka( " +
                " user_id BIGINT, " +
                " username string, " +
                " session_id string, " +
                " event_Id string, " +
                " event_time bigint, " +
                " lat double, " +
                " lng double, " +
                " release_channel string, " +
                " device_type string, " +
                " properties map<string,string>, " +
                " register_phone STRING, " +
                " user_status INT, " +
                " register_time TIMESTAMP(3), " +
                " register_gender INT, " +
                " register_birthday DATE, " +
                " register_city STRING, " +
                " register_job STRING, " +
                " register_source_type INT, " +
                " gps_province STRING, " +
                " gps_city STRING, " +
                " gps_region STRING, " +
                " page_type STRING, " +
                " page_service STRING " +
                " ) WITH ( " +
                " 'connector' = 'kafka', " +
                " 'topic' = 'dwd_events', " +
                " 'properties.bootstrap.servers' = 'hadoop10:9092', " +
                " 'properties.group.id' = 'testGroup', " +
                " 'scan.startup.mode' = 'earliest-offset', " +
                " 'value.format'='json') "
        );

        // Register the geohash UDF used as the join key against dim_geo_area.
        tenv.createTemporaryFunction("geohash", GeoHashUDF.class);

        // Join the event stream with the three dimension tables and write the wide table to Kafka.
        tenv.executeSql("insert into dwd_kafka select " +
                "cast(t2.f.id as bigint) as user_id,t1.username,t1.session_id,t1.eventId as event_id,t1.eventTime,\n" +
                "t1.lat,t1.lng,t1.release_channel,t1.device_type,t1.properties,t2.f.phone as register_phone,\n" +
                "t2.f.status as user_status,t2.f.create_time as register_time,t2.f.gender as register_gender,\n" +
                "t2.f.birthday as register_birthday,t2.f.city as register_city,t2.f.job as register_job,\n" +
                "t2.f.source_type as register_source_type,t3.f.prov,\n" +
                "t3.f.city as gps_city,t3.f.area,t4.f.pt as page_type,t4.f.sv as page_service \n" +
                "from ods_event t1 \n" +
                "left join dim_user_info for system_time as of t1.proc t2 on t1.username = t2.username " +
                "left join dim_geo_area for system_time as of t1.proc t3 on geohash(t1.lat,t1.lng) = t3.geohash " +
                "left join page_hbase for system_time as of t1.proc t4 on regexp_extract(t1.properties['url'],'(.*/).*',1) = t4.url_prefix")
                .print();
    }
}
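The page dimension is joined on a URL prefix extracted with regexp_extract(url, '(.*/).*', 1), which keeps everything up to and including the last '/'. A quick standalone check of that regex in plain Java (class and variable names here are just for illustration):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UrlPrefixCheck {
    public static void main(String[] args) {
        String url = "/content/article/2354.html?a=3";
        Matcher m = Pattern.compile("(.*/).*").matcher(url);
        if (m.matches()) {
            // prints "/content/article/", the rowkey that is looked up in dim_page_info
            System.out.println(m.group(1));
        }
    }
}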
This is our final experiment: event messages come in from Kafka, are joined against the HBase dimension tables, and come out through a Kafka consumer as the DWD-layer wide-table detail. Below are the complete Kafka commands and the resulting data, consistent with the screenshots. (Note that page_type is null in the output: the f:pt value for the article page was loaded under the misspelled rowkey '/cotent/article/' in dim_page_info, so the lookup on the extracted prefix '/content/article/' only finds f:sv.)
[root@hadoop10 ~]# kafka-topics.sh --zookeeper hadoop10:2181 --create --topic dwd_events --partitions 1 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "dwd_events".
[root@hadoop10 ~]# cd /opt/installs/kafka0.11/
[root@hadoop10 kafka0.11]# bin/kafka-console-consumer.sh --bootstrap-server hadoop10:9092 --topic dwd_events
{"user_id":16,"username":"guoyachao","session_id":"s01","event_Id":"add_cart","event_time":1670583693000,"lat":38.089969323508726,"lng":114.35731900345093,"release_channel":"360应用市场","device_type":"mi6","properties":{"itemId":"item002","url":"/content/article/2354.html?a=3"},"register_phone":"13253161303","user_status":1,"register_time":"2023-07-10 12:34:56","register_gender":1,"register_birthday":"1990-01-01","register_city":"New York","register_job":"Engineer","register_source_type":1,"gps_province":"河北省","gps_city":"石家庄市","gps_region":"鹿泉区","page_type":null,"page_service":"内容服务"}
{"user_id":21,"username":"arthas","session_id":"s02","event_Id":"add_cart","event_time":1670583694000,"lat":38.089969323508726,"lng":114.35731900345093,"release_channel":"360应用市场","device_type":"mi6","properties":{"itemId":"item002","url":"/content/article/2354.html?a=3"},"register_phone":"15516000000","user_status":1,"register_time":"2023-07-06 15:30:20","register_gender":1,"register_birthday":"1998-03-25","register_city":"Berlin","register_job":"Writer","register_source_type":1,"gps_province":"河北省","gps_city":"石家庄市","gps_region":"鹿泉区","page_type":null,"page_service":"内容服务"}
{"user_id":10,"username":"user1","session_id":"s09","event_Id":"add_cart","event_time":1670585523000,"lat":39.28996932350873,"lng":112.35731900345093,"release_channel":"华为应用市场","device_type":"mi8","properties":{"itemId":"item003","url":"/content/article/2354.html?a=3"},"register_phone":"1234567890","user_status":1,"register_time":"2023-07-10 12:34:56","register_gender":1,"register_birthday":"1990-01-01","register_city":"New York","register_job":"Engineer","register_source_type":1,"gps_province":"山西省","gps_city":"朔州市","gps_region":"朔城区","page_type":null,"page_service":"内容服务"}