本文是一个由多部分组成的系列文章的第一篇,展示了FlinkSQL应用于市场数据的强大功能和可表达性。该系列的代码和数据可在github上获得。它由量化建模负责人Simudyne和Krishnen Vytelingum合着。
速度在金融市场上至关重要。无论目标是最大化alpha还是最大程度地减少风险,金融技术人员都会投入大量资金,以获取有关市场状况以及行情的最新见解。事件驱动和流式处理体系结构可在事件发生时对事件进行复杂的处理,使其很自然地适合金融市场应用。
Flink SQL是一种数据处理语言,可用于事件驱动和流应用程序的快速原型设计和开发。Flink SQL将SQL的简单性和可访问性与Apache Flink(一种流行的分布式流媒体平台)的性能和可伸缩性结合在一起。借助Flink SQL,业务分析人员、开发人员和量化人员都可以快速建立流传输管道,以实时执行复杂的数据分析。
在本文中,我们将使用Simudyne开发的基于代理的模型(ABM)生成的综合市场数据。ABM并不是自上而下的方法,而是在复杂系统中对自主参与者(或代理)进行建模,例如:金融市场中的各种买卖双方。可以捕获这些交互,并可以针对许多应用程序分析生成的综合数据集,例如用于检测紧急欺诈行为的训练模型,或探索风险管理的“假设”场景。ABM生成的综合数据在历史数据不足或不可用的情况下很有用。
流式VWAP
我们从一个简单的示例开始,该示例从一系列交易事件中计算成交量加权平均价格(VWAP)。VWAP是交易中用来衡量证券的市场价格和未来方向的通用基准。在这里,我们有一个CSV格式的数据集,该数据集显示了一个交易日(2020年10月22日)的虚构证券(SIMUI)的交易事件。
代码语言:javascript复制sym,prc,vol,bid_id,ask_id,buyer_id,seller_id,step,time
SIMUl,149.86,2300,P|63-m-1,P|66-l-0,P|63,P|66,380,22-Oct-2020 08:00:07.600
SIMUl,149.86,1935,P|63-m-1,P|25-l-0,P|63,P|25,380,22-Oct-2020 08:00:07.600
SIMUl,149.74,582,P|18-l-0,P|98-m-0,P|18,P|98,428,22-Oct-2020 08:00:08.560
SIMUl,149.76,2475,P|27-l-0,P|42-m-1,P|27,P|42,1021,22-Oct-2020 08:00:20.420
SIMUl,149.84,21,P|5-m-0,P|42-l-0,P|5,P|42,1078,22-Oct-2020 08:00:21.560
SIMUl,149.76,2709,P|24-l-1,P|92-m-0,P|24,P|92,1200,22-Oct-2020 08:00:24.000
SIMUl,149.84,1653,P|8-m-1,P|24-l-0,P|8,P|24,1513,22-Oct-2020 08:00:30.260
SIMUl,149.84,400,P|19-m-0,P|24-l-0,P|19,P|24,1577,22-Oct-2020 08:00:31.540
这些列是:交易品种,价格,数量,出价ID,要价ID,买方ID,卖方ID,步骤和时间戳。步骤列是离散步骤ABM市场模拟的伪像,出于我们的目的可以忽略;其余各栏不言自明。
要处理此数据,我们需要通过发出CREATE TABLE语句来声明Flink SQL表。我们的示例数据是基于文件系统的,但是可以轻松更改连接器类型以从其他来源(例如Kafka主题)读取数据。请注意,event_time是派生的列,也用于水印。通过加水印,Flink可以限制等待延迟到达和故障事件的时间,以便可以取得进展。在这里,我们声明,到达event_time超过水印一分钟以上的记录将被忽略。
代码语言:javascript复制CREATE TABLE trades (
symbol STRING,
price DOUBLE,
vol INT,
bid_id STRING,
ask_id STRING,
buyer_id STRING,
seller_id STRING,
step INT,
ts_str STRING,
event_time AS TO_TIMESTAMP (ts_str, 'dd-MMM-yyyy HH:mm:ss.SSS'),
WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
) WITH (
'connector' = 'filesystem',
'path' = '/path/to/varstream/data/trades_raw',
'format' = 'csv'
);
VWAP的公式很简单:对于指定时间段内的每笔交易,将价格乘以交易股份数即可。将其总和除以该时间段内已交易的股票总数。下面的流查询将显示当前的VWAP,它将随着新交易事件的到来而更新:
代码语言:javascript复制SELECT
symbol,
SUM (vol) AS cumulative_volume,
SUM (price * vol) AS cumulative_pv,
SUM (price * vol) / SUM (vol) AS vwap
FROM
trades
GROUP BY
symbol
;
实时播放
由于CSV文件中一个符号中只有一天的数据价值,因此结果更新可能发生得太快了,您几乎没有注意到。从源读取事件的速度比实时发生的速度要快。有时需要在准实时回放历史数据,就好像Flink现在正在接收历史事件数据(例如,用于演示或原型设计和开发过程中)。
为了解决这个问题,我们提供了一个简单的UDTF(用户定义的表函数),该数据以从行时间戳派生的人工延迟播放历史数据。UDTF有两个参数:第二个参数指定行时间戳(在我们的示例中为event_time),而第一个参数指定第一个行时间戳之后的分钟持续时间(以分钟为单位),以开始应用延迟。以下代码段显示了如何注册UDTF并在处理事件的前120分钟后将其用于视图中以应用延迟。请注意LATERAL TABLE联接的使用,该联接将函数应用于主表中的每一行。
代码语言:javascript复制-- Register UDTF
CREATE FUNCTION replay_after AS 'varstream.ReplayAfterFunction' LANGUAGE JAVA ;
-- Create a view
CREATE VIEW trades_replay AS (
SELECT * FROM trades
LEFT JOIN LATERAL TABLE (replay_after (120, trades.event_time)) ON TRUE
) ;
您可以通过发出一个简单的查询来验证事件的重播方式:
SELECT * FROM trades_replay
使用此视图,我们现在可以发出相同的VWAP聚合查询,并观察对VWAP的流更新,就好像它们是实时发生的一样:
代码语言:javascript复制SELECT
symbol,
SUM (vol) AS cumulative_volume,
SUM (price * vol) AS cumulative_pv,
SUM (price * vol) / SUM (vol) AS vwap
FROM
trades_replay
GROUP BY
symbol
;
尽管此UDTF在进行原型制作时非常有用,但从根本上没有打算把它用于生产用途。我们在这里使用它只是为了演示FlinkSQL如何在事件以模拟实时到达时更新聚合结果。
Group Windows
前面的示例显示了如何计算当天的流式VWAP。假设您要以每隔1分钟的时间建立一个带有蜡烛图的交易仪表板。您可能需要计算每分钟的VWAP、高价、低价和总体积。Flink SQL通过组窗口使此操作变得容易,组窗口可以在GROUP BY时间间隔上应用聚合函数。
下面显示了如何获取每分钟的VWAP:
代码语言:javascript复制CREATE VIEW vwap_1m AS (
SELECT
symbol,
TUMBLE_START (event_time, INTERVAL '1' MINUTES) AS start_time,
TUMBLE_ROWTIME (event_time, INTERVAL '1' MINUTES) AS row_time,
MAX (price) AS max_price,
MIN (price) AS min_price,
SUM (price * vol) AS total_price,
SUM (vol) AS total_vol,
SUM (price * vol) / SUM (vol) AS vwap
FROM
trades
GROUP BY
TUMBLE (event_time, INTERVAL '1' MINUTES), symbol
);
SELECT symbol, start_time, total_price, total_vol, vwap FROM vwap_1m ;
前面的操作为每分钟内发生的交易计算了VWAP。如果要在几分钟内计算移动的VWAP(MVWAP),则Flink SQL提供了一个跳跃的组窗口。下面显示了5分钟的移动VWAP,步长为1分钟。
代码语言:javascript复制CREATE VIEW vwap_5m AS (
SELECT
symbol,
HOP_START (event_time, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES) AS start_time,
HOP_ROWTIME (event_time, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES) AS row_time,
MAX (price) AS max_price,
MIN (price) AS min_price,
SUM (price * vol) AS total_price,
SUM (vol) AS total_vol,
SUM (price * vol) / SUM (vol) AS vwap
FROM
trades
GROUP BY
HOP (event_time, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES), symbol
);
SELECT symbol, start_time, total_price, total_vol, vwap FROM vwap_5m ;
结论
Flink SQL可以极大地简化和加快流数据流的开发。在本文中,我们探索了SQL GROUP BY子句的不同用法,以根据市场数据流计算VWAP的变化。在下一部分中,我们将向您展示如何从市场数据中提取每分钟的流式采样,以计算日内风险价值(IVaR)。我们希望本系列文章能鼓励您尝试将Flink SQL用于流式市场数据应用程序。
原文作者:Patrick Angeles& Krishnen Vytelingum
原文链接:https://blog.cloudera.com/streaming-market-data-with-flink-sql-part-i-streaming-vwap/