进阶 Flink 应用模式 Vol.3-自定义窗口处理

2022-04-11 09:51:53 浏览数 (1)

一、介绍

在本系列的前几篇文章中,我们描述了如何基于动态更新的配置(一组欺诈检测规则)实现灵活的流分区,以及如何利用 Flink 的广播机制在运行时在相关算子之间分配处理配置.

紧接着我们上次离开端到端解决方案的讨论,在本文中,我们将描述如何使用 Flink 的“瑞士刀” - Process Function 创建一个量身定制的实现匹配您的流业务逻辑要求。我们将在欺诈检测引擎的背景下继续讨论。我们还将演示如何在 DataStream API 提供的开箱即用窗口不满足您的要求的情况下实现您自己的时间窗口的自定义替换。特别是,我们将研究您在设计需要对单个事件做出低延迟反应的解决方案时可以做出的权衡。

本文将描述一些可以独立应用的高级概念,但建议您查看该系列的第一部分和第二部分中的材料并查看代码库,以便更容易理解。

二、ProcessFunction 作为“窗口”

低延迟

让我们从提醒我们想要支持的欺诈检测规则类型开始:

“只要同一付款人在 24 小时内支付给同一受益人的总金额超过 200,000 美元,就会触发警报。”

换句话说,给定由组合付款人和受益人字段的键划分的交易流,我们希望及时回顾并确定,对于每笔传入交易,如果两个特定参与者之间的所有先前付款的总和 超过定义的阈值。 实际上,计算窗口总是移动到特定数据分区键的最后观察到的事件的位置。

欺诈检测系统的常见关键要求之一是响应时间短。欺诈行为越早被检测到,它被阻止并减轻其负面后果的机会就越大。这一要求在金融领域尤为突出,您有一个重要的限制 - 评估欺诈检测模型所花费的任何时间都是您系统的守法用户等待响应所花费的时间。处理的快速性通常成为各种支付系统之间的竞争优势,产生警报的时间限制可能低至 300-500 毫秒。从将交易事件摄入欺诈检测系统的那一刻起,直到下游系统必须获得警报为止,这就是您所获得的所有时间。

您可能知道,Flink 提供了一个强大的 Window API,适用于广泛的用例。但是,如果您查看所有可用类型的受支持窗口,您会发现它们都不完全符合我们对此用例的主要要求 - 每个传入事务的低延迟评估。 Flink 中没有任何类型的窗口可以表达“从当前事件返回 x 分钟/小时/天”语义。在 Window API 中,事件属于窗口(由窗口分配器定义),但它们本身不能单独控制窗口*的创建和评估。如上所述,我们的欺诈检测引擎的目标是在收到新事件后立即对先前的相关数据点进行评估。这就提出了在这种情况下应用 Window API 的可行性问题。 Window API 提供了一些用于定义自定义触发器、驱逐器和窗口分配器的选项,它们可能会获得所需的结果。然而,通常很难做到这一点(而且很容易打破)。此外,这种方法不提供对广播状态的访问,这是实现业务规则的动态重新配置所必需的。

*)除了会话窗口,它们仅限于基于会话间隙的分配

让我们以使用 Flink 的 Window API 中的滑动窗口为例。 使用滑动窗口与 S 的滑动转换为等于 S/2 的评估延迟的预期值。 这意味着您需要定义一个 600-1000 毫秒的窗口滑动来满足 300-500 毫秒延迟的低延迟要求,甚至在考虑任何实际计算时间之前也是如此。 Flink 为每个滑动窗格存储单独的窗口状态这一事实使得这种方法在任何中等高负载条件下都不可行。

为了满足要求,我们需要创建自己的低延迟窗口实现。 幸运的是,Flink 为我们提供了执行此操作所需的所有工具。 ProcessFunction 是 Flink API 中一个低级但功能强大的构建块。 它有一个简单的合约:

代码语言:javascript复制
public class SomeProcessFunction extends KeyedProcessFunction<KeyType, InputType, OutputType> {
public void processElement(InputType event, Context ctx, Collector<OutputType> out){}
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputType> out) {}
public void open(Configuration parameters){}
}

processElement() ——接收输入事件。 您可以通过调用 out.collect(someOutput) 为下一个运算符生成一个或多个输出事件,从而对每个输入做出反应。 您还可以将数据传递到侧面输出或完全忽略特定输入。

onTimer——当先前注册的计时器触发时,Flink 会调用 onTimer()。 支持事件时间和处理时间计时器。

open() ——等价于构造函数。 它在 TaskManager 的 JVM 内部调用,用于初始化,例如注册 Flink 管理的状态。 它也是初始化不可序列化且无法从 JobManager 的 JVM 传输的字段的正确位置。

最重要的是,ProcessFunction 还可以访问由 Flink 处理的容错状态。 这种结合,再加上 Flink 的消息处理和传递保证,使得构建具有几乎任意复杂业务逻辑的弹性事件驱动应用程序成为可能。 这包括创建和处理具有状态的自定义窗口。

三、执行

状态和清理

为了能够处理时间窗口,我们需要在程序内部跟踪属于窗口的数据。 为了确保这些数据是容错的并且可以在分布式系统中出现故障,我们应该将它存储在 Flink 管理的状态中。 随着时间的推移,我们不需要保留所有以前的交易。 根据示例规则,所有超过 24 小时的事件都变得无关紧要。 我们正在查看一个不断移动的数据窗口,并且需要不断地将陈旧的事务移出范围(换句话说,从状态中清除)。

我们将使用 MapState 来存储窗口的各个事件。 为了允许有效地清理超出范围的事件,我们将使用事件时间戳作为 MapState 键。

在一般情况下,我们必须考虑到可能存在具有完全相同时间戳的不同事件这一事实,因此我们将存储集合而不是每个键(时间戳)的单个事务。

旁注:当在 KeyedProcessFunction 中使用任何 Flink 管理的状态时,state.value() 调用返回的数据会自动由当前处理的事件的键限定 - 参见图 4。如果使用 MapState,原理相同 适用,不同之处在于返回的是 Map 而不是 MyObject。 如果您被迫执行 mapState.value().get(inputEvent.getKey()) 之类的操作,您可能应该使用 ValueState 而不是 MapState。 由于我们想为每个事件键存储多个值,在我们的例子中,MapState 是正确的选择。

如本系列的第一篇博客所述,我们根据活动欺诈检测规则中指定的键调度事件。多个不同的规则可以基于相同的分组键。这意味着我们的警报功能可能会接收由相同键(例如 {payerId=25;beneficiaryId=12})限定的交易,但注定要根据不同的规则进行评估,这意味着时间窗口的长度可能不同。这就提出了一个问题,即我们如何才能最好地将容错窗口状态存储在 KeyedProcessFunction 中。一种方法是为每个规则创建和管理单独的 MapState。然而,这种方法会很浪费——我们将分别保存重叠时间窗口的状态,因此不必要地存储重复事件。更好的方法是始终存储足够的数据,以便能够估计所有当前活动的规则,这些规则由相同的键限定。为了实现这一点,每当添加新规则时,我们将确定其时间窗口是否具有最大跨度,并将其存储在特殊保留的 WIDEST_RULE_KEY 下的广播状态中。稍后将在状态清理过程中使用此信息,如本节后面所述。

代码语言:javascript复制
@Override
public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out){
  ...
  updateWidestWindowRule(rule, broadcastState);
}
private void updateWidestWindowRule(Rule rule, BroadcastState<Integer, Rule> broadcastState){
  Rule widestWindowRule = broadcastState.get(WIDEST_RULE_KEY);
  if (widestWindowRule == null) {
broadcastState.put(WIDEST_RULE_KEY, rule);
return;
  }
  if (widestWindowRule.getWindowMillis() < rule.getWindowMillis()) {
broadcastState.put(WIDEST_RULE_KEY, rule);
  }
}

现在让我们更详细地看一下 main 方法 processElement() 的实现。

在上一篇博文中,我们描述了 DynamicKeyFunction 如何允许我们根据规则定义中的 groupingKeyNames 参数执行动态数据分区。 随后的描述集中在 DynamicAlertFunction,它利用了剩余的规则设置。

如博客系列前面部分所述,我们的警报流程函数接收类型为 Keyed<Transaction, String, Integer> 的事件,其中 Transaction 是主要的“包装”事件,String 是键(付款人#x - 受益人# 图 1 中的 y),Integer 是导致该事件分派的规则的 ID。 此规则以前存储在广播状态中,必须通过 ID 从该状态中检索。 下面是实现的概要:

代码语言:javascript复制
public class DynamicAlertFunction
extends KeyedBroadcastProcessFunction<
    String, Keyed<Transaction, String, Integer>, Rule, Alert> {
  private transient MapState<Long, Set<Transaction>> windowState;
  @Override
  public void processElement(
  Keyed<Transaction, String, Integer> value, ReadOnlyContext ctx, Collector<Alert> out){
// Add Transaction to state
long currentEventTime = value.getWrapped().getEventTime();                            // <--- (1)
addToStateValuesSet(windowState, currentEventTime, value.getWrapped());
// Calculate the aggregate value
Rule rule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());    // <--- (2)
Long windowStartTimestampForEvent = rule.getWindowStartTimestampFor(currentEventTime);// <--- (3)
SimpleAccumulator<BigDecimal> aggregator = RuleHelper.getAggregator(rule);            // <--- (4)
for (Long stateEventTime : windowState.keys()) {
  if (isStateValueInWindow(stateEventTime, windowStartForEvent, currentEventTime)) {
    aggregateValuesInState(stateEventTime, aggregator, rule);
  }
}
// Evaluate the rule and trigger an alert if violated
BigDecimal aggregateResult = aggregator.getLocalValue();                              // <--- (5)
boolean isRuleViolated = rule.apply(aggregateResult);
if (isRuleViolated) {
  long decisionTime = System.currentTimeMillis();
  out.collect(new Alert<>(rule.getRuleId(),
                          rule,
                          value.getKey(),
                          decisionTime,
                          value.getWrapped(),
                          aggregateResult));
}
// Register timers to ensure state cleanup
long cleanupTime = (currentEventTime / 1000) * 1000;                                  // <--- (6)
ctx.timerService().registerEventTimeTimer(cleanupTime);
  }

以下是步骤的详细信息:

1)我们首先将每个新事件添加到我们的窗口状态:

代码语言:javascript复制
static <K, V> Set<V> addToStateValuesSet(MapState<K, Set<V>> mapState, K key, V value)
  throws Exception {
Set<V> valuesSet = mapState.get(key);
if (valuesSet != null) {
  valuesSet.add(value);
} else {
  valuesSet = new HashSet<>();
  valuesSet.add(value);
}
mapState.put(key, valuesSet);
return valuesSet;
}

2)接下来,我们检索先前广播的规则,根据该规则需要评估传入的交易。

3) getWindowStartTimestampFor 在给定规则中定义的窗口跨度和当前事务时间戳的情况下确定我们的评估应该跨多远。

4)通过迭代所有窗口状态条目并应用聚合函数来计算聚合值。 它可以是平均值、最大值、最小值,或者如本节开头的示例规则中的总和。

代码语言:javascript复制
private boolean isStateValueInWindow(
Long stateEventTime, Long windowStartForEvent, long currentEventTime) {
  return stateEventTime >= windowStartForEvent && stateEventTime <= currentEventTime;
}
private void aggregateValuesInState(
Long stateEventTime, SimpleAccumulator<BigDecimal> aggregator, Rule rule) throws Exception {
  Set<Transaction> inWindow = windowState.get(stateEventTime);
  for (Transaction event : inWindow) {
BigDecimal aggregatedValue =
    FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);
aggregator.add(aggregatedValue);
  }
}

5)有了一个聚合值,我们可以将其与规则定义中指定的阈值进行比较,并在必要时发出警报。

6)最后,我们使用 ctx.timerService().registerEventTimeTimer() 注册一个清理定时器。 此计时器将负责在当前事务超出范围时将其删除。

注意——请注意创建计时器期间的舍入。 这是一项重要的技术,可以在触发定时器的精度和使用的定时器数量之间进行合理的权衡。 定时器存储在 Flink 的容错状态下,以毫秒级的精度管理它们可能是一种浪费。 在我们的例子中,通过这种舍入,我们将在任何给定的秒内为每个键创建最多一个计时器。 Flink 文档提供了一些额外的细节。

7)onTimer 方法会触发窗口状态的清理。

如前所述,我们始终将尽可能多的事件保持在状态中,以评估具有最宽窗口跨度的活动规则。 这意味着在清理过程中,我们只需要删除超出这个最宽窗口范围的状态即可。

清理程序的实现方式如下:

代码语言:javascript复制
@Override
public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<Alert> out)
throws Exception {
  Rule widestWindowRule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);
  Optional<Long> cleanupEventTimeWindow =
  Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);
  Optional<Long> cleanupEventTimeThreshold =
  cleanupEventTimeWindow.map(window -> timestamp - window);
  // Remove events that are older than (timestamp - widestWindowSpan)ms
  cleanupEventTimeThreshold.ifPresent(this::evictOutOfScopeElementsFromWindow);
}
private void evictOutOfScopeElementsFromWindow(Long threshold) {
  try {
Iterator<Long> keys = windowState.keys().iterator();
while (keys.hasNext()) {
  Long stateEventTime = keys.next();
  if (stateEventTime < threshold) {
    keys.remove();
  }
}
  } catch (Exception ex) {
throw new RuntimeException(ex);
  }
}

注意——您可能想知道为什么我们不使用 ListState ,因为我们总是迭代窗口状态的所有值? 这实际上是对使用 RocksDBStateBackend 情况的优化。 遍历 ListState 会导致所有 Transaction 对象被反序列化。 使用 MapState 的键迭代器只会导致键的反序列化(长类型),因此减少了计算开销。

实现细节的描述到此结束。 我们的方法会在新事务到达时立即触发对时间窗口的评估。 因此,它满足了我们所针对的主要要求 - 潜在发出警报的低延迟。 有关完整的实现,请查看 github 上的项目。

四、改进和优化

所描述的方法的优点和缺点是什么?

优点:

低延迟能力

具有潜在用例特定优化的定制解决方案

高效的状态重用(具有相同密钥的规则的共享状态)

缺点:

无法利用现有 Window API 中潜在的未来优化

无延迟事件处理,在 Window API 中开箱即用

二次计算复杂度和潜在的状态非常

现在让我们看看后两个缺点,看看我们是否可以解决它们。

迟到的事件:

处理迟到的事件提出了一个问题——在迟到的事件到达的情况下重新评估窗口是否仍然有意义? 如果需要这样做,您需要将用于清理的最宽窗口扩展为最大预期无序度。 这将避免此类延迟触发的时间窗口数据可能不完整(参见图 7)。

然而,可以说,对于强调低延迟处理的用例,这种延迟触发将毫无意义。在这种情况下,我们可以跟踪到目前为止我们观察到的最新时间戳,对于不单调增加该值的事件,只需将它们添加到状态并跳过聚合计算和警报触发逻辑。

冗余重新计算和状态大小:

在我们描述的实现中,我们将单个事务保持在状态并检查它们以在每个新事件上一次又一次地计算聚合。就在重复计算上浪费计算资源而言,这显然不是最优的。

将个人交易保持在状态的主要原因是什么?存储事件的粒度直接对应时间窗计算的精度。因为我们单独存储事务,所以我们可以在单个事务离开确切的 2592000000 毫秒时间窗口(以毫秒为单位 30 天)时精确地忽略它们。在这一点上,值得提出一个问题——在估计如此长的时间窗口时,我们真的需要这种毫秒精度,还是可以在特殊情况下接受潜在的误报?如果您的用例的答案是不需要这种精度,您可以基于分桶和预聚合实现额外的优化。这种优化的思想可以分解如下:

与其存储单个事件,不如创建一个父类,该类可以包含单个事务的字段或组合值,基于将聚合函数应用于一组事务计算得出。

不要使用以毫秒为单位的时间戳作为 MapState 键,而是将它们四舍五入到您愿意接受的“分辨率”级别(例如,整分钟)。因此,每个条目代表一个桶。

每当评估一个窗口时,将新事务的数据附加到存储桶聚合中,而不是为每个事务存储单独的数据点。

状态数据和序列化器

为了进一步优化实现,我们可以问自己的另一个问题是,获得具有完全相同时间戳的不同事件的可能性有多大。在所描述的实现中,我们通过在 MapState<Long, Set<Transaction>> 中存储每个时间戳的事务集来演示解决此问题的一种方法。然而,这样的选择对性能的影响可能比预期的要大。原因是 Flink 目前不提供原生 Set 序列化器,而是强制回退到效率较低的 Kryo 序列化器 (FLINK-16729)。一个有意义的替代策略是假设,在正常情况下,没有两个差异事件可以具有完全相同的时间戳,并将窗口状态转换为 MapState<Long, Transaction> 类型。您可以使用辅助输出来收集和监控任何与您的假设相矛盾的意外事件。在性能优化期间,我通常建议您禁用对 Kryo 的回退,并通过确保使用更高效的序列化程序来验证您的应用程序可以进一步优化的地方。

提示:您可以通过设置断点并验证返回的 TypeInformation 的类型来快速确定您的类将使用哪个序列化程序。

事件修剪:我们可以将单个事件数据减少为仅相关信息,而不是存储完整事件并给 ser/de 机器施加额外压力。这可能需要将单个事件“解包”为字段,并根据活动规则的配置将这些字段存储到通用 Map<String, Object> 数据结构中。

虽然此调整可能会对大型对象产生重大改进,但它不应该是您的首选,因为它很容易变成过早的优化。

概括:

本文总结了我们在第一部分开始的欺诈检测引擎的实现描述。在这篇博文中,我们演示了如何利用 ProcessFunction 来“模拟”具有复杂自定义逻辑的窗口。我们已经讨论了这种方法的优缺点,并详细说明了如何应用自定义的特定于用例的优化——这是 Window API 无法直接实现的。

这篇博文的目的是说明 Apache Flink API 的强大功能和灵活性。它的核心是 Flink 的支柱,作为开发人员,它可以节省大量工作,并通过提供以下功能很好地推广到广泛的用例:

分布式集群中的高效数据交换

通过数据分区实现水平扩展

具有快速本地访问的容错状态

使用此状态的方便抽象,就像使用局部变量一样简单

多线程、并行执行引擎。 ProcessFunction 代码在单个线程中运行,无需同步。 Flink 处理所有并行执行方面和对共享状态的正确访问,而您作为开发人员无需考虑它(并发性很困难)。

所有这些方面使得使用 Flink 构建应用程序成为可能,这些应用程序远远超出了琐碎的流 ETL 用例,并能够实现任意复杂的分布式事件驱动应用程序。使用 Flink,您可以重新思考针对各种用例的方法,这些用例通常依赖于使用无状态并行执行节点并将状态容错的问题“推送”到数据库,这种方法面对不断增长的数据量通常注定会遇到可扩展性问题。

0 人点赞