Flink在实时在实时计算平台和实时数仓中的企业级应用小结

2021-04-21 15:49:10 浏览数 (2)

大数据领域自 2010 年开始,以 Hadoop、Hive 为代表的离线计算开始进入各大公司的视野。大数据领域开始了如火如荼的发展。我个人在学校期间就开始关注大数据领域的技术迭代和更新,并且有幸在毕业后成为大数据领域的开发者。

在过去的这几年时间里,以 Storm、Spark、Flink 为代表的实时计算技术接踵而至。2019 年阿里巴巴内部 Flink 正式开源。整个实时计算领域风起云涌,一些普通的开发者因为业务需要或者个人兴趣开始接触Flink。

Apache Flink(以下简称 Flink)一改过去实时计算领域为人诟病的缺陷,以其强大的计算能力和先进的设计理念,迅速成为实时计算领域先进生产力的代表。各大小公司纷纷开始在 Flink 的应用上进行探索,其中最引人瞩目的两个方向便是:实时计算平台和实时数据仓库。

Flink 实时计算

如果你是一位大数据领域的开发人员或者你是一名后端的开发者,那么你对下面这些需求场景应该不会陌生:

代码语言:javascript复制
我是抖音主播,我想看带货销售情况的排行?
我是运营,我想看到我们公司销售商品的 TOP10?
我是开发,我想看到我们公司所有生产环境中服务器的运行情况?
...... 

在 Hadoop 时代,我们通常的做法是将数据批量存储到 HDFS 中,在用 Hive 产出离线的报表。或者我们使用类似 ClickHouse 或者 PostgreSQL 这样的数据库存储生产数据,用 SQL 直接进行汇总查看。

那么这样的方式有什么问题呢?

第一种,基于 Hive 的离线报表形式。大部分公司随着业务场景的不断丰富,同时在业界经过多年的实践检验,基于 Hadoop 的离线存储体系已经足够成熟。但是离线计算天然时效性不强,一般都是隔天级别的滞后,业务数据随着实践的推移,本身的价值就会逐渐减少。越来越多的场景需要使用实时计算,在这种背景下实时计算平台的需求应运而生。

第二种,基于 ClickHouse 或者 PostgreSQL 直接进行汇总查询。这种情况在一些小规模的公司使用非常常见,原因只有一个就是数据量不够大。在我们常用的具有 OLAP 特性的数据库的使用过程中,如果在一定的数据量下直接用复杂的 SQL 查询,一条复杂的 SQL 足以引起数据库的剧烈抖动,甚至直接宕机,对生产环境产生毁灭性的影响。这种查询在大公司是坚决不能进行的操作。

因此基于 Flink 强大实时计算能力消费实时数据的需求便应运而生。在实时数据平台中,Flink 会承担实时数据的采集、计算和发送到下游。

Flink 实时数据仓库

数据仓库最初是指的我们存储的 Hive 中的表的集合。按照业务需求一般会分为原始层、明细层、汇总层、业务层。各个公司根据实际业务需要会有更为细致的划分。

传统的离线数据仓库的做法一般是将数据按天离线集中存储后,按照固定的计算逻辑进行数据的清洗、转换和加载。最终在根据业务需求进行报表产出或者提供给其他的应用使用。我们很明显的可以看到,数据在这中间有了至少 T 1 天的延迟,数据的时效性大打折扣。

这时,实时数据仓库应运而生。一个典型的实时数据仓库架构图如下:

技术选型

这一部分作者结合自身在阿里巴巴这样的公司生产环境中的技术选择和实际应用的中一些经验,来讲解实时计算平台和实时数据仓库的各个部分是如何进行技术选型的。

实时计算引擎

我们在上面提到,实时计算解决的最重要的问题就是实时性和稳定性。

实时计算对数据有非常高的稳定性和精确性要求,特别是面向公众第三方的数据大屏,同时要求高吞吐、低延迟、极高的稳定性和绝对零误差。随时电商大促的成交记录一次次被刷新,背后是下单、支付、发货高达几万甚至十几万的峰值 QPS。

你可以想象这样的场景吗?天猫双十一,万众瞩目下的实时成交金额大屏突然卡住没有反应。我估计所有开发人员都要被开除了…

我们以一个最常见和经典的实时计算大屏幕来举例。

在面向实际运营的数据大屏中,需要提供高达几十种维度的数据,每秒的数据量高达千万甚至亿级别,这对于我们的实时计算架构提出了相当高的要求。那么我们的大屏背后的实时处理在这种数据量规模如何才能达到高吞吐、低延迟、极高的稳定性和绝对零误差的呢?

在上图的架构图中,涉及几个关键的技术选型,我们下面一一进行讲解。

业务库 Binlog 同步利器 - Canal

我们的实时计算架构一般是基于业务数据进行的,但无论是实时计算大屏还是常规的数据分析报表,都不能影响业务的正常进行,所以这里需要引入消息中间件或增量同步框架 Canal。

我们生产环境中的业务数据绝大多数都是基于 MySQL 的,所以需要一个能够实时监控 MySQL 业务数据变化的工具。Canal 是阿里巴巴开源的数据库 Binlog 日志解析框架,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

Canal 的原理也非常简单,它会伪装成一个数据库的从库,来读取 Binlog 并进行解析。Canal 在阿里巴巴内部有大规模的应用,因为阿里有众多的业务是跨机房部署,大量业务需要进行业务同步,Canal 功能强大,性能也很稳定。

解耦和海量数据支持 - Kafka

在实时大屏的技术架构下,我们的数据源绝大多数情况下都是消息。我们需要一个强大的消息中间件来支撑高达几十万 QPS,同时支持海量数据存储。

首先,我们为什么需要引入消息中间件?主要是下面三个目的:

  • 同步变异步
  • 应用解耦
  • 流量削峰

在我们的架构中,为了和业务数据互相隔离,需要使用消息中间件进行解耦从而互不影响。另外在双十一等大促场景下,交易峰值通常出现在某一个时间段,这个时间段系统压力陡增,数据量暴涨,消息中间件还起到了削峰的作用。

为什么选择 Kafka?

Kafka 是最初由 Linkedin 公司开发,是一个分布式、高吞吐、多分区的消息中间件。Kafka 经过长时间的迭代和实践检验,因为其独特的优点已经成为目前主流的分布式消息引擎,经常被用作企业的消息总线、实时数据存储等。

Kafka 从众多的消息中间件中脱颖而出,主要是因为高吞吐、低延迟的特点;另外基于 Kafka 的生态越来越完善,各个实时处理框架包括 Flink 在消息处理上都会优先进行支持。并且 Flink 和 Kafka 结合可以实现端到端精确一次语义的原理。

Kafka 作为大数据生态系统中已经必不可少的一员,主要的特性如下所示。

  • 高吞吐: 可以满足每秒百万级别消息的生产和消费,并且可以通过横向扩展,保证数据处理能力可以得到线性扩展。
  • 低延迟: 以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。
  • 高容错: Kafka 允许集群的节点出现失败。
  • 可靠性: 消息可以根据策略进行磁盘的持久化,并且读写效率都很高。
  • 生态丰富: Kafka 周边生态极其丰富,与各个实时处理框架结合紧密。

实时计算服务 - Flink

Flink 在当前的架构中主要承担了消息消费、维表关联、消息发送等。在实时计算领域,Flink 的优势主要包括:

  • 强大的状态管理。 Flink 使用 State 存储中间状态和结果,并且有强大的容错能力;
  • 非常丰富的 API。 Flink 提供了包含 DataSet API、DataStream API、Flink SQL 等等强大的API;
  • 生态支持完善。 Flink 支持多种数据源(Kafka、MySQL等)和存储(HDFS、ES 等),并且和其他的大数据领域的框架结合完善;
  • 批流一体。 Flink 已经在将流计算和批计算的 API 进行统一,并且支持直接写入 Hive。

对于 Flink 的一些特点我们不做过多展开了。这里需要注意的是,Flink 在消费完成后一般会把计算结果数据发往三个方向:

  • 高度汇总,高度汇总指标一般存储在 Redis、HBase 中供前端直接查询使用。
  • 明细数据,在一些场景下,我们的运营和业务人员需要查询明细数据,有一些明细数据极其重要,比如双十一派送的包裹中会有一些丢失和破损。
  • 实时消息,Flink 在计算完成后,有一个下游是发往消息系统,这里的作用主要是提供给其他业务复用; 另外,在一些情况下,我们计算好明细数据也需要再次经过消息系统才能落库,将原来直接落库拆成两步,方便我们进行问题定位和排查。

百花齐放 - OLAP 数据库选择

OLAP 的选择是当前实时架构中最有争议和最困难的。目前市面上主流的开源 OLAP 引擎包含但不限于:Hive、Hawq、Presto、Kylin、Impala、SparkSQL、Druid、Clickhouse、Greeplum 等,可以说目前没有一个引擎能在数据量,灵活程度和性能上做到完美,用户需要根据自己的需求进行选型。

我曾经在之前的一篇文章 《实时数仓 | 你需要的是一款强大的 OLAP 引擎》用了两万字分析了目前市面上主流的 OLAP 数据库的选型问题,这里直接给出结论:

  • Hive、Hawq、Impala: 基于 SQL on Hadoop
  • Presto 和 Spark SQL 类似: 基于内存解析 SQL 生成执行计划
  • Kylin: 用空间换时间、预计算
  • Druid: 数据实时摄入加实时计算
  • ClickHouse: OLAP 领域的 HBase,单表查询性能优势巨大
  • Greenpulm: OLAP 领域的 PostgreSQL

如果你的场景是基于 HDFS 的离线计算任务,那么 Hive、Hawq 和 Imapla 就是你的调研目标。

如果你的场景解决分布式查询问题,有一定的实时性要求,那么 Presto 和 SparkSQL 可能更符合你的期望。

如果你的汇总维度比较固定,实时性要求较高,可以通过用户配置的维度 指标进行预计算,那么不妨尝试 Kylin 和 Druid。

ClickHouse 则在单表查询性能上独领风骚,远超过其他的 OLAP 数据库。

Greenpulm 作为关系型数据库产品,性能可以随着集群的扩展线性增长,更加适合进行数据分析。

Flink 实时数据仓库

实时数据仓库的发展经历了从离线到实时的发展,一个典型的实时数据仓库架构如下如图所示:

一般实时数据仓库的设计也借鉴了离线数仓的理念,不但要提高我们模型的复用率,也要考虑实时数仓的稳定性和易用性。

在实时数据仓库的技术选型中,用到的核心技术包括:Kafka、Flink、Hbase 等。

其中 Kafka 和 Flink 的优势我们在上述实时数据平台的技术选型中已经做过详细的介绍。这其中还有两个关键的指标存储系统:Hbase 和 Redis。

其中 Hbase 是典型的列式分布式存储系统,Redis 是缓存系统中首选,他们的主要优势包括:

  • 强一致性
  • 自动故障转移和容错性
  • 极高的读写 QPS,非常适合存储 K-V 形式的指标

批流一体是未来

随着 Flink 1.12 版本的发布,Flink 与 Hive 的集成达到了一个全新的高度,Flink 可以很方便的对 Hive 直接进行读写。

这代表了什么?

只要我们还在使用实时数据仓库,那么我们可以直接对 Hive 进行读写,Flink 成为了 Hive 上的一个处理引擎,既可以通过批的方式也可以通过流的方式。从 Flink 1.12 开始会有大批的离线实时一体的数据仓库出现。

我们数据仓库架构就变成了:

其中 Flink SQL 统一了实时和离线的逻辑,避免出现离线和实时需要两套架构和代码支撑,也基本解决了离线和实时数据对不齐的尴尬局面。

大厂的实时计算平台和实时数仓技术方案

这部分小编结合自身在实际生产环境中的经验,参考了市面上几个大公司在实时计算平台和实时数仓设计中,选出了其中最稳妥也是最常用的技术方案,奉献给大家。

作者的经验

在我们的实时计算架构中采用的是典型的 Kappa 架构,我们的业务难点和重点主要集中在:

  • 数据源过多

我们的实时消息来源多达几十个,分布在各大生产系统中,这些系统中的消息数据格式不一。

  • 数据源之间时间 GAP 巨大

我们业务数据之间需要互相等待,举个最简单的例子。用户下单后,可能 7 天以后后还会进行操作,这就导致一个问题,我们在建设实时数仓时中间状态 State 巨大,直接使用 Flink 原生的状态会导致任务资源消耗巨大,非常不稳定。

  • 离线数据和实时数据要求强一致性

我们的数据最终会以考核的形式下发,直接指导一线员工的工资和奖金发放。要求数据强一致性保障,否则会引起投诉甚至舆情。

基于以上的考虑,我们的实时数据仓库架构如下:

几个关键的技术点如下:

第一,我们使用了 Hbase 作为中间状态的存储。我们在上面提到,因为在 Flink SQL 中进行计算需要存储中间状态,而我们的数据源过多,且时间差距过大,那么实时计算的状态存储变得异常巨大,在大数据量的冲击下,任务变得非常不稳定。另外如果任务发生 Fail-over,状态会丢失,结果严重失真。所以我们所有的数据都会存储在 Hbase。

第二,实时数据触发模式计算。在 Flink SQL 的逻辑里,Hbase 的变更消息发出,我们只需要接受其中的 rowkey 信息,然后所有的数据都是反查 Hbase。我们在上面的文章中讲到过,Hbase 因为极高的读写 QPS 被各大公司普遍应用在实时存储和高频查询中。

第三,双写 ADB 和 Hologres。ADB 和 Hologres 是阿里云提供的强大的 OLAP 引擎。我们在 Flink SQL 计算完毕后将结果双写,前端查询可以进行分流和负载均衡。

第四,离线数据同步。这里我们采用的是直接将消息通过中间件进行同步,在离线数仓中有一套一样的逻辑将数据写入 Hive 中。在 Flink 1.12 后,离线和实时的计算逻辑统一为一套,完全避免了离线和实时消息的不一致难题。

但是,客观的说这套数据架构有没有什么问题呢?

  • 这套数据架构引入了 Hbase 作为中间存储,数据链路变长。导致运维成本大量增加,整个架构的实时性能受制于 Hbase 的变更信息能不能及时发送。
  • 指标没有分层,会导致 ADB 和 Hologres 成为查询瓶颈。在这套数据架构中,我们完全抛弃了中间指标层,完全依赖 SQL 直接汇总查询。一方面得益于省略中间层后指标的准确性,另一方面因为 SQL 直接查询会对 ADB 有巨大的查询压力,使得 ADB 消耗了巨大的资源和成本。

在未来的规划中,我们希望对业务 SQL 进行分级。高优先级、实时性极高的指标和数据直接查询数据库。非高优先级和极高实时性的指标可以通过历史数据加实时数据结合的方式组装结果,减少对数据库的查询压力。

腾讯看点的实时数据系统设计

腾讯看点数据中心承接了腾讯 QQ 看点、小程序、浏览器、快报等等业务的开发取数、看数的需求。腾讯看点一天上报的数据量可以达到万亿级规模,对低延迟、亚秒级的实时计算和多维查询带来了巨大的技术挑战。

首先,我们来看一下腾讯看点的实时数据系统的架构设计:

上图是腾讯看点的整体的实时架构设计图。我们可以看到整体的架构分为三层:

  • 数据采集层

在这层中,腾讯看点完全使用消息队列 Kafka 进行了解耦操作,避免直接读取业务系统数据。

  • 实时数据仓库层

在这一层中腾讯看点使用 Flink 分别做分钟级别的聚合和中度聚合,大大减轻了实时 SQL 查询的压力。

  • 实时数据存储层

腾讯看点使用 ClickHouse 和 MySQL 作为实时数据存储,我们在下面会分析 ClickHouse 作为实时数据存储的优势和特点。

关于数据选型,实时数仓的整体架构腾讯看点选择了 Lambda 架构,主要是因为高灵活性、高容错性、高成熟度、极低的迁移成本。

在实时计算上,腾讯看点选择了 Flink 作为计算引擎,Flink 受到青睐的原因包括 Exactly-once 语义支持,轻量级的快照机制以及极高的吞吐性。另一一个很重要的原因就是 Flink 高效的维表关联,支持了实时数据流 (百万级/s) 关联 HBase 维度表。

在数据存储上,腾讯看点重度使用 ClickHouse。ClickHouse 的优势包括:

  • 多核 CPU 并行计算
  • SIMD 并行计算加速
  • 分布式水平扩展集群
  • 稀疏索引、列式存储、数据压缩
  • 聚合分析优化

最终腾讯看点的实时数据系统支撑了亚秒级响应多维条件查询请求:

  • 过去 30 分钟内容的查询,99% 的请求耗时在1秒内
  • 过去 24 小时内容的查询,90% 的请求耗时在5秒内,99% 的请求耗时在 10 秒内
阿里巴巴批流一体数据仓库建设

我们在上面介绍了 Flink 的优势,尤其是在 Flink 1.12 版本后,Flink 与 Hive 的集成达到了一个全新的高度,Flink 可以很方便的对 Hive 直接进行读写。

阿里巴巴率先在业务实现了批流一体的实时数据仓库,根据公开的资料显示,阿里巴巴在批流一体上的探索主要包含三个方面:

  • 统一元数据管理

Flink 从 1.11 版本开始简化了连接 Hive 的方式,Flink 通过一套简单的 Hive Catelog API 打通了与 Hive 的通信。使得访问 Hive 变得轻而易举。

  • 统一计算引擎

在我们传统的实时数仓的建设中,基于离线和实时引擎的不同,需要编写两套 SQL 进行计算和数据入库操作。Flink 高效解决了这个问题,基于 ANSI-SQL 标准提供了批与流统一的语法,并且使用 Flink 引擎执行可以同时读写 Hive 与其他的 OLAP 数据库。

  • 统一数据存储

在这个架构下,离线数据成为了实时数据的历史备份,离线数据也可以作为数据源被实时摄入,批量计算的场景变成了实时调度,不在依赖定时调度任务。

基于以上的工作,基于 Flink 和 Hive 的批流一体实时数仓应运而生,整体的架构如下:

我们可以看到,原来的离线和实时双写链路演变成了单一通道,一套代码即可完成离线和实时的计算操作。并且基于 Flink 对 SQL 的支撑,代码开发变得异常简洁,阿里巴巴的批流一体数据仓库在 2020 年落地并且投入使用,效果显著,支撑了双十一的数据需求。

实战案例

这部分我们我们将以一个实时统计项目为背景,介绍实时计算中的架构设计和技术选型以及最终的实现。其中涉及了日志数据埋点、日志数据采集、清洗、最终的指标计算等等。

架构设计

我们以统计网站的 PV 和 UV 为例,涉及到几个关键的处理步骤:

  • 日志数据上报
  • 日志数据清洗
  • 实时计算程序
  • 结果存储

基于以上的业务处理流程,我们常用的实时处理技术选型和架构如下图所示:

整体的代码开发包括:

  • Flume 和 Kafka 整合和部署
  • Kafka 模拟数据生成和发送
  • Flink 和 Kafka 整合时间窗口设计
  • Flink 计算 PV、UV 代码实现
  • Flink 和 Redis 整合以及 Redis Sink 实现

Flume 和 Kafka 整合和部署

我们可以在 Flume 的官网下载安装包,在这里下载一个 1.8.0 的稳定版本,然后进行解压:

代码语言:javascript复制
代码语言:javascript复制
tar zxf apache-flume-1.8.0-bin.tar.gz
代码语言:javascript复制

可以看到有几个关键的目录,其中 conf/ 目录则是我们存放配置文件的目录。

接下来我们整合 Flume 和 Kafka。整体整合思路为,我们的两个 Flume Agent 分别部署在两台 Web 服务器上,用来采集两台服务器的业务日志,并且 Sink 到另一台 Flume Agent 上,然后将数据 Sink 到 Kafka 集群。在这里需要配置三个 Flume Agent。

首先在 Flume Agent 1 和 Flume Agent 2 上创建配置文件,修改 source、channel 和 sink 的配置,vim log_kafka.conf 代码如下:

代码语言:javascript复制
# 定义这个 agent 中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source的配置,监听日志文件中的新增数据
a1.sources.r1.type = exec
a1.sources.r1.command  = tail -F /home/logs/access.log

#sink配置,使用avro日志做数据的消费
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = flumeagent03
a1.sinks.k1.port = 9000

#channel配置,使用文件做数据的临时缓存
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/temp/flume/checkpoint
a1.channels.c1.dataDirs = /home/temp/flume/data

#描述和配置 source channel sink 之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c

上述配置会监听 /home/logs/access.log 文件中的数据变化,并且将数据 Sink 到 flumeagent03 的 9000 端口。

然后我们分别启动 Flume Agent 1 和 Flume Agent 2,命令如下:

代码语言:javascript复制
$ flume-ng agent 
-c conf 
-n a1 
-f conf/log_kafka.conf >/dev/null 2>&1 &

第三个 Flume Agent 用来接收上述两个 Agent 的数据,并且发送到 Kafka。我们需要启动本地 Kafka,并且创建一个名为 log_kafka 的 Topic。

然后,我们创建 Flume 配置文件,修改 source、channel 和 sink 的配置,vim flume_kafka.conf 代码如下:

代码语言:javascript复制


# 定义这个 agent 中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#source配置
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 9000

#sink配置
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = log_kafka
a1.sinks.k1.brokerList = 127.0.0.1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

#channel配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#描述和配置 source channel sink 之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1    

配置完成后,我们启动该 Flume Agent:

代码语言:javascript复制
$ flume-ng agent 
-c conf 
-n a1 
-f conf/flume_kafka.conf >/dev/null 2>&1 &

当 Flume Agent 1 和 2 中监听到新的日志数据后,数据就会被 Sink 到 Kafka 指定的 Topic,我们就可以消费 Kafka 中的数据了。

我们现在需要消费 Kafka Topic 信息,并且把序列化的消息转化为用户的行为对象:

代码语言:javascript复制
public class UserClick {

    private String userId;
    private Long timestamp;
    private String action;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public UserClick(String userId, Long timestamp, String action) {
        this.userId = userId;
        this.timestamp = timestamp;
        this.action = action;
    }
}

enum UserAction{
    //点击
    CLICK("CLICK"),
    //购买
    PURCHASE("PURCHASE"),
    //其他
    OTHER("OTHER");

    private String action;
    UserAction(String action) {
        this.action = action;
    }
}

在计算 PV 和 UV 的业务场景中,我们选择使用消息中自带的事件时间作为时间特征,代码如下:

代码语言:javascript复制


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 检查点配置,如果要用到状态后端,那么必须配置
env.setStateBackend(new MemoryStateBackend(true));

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("log_user_action", new SimpleStringSchema(), properties);
//设置从最早的offset消费
consumer.setStartFromEarliest();

DataStream<UserClick> dataStream = env
        .addSource(consumer)
        .name("log_user_action")
        .map(message -> {
            JSONObject record = JSON.parseObject(message);
            return new UserClick(
                    record.getString("user_id"),
                    record.getLong("timestamp"),
                    record.getString("action")
            );
        })
        .returns(TypeInformation.of(UserClick.class));

由于我们的用户访问日志可能存在乱序,所以使用 BoundedOutOfOrdernessTimestampExtractor 来处理乱序消息和延迟时间,我们指定消息的乱序时间 30 秒,具体代码如下:

代码语言:javascript复制
SingleOutputStreamOperator<UserClick> userClickSingleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserClick>(Time.seconds(30)) {
    @Override
    public long extractTimestamp(UserClick element) {
        return element.getTimestamp();
    }
});

到目前为止,我们已经通过读取 Kafka 中的数据,序列化为用户点击事件的 DataStream,并且完成了水印和时间戳的设计和开发。

接下来,按照业务需要,我们需要开窗并且进行一天内用户点击事件的 PV、UV 计算。这里我们使用 Flink 提供的滚动窗口,并且使用 ContinuousProcessingTimeTrigger 来周期性的触发窗口阶段性计算。

代码语言:javascript复制
dataStream     
.windowAll(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))

为了减少窗口内缓存的数据量,我们可以根据用户的访问时间戳所在天进行分组,然后将数据分散在各个窗口内进行计算,接着在 State 中进行汇总。

首先,我们把 DataStream 按照用户的访问时间所在天进行分组:

代码语言:javascript复制
userClickSingleOutputStreamOperator
         .keyBy(new KeySelector<UserClick, String>() {
            @Override
            public String getKey(UserClick value) throws Exception {
                return DateUtil.timeStampToDate(value.getTimestamp());
            }
        })
        .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
        .evictor(TimeEvictor.of(Time.seconds(0), true))
        ...

然后根据用户的访问时间所在天进行分组并且调用了 evictor 来剔除已经计算过的数据。其中的 DateUtil 是获取时间戳的年月日:

代码语言:javascript复制
public class DateUtil {
    public static String timeStampToDate(Long timestamp){
        ThreadLocal<SimpleDateFormat> threadLocal
                = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
        String format = threadLocal.get().format(new Date(timestamp));
        return format.substring(0,10);
    }
}

接下来我们实现自己的 ProcessFunction:

代码语言:javascript复制


public class MyProcessWindowFunction extends ProcessWindowFunction<UserClick,Tuple3<String,String, Integer>,String,TimeWindow>{

    private transient MapState<String, String> uvState;
    private transient ValueState<Integer> pvState;

    @Override
    public void open(Configuration parameters) throws Exception {

        super.open(parameters);
        uvState = this.getRuntimeContext().getMapState(new MapStateDescriptor<>("uv", String.class, String.class));
        pvState = this.getRuntimeContext().getState(new ValueStateDescriptor<Integer>("pv", Integer.class));
    }

    @Override
    public void process(String s, Context context, Iterable<UserClick> elements, Collector<Tuple3<String, String, Integer>> out) throws Exception {

        Integer pv = 0;
        Iterator<UserClick> iterator = elements.iterator();
        while (iterator.hasNext()){
            pv = pv   1;
            String userId = iterator.next().getUserId();
            uvState.put(userId,null);
        }
        pvState.update(pvState.value()   pv);

        Integer uv = 0;
        Iterator<String> uvIterator = uvState.keys().iterator();
        while (uvIterator.hasNext()){
            String next = uvIterator.next();
            uv = uv   1;
        }

        Integer value = pvState.value();
        if(null == value){
            pvState.update(pv);
        }else {
            pvState.update(value   pv);
        }

        out.collect(Tuple3.of(s,"uv",uv));
        out.collect(Tuple3.of(s,"pv",pvState.value()));
    }
}

我们在主程序中可以直接使用自定义的 ProcessFunction :

代码语言:javascript复制
userClickSingleOutputStreamOperator
        .keyBy(new KeySelector<UserClick, String>() {
            @Override
            public String getKey(UserClick value) throws Exception {
                return value.getUserId();
            }
        })
        .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
        .evictor(TimeEvictor.of(Time.seconds(0), true))
        .process(new MyProcessWindowFunction());

到此为止,我们已经计算出来了 PV 和 UV,下面我们分别讲解 Flink 和 Redis 是如何整合实现 Flink Sink 的。

在这里我们直接使用开源的 Redis 实现,首先新增 Maven 依赖如下:

代码语言:javascript复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>

可以通过实现 RedisMapper 来自定义 Redis Sink,在这里我们使用 Redis 中的 HASH 作为存储结构,Redis 中的 HASH 相当于 Java 语言里面的 HashMap:

代码语言:javascript复制
public class MyRedisSink implements RedisMapper<Tuple3<String,String, Integer>>{

    /**
     * 设置redis数据类型
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET,"flink_pv_uv");
    }

    //指定key
    @Override
    public String getKeyFromData(Tuple3<String, String, Integer> data) {
        return data.f1;
    }
    //指定value
    @Override
    public String getValueFromData(Tuple3<String, String, Integer> data) {
        return data.f2.toString();
    }
}

上面实现了 RedisMapper 并覆写了其中的 getCommandDescription、getKeyFromData、getValueFromData 3 种方法,其中 getCommandDescription 定义了存储到 Redis 中的数据格式。这里我们定义的 RedisCommand 为 HSET,使用 Redis 中的 HASH 作为数据结构;getKeyFromData 定义了 HASH 的 Key;getValueFromData 定义了 HASH 的值。

然后我们直接调用 addSink 函数即可:

代码语言:javascript复制
...
userClickSingleOutputStreamOperator
            .keyBy(new KeySelector<UserClick, String>() {
                @Override
                public String getKey(UserClick value) throws Exception {
                    return value.getUserId();
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
            .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
            .evictor(TimeEvictor.of(Time.seconds(0), true))
            .process(new MyProcessWindowFunction())
            .addSink(new RedisSink<>(conf,new MyRedisSink()));
...

到此为止,我们就会将结果存进了 Redis 中,我们在实际业务中可以选择使用不同的目标库例如:Hbase 或者 MySQL 等等。

总结

以 Flink 为代表的实时计算技术还是飞速发展中,众多的新特性例如 Flink Hive Connector、CDC 增量同步等持续涌现,我们有理由相信基于 Flink 的实时计算平台和实时数据仓库的发展未来会大放异彩,解决掉业界在实时计算和实时数仓领域的痛点,成为大数据领域先进生产力的代表。

0 人点赞