一个用于实时分析的开源数据存储
摘要
Druid是专用于基于大数据集的实时探索分析的开源数据存储。该系统包括列式存储,分布式的无共享架构,高级索引结构,可用于任意探索具有次秒级延迟的十亿行级的数据表。这篇文章我们主要描述Druid的架构,并且详细说明它如何支持快速聚合、灵活筛选以及低延迟数据的加载。
1. 引言
近年来,互联网技术的飞速发展引发了机器生成事件(machine-generated events)的快速增加。分开来看,这些事件包含的有用信息较少且价值不高。鉴于提取大量事件集合意义所需的时间和资源,许多公司宁愿放弃这些数据。虽然基础架构已被构建用于处理基于事件的数据,但它们很大部分以高价位销售,并且只针对那些能够负担得起的公司。
几年前,Google推出了MapReduce作为其利用商业硬件来进行网络索引和日志分析的机制。不久之后Hadoop项目就遵循并在很大程度上效仿MapReduce原始文档中的见解。Hadoop目前用于存储和分析大量的日志数据而存在于许多组织中。Handoop在帮助许多公司将低价值的事件流转化为高价值的应用程序贡献良多,例如商业智能和A-B testing。
与许多伟大的系统一样,Hadoop开启了我们对一个新的空间的问题。具体来说,Hadoop在存储和提供对大量数据的访问方面表现优异,然而,它并没有提供关于数据访问速度的性能保证。此外,尽管Hadoop是高可用性系统,但在高并发负载下性能下降。最后,虽然Hadoop在存储数据方面表现良好,但它并未针对提取数据和使数据立即可读而进行优化。
在开发metamarkets产品的早期,我们遇到了这些问题,并意识到Hadoop是一个很好的后台后台处理、批处理和数据仓库系统。然而,作为一家在高并发环境( 1000个以上用户)中,要求具有查询性能和数据可用性的产品级保证的公司,Hadoop不能满足我们的需求。我们探索了不同的解决方案,在尝试了关系型数据库管理系统和 NoSQL架构之后,我们得出的结论是,开放源世界中没有可以用来充分满足我们的需求的方法。我们最终创建了Druid,一个开放源,分布式,面向列编程的,实时分析数据存储。在许多方面,Druid与其它OLAP系统,交互式查询系统,内存数据库以及广为人知的分布式数据存储在许多方面具有相似点。分布和查询模型还借鉴了当代搜索基础架构的见解。
本文介绍了Druid的架构,探讨了创建一个永远在线的生产系统,为托管服务提供支持的各种设计决策,并尝试帮助任何面临类似问题的人解决潜在的解决方法。Druid已经在几家技术公司投产使用。本文的结构如下:我们首先描述第2节中的问题。接下来,我们从第3节中数据如何流经系统的角度详细介绍系统架构。然后讨论如何以及为什么数据被转换为第4节中的二进制格式。我们在第5节简要描述了查询API,并在第6节介绍了性能结果。最后,我们在第7节中关于运行Druid的教训,以及第8节中的相关工作。
2. 问题定义
Druid最初旨在解决关于摄取和探索大量事务事件(日志数据)的问题。这种形式的时间序列数据通常在OLAP工作流中发现,并且数据的性质往往非常重。
例如表1中的数据。表1给出了维基百科上发生的编辑数据。每次用户在维基百科中编辑页面时,都会生成包含有关编辑的元数据的事件。此元数据由3个不同的组件组成。首先,有一个时间戳列指示编辑的时间。接下来,存在指示关于编辑的各种属性的设置维度列,例如编辑的页面,进行编辑的用户和用户的位置。最后,有一组度量列包含可以聚合的值(通常为数字),例如在编辑中添加或删除的字符数。
我们的目标是快速计算这些数据的下钻和聚合。我们想回答的问题,如“在旧金山的男性贾斯汀·比伯的页面上做了多少修改?”和“在一个月的时间内卡尔加里的人添加的字符的平均数是多少?” 我们还希望任何任意维度组合的查询返回是亚秒级延迟。
Druid出现的动力,是因为当前开源的关系型数据库RDBMS和NoSql的key/value存储都不能为交互式应用提供低延迟数据摄取和查询平台。在Metamarkets初期,我们专注于建立一个托管的仪表板,允许用户对事件流进行任意的探索和可视化。为仪表板提供支持的数据存储需要足够快地返回查询,以便在其上构建的数据可视化可以为用户提供交互式体验。
除了查询延迟需求之外,系统必须是多租户的并且高度可用。Metamarkets产品用于高度并发环境。停机时间是昂贵的,并且如果在面对软件升级或网络故障时系统不可用,许多企业不能等待。而停机时间,对于经常缺乏适当内部运营管理的初创公司,可以直接决定业务成功或失败。
最后,Metamarkets早期面临的另一个挑战是允许用户和警报系统能够“实时”做出业务决策。从创建事件到该事件可查询的时间决定了感兴趣方能够对其系统中潜在的灾难性情况作出反应的速度。流行的开源数据仓库系统(如Hadoop)无法提供我们所需的次秒级数据提取延迟。
数据探索,摄取和可用性的问题跨越多个行业。自从2012年10月Druid开源以来,它被部署为多个公司的视频,网络监控,运营监控和在线广告分析平台。
3. 架构
Druid集群由不同类型的节点组成,每个节点类型被设计为执行一组特定的事情。我们相信这种设计功能分离并简化了整个系统的复杂性。不同的节点类型相互独立地操作,并且使它们之间的交互最小化。因此,集群内通信故障对数据可用性的影响也最小。
为了解决复杂的数据分析问题,不同的节点类型汇聚在一起形成一个完全工作的系统。Druid的名字来自许多角色扮演游戏中的角色德鲁伊:它是一个能够变身的人,能够采取许多不同的形式,以履行在一个组中的各种不同的角色。Druid集群中的数据的组成和数据流向如图1所示。
3.1 Real-time Node
实时节点封装了事件流的摄取和查询功能。通过这些节点索引的事件可立即用于查询。节点仅关心一些小时间范围的事件,并且周期性地将它们在这个小时间范围上收集的不可变批量的事件移交给专门处理不可变事件批处理的Druid集群中的其他节点。实时节点利用Zookeeper与Druid群集的其余部分进行协调。节点向Zookeeper服务宣布他们的在线状态和数据。
实时节点为所有传入事件维护一个内存索引缓冲区。这些索引随着事件被摄取而递增地填充,并且索引也是可直接查询的。查询存在于此基于JVM堆的缓冲区中的事件时,Druid更像一个行式存储。为了避免堆溢出问题,实时节点会定期或在达到最大行限制后将其内存索引保留到磁盘。这个持久进程将存储在内存中缓冲区中的数据转换为第4节中描述的面向列的存储格式。每个持久化索引是不可变的,实时节点将持久索引加载到堆外存储器中,以便仍然可以查询它们。该过程在[33]中详细描述并且在图2中示出。
在定期的基础上,每个实时节点将调度一个后台任务,来搜索所有本地持久化索引。任务将这些索引合并在一起,并构建一个不可变的数据块,其中包含实时节点在一段时间内摄取的所有事件。我们将这个数据块为“segment”。在切换阶段期间,实时节点将该segment上传到永久备份存储,通常是诸如S3 [12]或HDFS [36]的分布式文件系统,Druid称之为“深存储”。ingest、persist、merge和handoff 步骤是流动的,在任何过程中没有数据丢失。
图3展示了实时节点的操作。节点从13:37启动,并且只接受当前小时或下一小时的事件。当事件被摄取时,节点宣布它正在服务从13:00到14:00的间隔的数据段。每隔10分钟(持续时间是可配置的),节点将刷新并保持其内存缓冲区到磁盘。接近小时结束时,节点可能在14:00至15:00看到事件。发生这种情况时,节点准备为下一小时提供数据,并创建一个新的内存索引。然后,该节点宣布它也在从14:00到15:00服务段。该节点不会立即合并13:00到14:00的持久化索引,而是等待13:00到14:00期间事件的可配置的窗口到达。此窗口时间段使事件传递延迟导致的数据丢失风险最小化。在窗口期结束时,该节点将所有持续索引从13:00到14:00合并成单个不可变段,并将该段handoff 。一旦这个段在Druid集群中的其他地方被加载和查询,实时节点将它收集的13:00到14:00的数据的所有信息清除,并且停止宣布它对这些数据的服务。
3.1.1 可用性和可扩展性
实时节点是数据的消费者,并且需要相应的生产者来提供数据流。通常,为了数据持久性的目的,会如图4所示,在生产者和实时节点之间采用如kafka[21]的消息总线。实时节点通过从消息总线读取事件来摄取数据。从事件创建到事件消费的时间通常在几百毫秒的量级。
图4中的消息总线的目的是双重的。首先,消息总线充当传入事件的缓冲区。诸如Kafka的消息总线保持指示消费者(实时节点)在事件流中读取了多远的位置偏移(offset)。消费者可以以编程方式更新这些偏移量。实时节点每次将它们的内存缓冲区持久保存到磁盘时都会更新此偏移量。在故障恢复方案中,如果节点上磁盘没有损坏,它可以从磁盘重新加载所有持久索引,并从其提交的最后一个偏移继续读取事件。从最近提交的偏移中获取事件大大减少了节点的恢复时间。在实践中,我们看到节点在几秒钟内从这种故障情况中恢复。
消息总线的第二个目的是充当单个端点(endpoint),使多个实时节点可以从该端点读取事件。多个实时节点可以从总线获取相同的一组事件,从而创建事件的复制。在节点完全失败并磁盘数据丢失的情况下,复制流确保没有数据丢失。单一数据摄取端点还允许对数据流进行分割,使得多个实时节点各自摄取流的一部分。这允许无缝地添加附加的实时节点。在实践中,这种模型允许超大型生产Druid集群能够以大约500 MB/s(150,000个事件/秒或2TB/小时)的速度消耗原始数据。
3.2 Historical Nodes
历史节点封装了用于加载和服务由实时节点创建的不可变的数据块(segments)的功能。在许多现实的工作流中,在Druid集群中加载的大多数数据是不可变的,因此,历史节点通常是Druid集群的主要工作者。历史节点遵循无共享架构,并且在节点之间没有单点竞争。节点不具有知识共享并且在操作简单,它们只知道如何加载,删除和服务不可变段。
与实时节点类似,历史节点向Zookeeper告知其在线状态及提供的数据。加载和删除段的指令也通过Zookeeper发送,并包含关于段在深存储中的位置以及如何解压缩和处理段的信息。在历史节点从深存储下载特定段之前,首先检查本地缓存,该缓存维护关于节点上已存在的段的信息。如果关于段的信息不存在于高速缓存中,则历史节点将继续从深存储下载段。此过程如图5所示。一旦处理完成,段会在Zookeeper中通知,此时,该段是可查询的。本地高速缓存还允许历史节点快速更新和重新启动。在启动时,节点检查其缓存,并立即提供它所定义的任何数据。
历史节点可以支持读一致性,因为它们只处理不可变的数据。不变数据块还支持简单的并行化模型:历史节点可以同时进行扫描和聚合不可变块而不用阻塞。
3.2.1 层
历史节点可以分组在不同的层中,其中给定层中的所有节点被相同地配置。可以为每个层设置不同的性能和容错参数。分层节点的目的是使得更高或更低优先级的段能够根据它们的重要性来分布。例如,可以旋转(spin up)具有大量核和大存储容量的历史节点的“热”层。“热”集群可以配置为下载更频繁访问的数据。也可以使用不太强大的硬件资源来创建并行“冷”集群。“冷”集群将仅包含较不频繁访问的段。
3.2.2 可用性
历史节点依赖于Zookeeper的段加载和卸载指令。如果Zookeeper变得不可用,则历史节点不再能够服务新数据或丢弃过时的数据,然而,因为查询是通过HTTP提供的,历史节点仍然能够响应他们当前服务的数据的查询请求。这意味着Zookeeper中断不会影响历史节点上的当前数据可用性
3.3 Broker Nodes
Broker节点充当历史节点和实时节点的查询路由器。Broker节点能够理解Zookeeper中发布的元数据,知道哪些段可以查询以及这些段所在位置的。Broker节点路由进入的查询,使得查询能够命中正确的历史或实时节点。Broker节点还合并历史和实时节点的部分结果,然后将最终合并结果返回给调用者。
3.3.1 缓存
Broker节点包含具有LRU(最近最少使用)无效策略的高速缓存。缓存可以使用本地堆内存或外部分布式key/value存储,如Memcached。每次Broker节点接收到查询时,它首先将查询映射到一组segments。某些段的结果可能已经存在于缓存中,并且不需要重新计算它们。对于缓存中不存在的任何结果,代理节点将将查询转发到正确的历史和实时节点。一旦历史节点返回其结果,Broker节点将基于每个段来缓存这些结果以供将来使用。该过程如图6所示。实时数据从不被缓存,因此对实时数据的请求将总是被转发到实时节点。实时数据永远改变,缓存结果是不可靠的。 缓存还充当附加级别的数据持久化。在所有历史节点失败的情况下,如果这些结果已经存在于高速缓存中,则仍然可以查询结果。
3.3.2 可用性
如果和Zookeeper通信中断,数据仍然是可查询的。如果broker 节点无法与Zookeeper通信,他们使用他们的最后一个已知的集群视图,继续转发查询到实时和历史节点。broker节点假定集群的结构与中断之前的结构相同。实际上,这种可用性模型允许我们的Druid集群在我们诊断为Zookeeper中断时继续服务查询一段相当长的时间。
3.4 Coordinator Nodes
Druid coordinator 节点主要负责历史节点上的数据管理和分发。coordinator 节点告诉历史节点加载新数据,删除过期数据,复制数据,并将数据移动到负载平衡。Druid使用多版本并发控制交换协议来管理不可变段,以保持稳定的视图。如果任何不可变段包含完全由较新段覆盖的数据,则过时段将从集群中删除。coordinator节点需要经历leader选择过程,来确定运行协调器功能的单个节点为主,剩余的协调器节点充当冗余备份。
coordinator节点周期性地运行以确定集群的当前状态。它通过将群集的预期状态与群集在运行时的实际状态进行比较来做出决策。与所有Druid节点一样,coordinator节点通过Zookeeper连接来维护当前集群信息。coordinator节点还维护与包含其他操作参数和配置的MySQL数据库的连接。MySQL中的关键信息之一是包含了历史节点提供的所有段的列表的表。此表可以由创建段的任何服务(例如,实时节点)更新。MySQL数据库还包含一个规则表,用于管理在集群中的segments如何创建,销毁和复制。
3.4.1 规则
规则决定了如何从集群加载和删除历史段。规则指示应如何将段分配给不同的历史节点层,以及在每个层中应存在段的多少个复制。规则还可以决定何时应该完全从群集中删除段。规则通常设置为一段时间。例如,用户可以使用规则将最近一个月的段加载到“热”集群中,将最近一年的段加载到“冷”集群中,并且删除比较老的段。
coordinator 节点从MySQL的规则表中装入一组规则。规则可以针对数据源进行特定的配置,和/或可以配置默认规则集。coordinator 节点将循环遍历所有可用的段,并将每个段与适用于它的第一个规则匹配。
3.4.2 负载均衡
在典型的生产环境中,查询经常碰到几十个甚至几百个段。由于每个历史节点具有有限的资源,因此coordinator必须在分布在群集各节点之间,以确保群集负载不会太不平衡。确定最佳负载分布需要一些关于查询模式和速度的知识。通常,查询会覆盖单个数据源的连续时间间隔的最近的segments。平均来说,访问较小段的查询速度更快。
这些查询模式建议高速率的复制最近的历史段,扩展不同历史节点上时间上接近大的segments,以及共同定位不同数据源的段。为了在群集中最优地分布和平衡segments,我们开发了一个基于成本的优化过程,其考虑了segment的数据源、新近度和大小。算法的确切细节已经超出了本文的范围,可能在未来的文献中会进行讨论。
3.4.3 复制
coordinator节点可以告诉不同的历史节点加载相同segment的副本。历史节点集群的每个层中的副本数是完全可配置的。需要高级别容错的设置可以配置为具有大量的副本。segment副本的处理方式与原件相同,并遵循相同的负载分布算法。通过复制segment,单个历史节点故障在Druid集群中是透明的。我们使用此属性进行软件升级。我们可以无缝地使历史节点下线,更新它,将其备份,并对集群中的每个历史节点重复该过程。在过去两年中,我们从未在我们的Druid集群中进行软件升级的停机。
3.4.4 可用性
Druid协调器节点有Zookeeper和MySQL作为外部依赖。coordinator 节点依靠Zookeeper来确定集群中已经存在哪些历史节点。如果Zookeeper不可用,coordinator将不再能够发送指令来分配,平衡和丢弃segments。但是,这些操作不会影响数据可用性。 集群中存在哪些段的segment元数据信息,还是哪些段应该存在集群中的segment元数据信息
MySQL和Zookeeper故障响应的设计原则是相同的:如果负责协调的外部依赖失败,集群维持现状。Druid使用MySQL存储操作管理信息,关于集群中应存在哪些段的元数据信息。如果MySQL停止,此信息对协调器节点不可用。但是,这并不意味着数据本身不可用。如果协调器节点不能与MySQL通信,它们将停止分配新的段并丢弃过时的节点。在MySQL中断期间,Broker、历史和实时节点仍然可以查询。
4. 存储结构
Druid中的数据表(称为数据源)是时间戳事件的集合,并分割为一组segments,其中每个段通常为5-10万行。正式地,我们将段定义为跨越某个时间段的数据行的集合。段表示Druid中的基本存储单元,复制和分发都是在段级别完成的。
Druid总是需要一个时间戳列,用来简化数据分发策略,数据保留策略和第一级查询修剪。Druid数据源划分成定义良好的时间间隔(通常为一小时或一天),并且可以进一步对来自其他列的值进行分区,以实现所需的段大小。分割段的时间粒度是数据量和时间范围的函数。如果数据集中的时间戳遍布在一年里,则按天进行分区。如果数据集中的时间戳遍布在一天里,则按小时进行分区。
段由数据源标识符进行唯一标识,标识符包括数据的时间间隔以及新段被创建时增加的版本字符串。版本字符串可以识别出段数据的新鲜度;新版本的段具有较新的数据视图(在一些时间范围内)。该段元数据由系统用于并发控制; 读操作总是从具有该时间范围的最新版本标识符的段中访问特定时间范围内的数据。
Druid段用列式存储。鉴于Druid最适合用于事件流的聚合计算(所有进入Druid的数据必须有一个时间戳),所以将聚合信息存储为列而不是行的优势已有详细记录[1]。列存储允许更高效的利用CPU,因为只有实际需要的才会被加载和扫描。在面向行的数据存储器系统中,与行相关联的所有列必须作为聚合的一部分进行扫描。额外的扫描时间可以引入明显的性能退化[1]。
Druid有多种列类型来表示各种数据格式。根据列类型不同,使用不同的压缩方法来降低在内存和磁盘上存储列的成本。在表1中给出的示例中,page、user、gender和city列仅包含字符串。直接存储字符串需要不必要的代价,可以用字典编码来代替。字典编码是压缩数据的常用方法,并已用于其他数据存储系统,如PowerDrill [17]。在表1的示例中,我们可以将每个page映射到唯一的整数标识符。
Justin Bieber -> 0 Ke$ha -> 1 此映射允许我们将page列表示为整数数组,其中数组索引对应于原始数据集的行。对于page列,我们可以表示为:[0, 0, 1, 1]
生成的整数数组本身非常适合进行压缩。在编码之上的通用压缩算法在列存储中非常常见。Druid使用LZF [24]压缩算法。
类似的压缩方法可以应用于数字列。例如,表1中characters added和characters removed列也可以表示为单个数组: Characters Added -> [1800, 2912, 1953, 3194] Characters Removed -> [25, 42, 17, 170]
在这种情况下,我们压缩原始值,而不是它们的字典表示。
4.1 数据过滤索引
在许多现实世界的OLAP工作流中,针对满足某些维度条件的某些度量集合的聚合结果发出查询。比如一个查询示例是:“旧金山的男性用户进行了多少次维基百科编辑?”此查询基于维度值的布尔表达式(city=='San Francisco' and gender='Male')过滤表1中的维基百科数据集。在许多实际数据集中,维度列包含字符串,度量列包含数值。Druid为字符串列创建额外的查找索引,以便只扫描属于特定查询过滤器的那些行。
让我们考虑表1中的page列。对于表1中的每个唯一页面,可以使用一些标记来指明哪些行可以看到特定页面。我们可以将此信息存储在二进制数组中,其中数组索引表示我们的行。如果在特定行中看到特定页面,则该数组索引被标记为1.例如 Justin Bieber -> rows [0, 1] -> [1][1][0][0] Ke$ha -> rows [2, 3] -> [0][0][1][1]
Justin Bieber在行0和1中可以看到。列值到行索引的映射形成了一个倒排索引[39]。要知道哪些行包含Justin Bieber或Ke$ha,我们可以对这两个数组进行OR运算。 [0][1][0][1] OR [1][0][1][0] = [1][1][1][1]
这种对大型bitmap数据集执行布尔运算的方法通常用于搜索引擎中。OLAP负载的位图索引在[32]中有详细描述。位图压缩算法是一个明确的研究领域[2,44,42],并且经常利用游标编码算法。Druid选择使用Concise算法[10]。图7说明了使用整数数组进行Concise压缩的字节数。结果是在一个cc2.8xlarge系统生成的,其中使用了单线程,2G堆,512m年轻带,和每个运行之间的强制GC。数据集是从Twitter garden hose[41]数据流收集的一天的数据。数据集包含2,272,295行和12个不同基数的维度。作为一个额外的比较,我们也对数据集行排序以做到最大化压缩。
在未排序的情况下,Concise压缩后大小为53,451,144字节,总整数数组大小为127,248,520字节。总的来说,Concise压缩集比整数数组小42%。在排序的情况下,总Concise压缩大小为43,832,884字节,总的整数数组大小为127,248,520字节。有趣的是,在排序后,全局压缩只增加了很小一部分。
4.2 存储引擎
Druid的持久化组件允许不同的存储引擎以插件的方式接入,类似于Dynamo。这些存储引擎可以将数据存储在一个完全的in-memory结构的引擎中,例如JVM heap,或者是存储于 memory-mapped 结构的存储中。Druid中存储引擎可配置更换的这个能力依赖于一个特定的应用规范。一个in-memory的存储引擎要比memory-mapped存储引擎的成本昂贵得多,但是如果对于性能特别敏感的话,in-memory存储引擎则是更好的选择。默认情况下使用的是memory-mapped存储引擎。 当使用一个memory-mapped存储引擎的时候,Druid依赖于操作系统来对segment在内存中进行换入和换出操作。因为只有当segment加载到内存中了才可以被查询,所以memory-mapped存储引擎允许将最近的segment保留在内存中,而那些不会再被查询的segment则被换出。使用memory-mapped的主要缺点是当一个查询需要更多的segment并且已经超出了节点的内存容量时,在这种情况下,查询性能将会因为不断的在在内存中进行segment的换入和换出而下降。