流媒体与实时计算,Netflix公司Druid应用实践

2020-05-19 12:55:21 浏览数 (1)

Netflix(Nasdaq NFLX),也就是网飞公司,成立于1997年,是一家在线影片[租赁]提供商,主要提供Netflix超大数量的[DVD]并免费递送,总部位于美国加利福尼亚州洛斯盖图。1999年开始订阅服务。2009年,该公司可提供多达10万部DVD电影,并有1千万的订户。2007年2月25日,Netflix宣布已经售出第10亿份DVD。

Netflix已经连续五次被评为顾客最满意的网站。可以通过PC、TV及iPad、iPhone收看电影、电视节目,可通过[Wii],[Xbox360],[PS3]等设备连接TV。Netflix是全球领先的经营在线业务公司。它成功地把传统的影像租赁业务和现代化的市场营销手段、先进的IT网络技术结合起来,从而开创了在线影像租赁的新局面。Netflix通过整合其自身的营销手段和最近的IT网络技术,成功地改变了消费习惯和打造了自己的品牌优势。

Netflix在2011年开始探索自制内容的举动并不被看好。直到2013年,其首部自制剧《纸牌屋》取得爆红后,舆论冲击及股票下滑的趋势才得以扭转。这也让Netflix成功打响了平台自制内容的第一炮。

2019 年 7 月 4 日,网飞的原创剧《怪奇物语》第三季开播,一如往常地一口气放出 12 集,再次掀起话题热潮。取得这样的成功,网飞自然是高兴不已。7 月 8 日,这家通常并不爱自夸成绩的公司表示,有近 4100 万家庭在四天之内观看了《怪奇物语》最新季,超过 1800 万家庭已经把整 8 集全部刷完。如果需要对比数据的话,4 月份 HBO 发布的《权力的游戏》最终季首播集观看人数为 1740 万。

在持续推动创新技术更新的同时,Netflix确保始终如一的出色的用户体验绝非易事。如何才能确信更新系统的时候不会影响用户的使用?而且实际上如何得到更多的反馈,可以对系统进行不断地改进也是一个巨大的挑战。

最终,Netflix公司通过对设备的数据进行采集,使用来自设备的实时日志作为事件源,得到了大量的数据,通过实时的大数据了解和量化了用户设备,最终成功的近乎无缝地处理了视频的浏览和回放,完美的解决了这些问题。

下面我们来具体了解一下:

系统架构

如上图,整个系统架构通过对用户设备日志收集,通过kafka的消息传递,最终存储在Druid中。

一旦有了这些数据,就将它们存入数据库,这里使用的是实时分析数据库Druid。

每项数据流均标有关于所用设备类型的匿名详细信息,例如,该设备是智能电视,iPad还是Android手机。这使得能够根据各个方面对设备进行分类并查看数据。反过来,这又使我们能够定向的分析仅影响特定人群的问题,例如应用程序的版本,特定类型的设备或特定国家/地区。

可通过仪表板或临时查询立即使用此聚合数据进行查询。还可以连续检查指标是否有警报信号,例如新版本是否正在影响某些用户或设备的播放或浏览。这些检查用于警告负责的团队,他们可以尽快解决该问题。

在软件更新期间,为部分用户启用新版本,并使用这些实时指标来比较新版本与以前版本的性能。指标中的任何问题都会使我们立刻发现并中止更新,并将那些使新版本直接恢复到先前版本。

由于每秒需要处理超过200万个事件,因此将其放入可以快速查询的数据库是一个非常艰巨的任务。我们需要一个拥有足够的性能与多维度查询的数据库,来处理每天产生超过1,150亿行的数据。在Netflix,最终选择利用Apache Druid来应对这一挑战。

Druid(德鲁伊)

Druid是一个分布式的支持实时分析的数据存储系统。通俗一点:高性能实时分析数据库。

Apache Druid是一个高性能的实时分析数据库。它是为需要快速查询和提取的工作流而设计的。德鲁伊在即时数据可视性,即席查询,运营分析和处理高并发方面表现出色。” — druid.io

因此,Druid非常适合现在我们面临的这种用例。事件数据的摄取频率非常高,具有大数据量和快速查询要求。

Druid不是关系数据库,但是某些概念是可移植的。我们有数据源,而不是表。与关系数据库一样,这些是表示为列的数据的逻辑分组。Druid的Join性能目前还不是很优秀。因此,我们需要确保每个数据源中都包含我们要过滤或分组依据的任何列。

数据源中主要有三类列-时间,维度和指标。

德鲁伊中的一切都取决于时间。每个数据源都有一个timestamp列,它是主要的分区机制。维度是可用于过滤,查询或分组依据的值。指标是可以汇总的值,几乎总是数字。

我们假设数据由时间戳作为键,Druid可以对存储,分配和查询数据的方式进行一些优化,从而使我们能够将数据源扩展到数万亿行,并且仍然可以实现查询响应时间在十毫秒内。

为了达到这种级别的可伸缩性,Druid将存储的数据划分为多个时间块。时间块的持续时间是可配置的。可以根据您的数据和用例选择适当的持续时间。对于我们的数据和用例,我们使用1小时时间块。时间块内的数据存储在一个或多个段中。每个段都保存有所有数据行,这些行均落在其时间戳键列所确定的时间块内。可以配置段的大小,以使行数或段文件的总大小有上限。

查询数据时,Druid将查询发送到集群中所有包含查询范围内时间块的分段的节点。每个节点在将中间结果发送回查询代理节点之前,都会对所保存的数据进行并行处理。代理将执行最终合并和聚合,然后再将结果集发送回客户端。

摄取数据

把数据实时插入到此数据库。这些事件(在本例中为指标)不是从单个记录插入到数据源中,而是从Kafka流中读取。每个数据源使用1个主题。在Druid中,我们使用Kafka索引编制任务,该任务创建了多个在实时节点中间管理者之间分布的索引编制工作器。

这些索引器中的每一个都订阅该主题,并从流中读取其事件共享。索引器根据摄入规范从事件消息中提取值,并将创建的行累积在内存中。一旦创建了行,就可以对其进行查询。到达索引器仍在填充一个段的时间块的查询将由索引器本身提供。由于索引编制任务实际上执行两项工作,即摄取和现场查询,因此及时将数据发送到“历史节点”以更优化的方式将查询工作分担给历史节点非常重要。

Druid可以在提取数据时对其进行汇总,以最大程度地减少需要存储的原始数据量。汇总是一种汇总或预聚合的形式。在某些情况下,汇总数据可以极大地减少需要存储的数据大小,从而有可能将行数减少几个数量级。但是,减少存储量确实要付出一定的代价:我们失去了查询单个事件的能力,只能以预定义的查询粒度进行查询。对于我们的用例,我们选择了1分钟的查询粒度。

在提取期间,如果任何行具有相同的维度,并且它们的时间戳在同一分钟内(我们的查询粒度),则这些行将被汇总。这意味着通过将所有度量值加在一起并增加一个计数器来合并行,因此我们知道有多少事件促成了该行的值。这种汇总形式可以显着减少数据库中的行数,从而加快查询速度,因为这样我们就可以减少要操作和聚合的行。

一旦累积的行数达到某个阈值,或者该段已打开太长时间,则将这些行写入段文件中并卸载到深度存储中。然后,索引器通知协调器段已准备好,以便协调器可以告诉一个或多个历史节点加载该段。一旦将段成功加载到“历史”节点中,就可以从索引器中将其卸载,并且历史记录节点现在将为所有针对该数据的查询提供服务。

数据管理

就像您想象的那样,随着维数基数的增加,在同一分钟内发生相同事件的可能性降低。管理基数以及因此汇总,是获得良好查询性能的有力手段。

为了达到所需的摄取速率,我们运行了许多索引器实例。即使在索引任务中合并了相同行的汇总,在相同的索引任务实例中获得所有相同行的机会也非常低。为了解决这个问题并实现最佳的汇总,我们安排了一个任务,在将给定时间块的所有段都移交给历史节点之后运行。

计划的压缩任务从深度存储中获取所有分段以进行时间块化,并执行映射/缩小作业以重新创建分段并实现完美的汇总。然后,由“历史记录”节点加载并发布新的细分,以替换并取代原始的,较少汇总的细分。在我们的案例中,通过使用此额外的压缩任务,我们发现行数提高了2倍。

知道何时收到给定时间块的所有事件并不是一件容易的事。可能有关于Kafka主题的迟到数据,或者索引器可能会花一些时间将这些片段移交给“历史”节点。为了解决此问题,我们在运行压缩之前强加了一些限制并执行检查。

首先,我们丢弃任何非常迟到的数据。我们认为这太旧了,无法在我们的实时系统中使用。这样就可以确定数据的延迟时间。其次,压缩任务是有延迟地安排的,这给了段足够的时间以正常流程分流到历史节点。最后,当给定时间块的计划压缩任务开始时,它查询段元数据以检查是否还有任何相关段仍在写入或移交。如果有,它将等待几分钟后重试。这样可以确保所有数据都由压缩作业处理。

如果没有这些措施,我们发现有时会丢失数据。开始压缩时仍要写入的段将被具有更高版本的新压缩的段覆盖,因此具有优先权。这有效地删除了尚未完成移交的那些段中包含的数据。

查询方式

Druid支持两种查询语言:Druid SQL和原生查询。在后台,Druid SQL查询被转换为本地查询。原生查询作为JSON提交到REST端点,这是我们使用的主要机制。

对集群的大多数查询都是由自定义内部工具(例如仪表板和警报系统)生成的。这些系统最初旨在与我们内部开发的开源时间序列数据库Atlas一起使用。因此,这些工具使用Atlas Stack查询语言。

为了加快采用Druid查询的速度并实现对现有工具的重用,我们添加了一个转换层,该层接受Atlas查询,将其重写为Druid查询,发布查询并将结果重新格式化为Atlas结果。这个抽象层使现有工具可以按原样使用,并且不会为用户访问我们的Druid数据存储中的数据创建任何额外的学习曲线。

调整

在调整群集节点的配置时,我们以很高的速度运行了一系列可重复和可预测的查询,以便获得每个给定配置的响应时间和查询吞吐量的基准。这些查询旨在隔离集群的各个部分,以检查查询性能是否有所改善或降低。

例如,我们针对最新数据运行了有针对性的查询。同样,对于更长的持续时间,但只有较旧的数据可以确保我们仅查询“历史”节点以测试缓存配置。再次使用按非常高的基数维度分组的查询,以检查结果合并是如何受到影响的。我们继续调整并运行这些基准测试,直到对查询性能感到满意为止。

在这些测试中,我们发现调整缓冲区的大小,线程数,查询队列长度和分配给查询缓存的内存对查询性能产生了有效影响。但是,引入压缩工作将占用我们汇总不良的细分,并以完美汇总将它们重新压缩,这对查询性能产生了更大的影响。

我们还发现,在历史节点上启用缓存非常有好处,而在代理节点上启用缓存则没有那么多。太多了,我们不使用代理上的缓存。这可能是由于我们的用例所致,但是我们几乎进行的每个查询都未命中代理上的缓存,这可能是因为查询通常包含最新数据,因为这些数据始终会到达,因此不会包含在任何缓存中。

摘要

经过多次迭代,针对我们的用例和数据速率进行了调整和定制,德鲁伊已被证明具有我们最初希望的能力。

我们已经能够使用功能强大且可用的系统,但是还有更多工作要做。我们的摄入量和摄入率不断提高,查询的数量和复杂性也在不断增加。随着更多团队实现这些详细数据的价值,我们经常添加更多指标和维度,从而推动系统更加努力地工作。我们必须继续监视和调整,以保持查询性能。

目前,我们每秒接收超过200万个事件,并查询超过1.5万亿行,以深入了解我们的用户如何体验该服务。所有这些都有助于我们保持高质量的Netflix体验,同时实现不断的创新。

实时流式计算与流媒体的碰撞才刚刚开始,而Druid作为一款极易上手的高性能实时查询数据库,也会得到越来越多的广泛使用。

0 人点赞