Apache Hudi是一个开源数据湖管理平台,用于简化增量数据处理和数据管道开发,该平台可以有效地管理业务需求,例如数据生命周期,并提高数据质量。Hudi的一些常见用例是记录级的插入、更新和删除、简化文件管理和近乎实时的数据访问以及简化的CDC数据管道开发。
本期SOFTWARE DAILY有幸采访到了Apache Hudi项目VP Vinoth Chandar。Vinoth是Uber Hudi项目的创建者,他继续在Apache Software Foundation领导Hudi的发展。在此之前他曾是Confluent的首席工程师,之前是Uber的高级工程师/经理。本期我们将讨论构建大型分布式和数据系统。
Q1:今天我们就数据湖、数据仓库和数据基础设施进行一场引人入胜的讨论。数据湖可以低成本存储所有数据,然后使用该数据执行操作,由于价格便宜,可以保存所有数据。数据仓库是更昂贵的存储空间,它可能更接近内存,并且通常更昂贵,但访问速度更快。这是我如何看待这两个抽象的非常粗略的描述。我希望您对这两个抽象以及这些术语在过去几年中的演变有何看法?
VC:我认为您对它们的描述几乎与几年前一样,如果稍微倒退一点回到Teradata、Vertica时代,存储与计算耦合的数据仓库,他们几乎都是MPP数据库,然后到Hadoop时代,带来了水平可扩展的计算能力,然后数据湖和仓库共存。但是最后我想说大约五年来,数据湖和数据仓库生态系统都发生了巨大的变化。具体地说,云数仓现在是黄金时间,它们与以前的仓库有完全不同的体系结构,它们使存储和计算分离,然后可以使用云存储来水平扩展,这样它们听起来就像是数据湖。而如果使用数据湖,那么会有事务性管理数据的需求,或者具有变更和更新存储在数据湖中的数据的能力。摆脱了"好吧,让我们将其视为所有数据的廉价转储,转变成更有意识组织的,大量结构化数据流入数据湖",然后数据湖技术也开始变得越来越像数据库/数据仓库边界,从我看来那就是我们的方向。
Q2:您对不同的流行数据仓库(数据湖抽象)看法是什么?我看到的三个主要对象是Snowflake,BigQuery和带有Delta和Spark的Lakehouse架构。也许还会包括Redshift。也许您可以列举不同的流行数据仓库,数据湖抽象以及它们之间的对比。
VC:那么让我们从云数据仓库开始,实际上我会将Redshift放在前面,我会将Redshift,BigQuery和Snowflake视为云数仓。它们都有一些非常共同的特征,如都有很多类似数据库的参数。实际上它们具有的事务处理能力要远远高于您所看到的能力,正如我们在谈论数据湖抽象时所看到的,它们都具有一种内部专有格式,不是很开放,并且非常类似于垂直集成系统,包括SQL、文件格式、执行运行时。他们与BI系统紧密集成,就像常规分析一样,因此我以类似的方式来看待它们。然后进入数据湖,这是我从2016年左右开始涉足Apache Hudi的背景,包括Databricks的Delta Lake,Netflix的Apache iceberg,三个系统都提供了一个事务层,就像在S3或云对象存储之上管理文件一样,并且使用开放文件格式,如Parquet、ORC。这三个项目在内部设计以及解决问题的方式有一些相似之处和不同点。就我个人而言,当Lakehouse出现时,我并不感到惊讶,因为几年来我们已经在Uber投入生产类似的东西,我知道有几家大型科技公司已经在做类似的事情,其核心思想是:“让我们将数仓的原语带到数据湖,并试图在数据湖本身上做更多的事情" ,而且我认为Databricks做出了出色的工作,它是业界领先的Spark计算提供商之一,为这种架构模式增加了行业视野,他们在表达这种愿景方面也做得很好,这就是我的看法。
Q3:既然您提到Uber,您能给我更多有关Uber的数据仓库或Uber的数据基础架构的背景信息吗?
VC:我在2014年加入Uber,Uber业务增速非常快,这意味着随着我们推出更多城市,数据量也在不断增长。Uber是一家非常实时的公司,我们可以通过构建大量针对数据新鲜度进行优化的系统来节省大量资金。公司、城市运营商,工程师,每个人都可以访问数据。我们从Vertica开始,但是随着数据量的增长,我们意识到需要一个数据湖,我们使用Spark将所有初始数据转储到数据湖中,然后将原始数据从本地仓库中移出。但是当真正开始实施时,我们意识到在数据库和数据湖之间增加了额外一层,这导致上在它们之间增加了很多延迟,这主要是由于所有事情都是大批量完成的, Hadoop世界更喜欢大规模批量操作。这就是Hudi出现的背景,需要支持更新,删除。我们实际上可以获取数据库更改日志,这给我们带来了极大的查询数据新鲜度,而Vertica也为我们提供了良好的查询性能。我们通过在Hadoop文件系统抽象之上构建事务层或无服务器事务层来复制类似的东西,以便它可以与HDFS,S3一起使用,这是面向未来的。并且我们尝试在将操作数据提取到数据湖中的同时解决更新和删除问题,可以将批处理工作从大约12、16小时,24小时运行转变为在30分钟,15分钟,5分钟内完成,实际上可以根据我们的需求调整延迟,因为Hudi可以使用Apache Spark作为运行时,因此它可以非常好地水平扩缩容。
我们解决的第二个问题仅仅是解决更新和删除问题,但还不够,因为通常在数据湖体系中会拥有一组原始表,然后使用ETL作业从中构建更多派生表,但所有这些派生表都不了解实际更改了哪些数据。例如有一个简单的ETL作业(正在标准化货币换算或某些非常简单的原始操作),但必须对整个小费表表进行全表扫描,才能真正了解发生了什么变化,所以我们说:“好吧,流处理是如何解决这个问题的",这就是Hudi内置的两个基本特性。我们在支持更新,删除和增量更改流的同时也支持了事务,这就是Hudi诞生的方式,我们在2016年做到了这一点。到2016年底,Hudi已经稳定运行两年时间,为Uber的所有关键业务关键数据集提供了动力,已经存储了几PB的数据。我们在2017年开源了该项目,进入了Apache孵化器,2018年Apache孵化器中毕业。而且我们一直在与许多在其平台上采用Hudi的云提供商一起发展社区,以解决整个行业广泛存在的相同问题。
Q4:为何当时没有像现有的数据基础架构技术那样解决这些问题?如今这些现有的数据湖、数据仓库产品已经解决了这些问题吗?
VC:我们需要事务、更新和删除等功能,以便我们快速将数据从上游数据库中提取到仓库中。但是仓库不能容纳所有数据,您可以运行数十个节点的Arrows群集,但是我们的数据量巨大,以至于无法容纳在任何一个集群中,这是Arrow限制,我们无法进行扩展。
总的来说在Hadoop技术栈体系中,当时还没有成熟的系统能够摄取数据并真正很好地对其进行管理。Hadoop计划中的大部分工作都用于构建HDFS,Yarn,Hadoop Spark,Hive Spark,Presto等,实际数据管理或存储层并未引起太多关注,例如调整文件大小。用户可以扩展HDFS并通过写入适当大小的文件来保持HDFS健康,但没有库在整个生态系统中统一实现这一功能,大型公司都试图构建自己的解决方案,但在不同时间轴上,实际这是一个明显的问题,也是Hudi的诞生方式。如果拉回到今天,我会说云仓库在解决我说过的老式数据仓库中的数据规模问题方面做得很好,它们的存储位于S3上而不在本地设备上,它们确实解决了数据存储扩展问题。但是问题仍然在于它们是专有格式,因此如果今天再次进行操作,我仍会重新构建,因为我们不想将公司的所有原始数据锁定为一种专有格式,Databricks提出的Lakehouse引起了我的共鸣,因为它们不是通用的,在某种意义上说我们需要一流的公民来支持数据科学和机器学习。然后我们希望数据科学家对分析人员用于报告的相同数据建立模型和分析。如果数据在数据仓库和数据湖中同时存在,那么会遇到大量的数据质量问题。从体系结构上讲,我认为让数据更快进入由Apache Hudi之类的功能驱动的原始数据湖仍然有意义,这样对于您要执行的任何下游处理开销都很少。然后您选择要使用哪种工具整理数据(如果需要)以进行分析。还是有很多其他实时分析工具,比如Druid、Clickhouse等。
Q5:Hudi是一个开源项目,这意味着它广泛适用。这不仅适用于不同规模的公司。为什么这是一个广泛适用的问题?
VC:这是一个非常非常好的问题。当我们真正开始创建Hudi时,甚至是在我自己追溯该问题时,我都非常确信这就是我们必须为Uber构建它的方式。人们通常应该采用这种方式,但是并不是那种强制功能可以使人们以某种方式采用这种功能,如果您查看其中的一些技术,Hudi自2016年左右就已经存在,实际上它带给我们的是GDPR之类的能力,还有诸如强制功能之类的服务,转储到S3或其他存储上的所有数据,您都需要对其进行管理,需要删除内容,需要纠正或掩盖其中的内容,这个场景适用于任何跨国公司,然后这也引起了人们对数据湖的大量关注,这就是我们感到Hudi非常适用的地方。以事务方式更新数据,然后像流数据湖模式(如我所说的那样)进行摄取的技术正在慢慢流行起来,人们意识到在数据隐私法律中需要适当地管理用户数据,那么什么是正确的架构?看来我需要一个数据湖,现在有了这些工具,我们在该行业上是正确的,而且我认为未来几年我们将适应各种模式。
Q6:简单介绍一下您认为理想的数据体系结构。就像什么理想的用户体验可以消除大量的配置和繁琐的设置工作或维护工作?对于在数据平台之上工作的数据工程师,数据科学团队来说,什么是好的理想体验?
VC:这是另一个奇妙的问题,让我们从组织的角度来思考这个问题,假设有一家公司已经相当成功了,它拥有数百名员工。然后现在数据管理问题开始出现了,然后可以使用一些集成工具来进行基本的报告分析。但现在如果有两三个业务职能,一个风险团队,一个风险欺诈团队,并且有一个财务团队,还有一个产品团队,每个团队都需要聘请数据工程师,并且对用户某些操作中的数据感兴趣,数据在MySQL,Postgres、Oracle、RDBMS或NoSQL存储,然后会自然引入Kafka消息队列,像事件跟踪数据一样,几乎每个公司都使用类似的方法来跟踪许多正在进行的活动。因此大多数公司从本质上选择了一条途径,即从聘请数据工程师到各个业务职能部门开始,他们精心挑选所需的数据集,他们实际上并没有像完全集中的数据湖那样进行构建,因为在组织上通常很难为这种产品提供资金。随着时间的流逝,最终出现了数据孤岛。然后财务团队成员写的查询无法与欺诈团队中的某人核对数据,然后需要给财务团队中的某人(而不是欺诈团队)一个类似的、不同种类的生产数据访问控制,使得人们抱怨在使用数据湖的痛苦,我认为要解决的首要问题是在原始环境中将大量上游系统复制到数据湖中,这必须在没有太多转换的情况下进行,而且它必须非常快地完成操作,这样您就可以在工作之前不必等待数小时才能收到这些数据了,因此只要您能够像原始数据流一样构建它们称为的原始数据层,然后将其释放-并在其之上使用一些类似的工具和控件访问控制,那么它将使您的数据工程师专注于业务功能,如果他们想连接某些表以获得更好的数据质量也很容易做到,因为他们拥有所有可用的数据。如果您今天看一下Databricks,Databricks是一个Spark运行时,其提供了大量数据科学工具,而且如果您查看的是Starburst或Presto,HANA Starburst,Presto之类的查询引擎公司,它们确实非常适合–就像他们拥有良好的BI工具一样,实际上可以根据用例选择要使用的查询引擎,并且可以向数据科学团队提供数据库订阅,当财务团队运行报表时,就像仪表板一样。因此可以自由选择,并且可以实际控制哪些数据,回到我之前说的那样,此原始原始数据层几乎没有增加数据延迟,所有原始数据都非常快地流入数据湖,这就是在公司中进行的任何派生数据计算的起点。您可以随时从一个云仓库转移到另一个仓库,也可以像您喜欢的那样引入或淘汰旧的实时分析引擎。如果需要您将几乎可以重新计算任何东西,并且此模型具有很大的自由度,我认为这就是我应该朝着的方向发展。
Q7:鉴于您刚刚将其描述的未来,请描述下数据基础架构部署到该世界需要做些什么?还是我们需要在基础设施技术方面取得哪些进步才能实现这一未来?
VC:我认为很多技术已经存在,如果我们现在一步步走,首先我们需要做的事情就是真正捕获更改数据–这就是类似Debezium这样的项目做得很棒的地方,越来越多的公司从数据库提供CDC流,而且随着这种方式成为主流,采用更加标准化的工具来获取这些流并将其放入数据湖的表中,我认为这是我们真正需要的。有了Apache Hudi,我们已经朝这个方向迈出了一大步,这就像我们一直在构建Hudi一样,就像一个平台,而不是像事务层一样,或者只解决了这一更新问题,更多的工具和一条更好的途径来快速地提取和集成数据,我要说的第二部分是如果花一点时间来比较一下云数据仓库和数据湖,数据湖中的中央meta存储可能仍然是Hive Metastore,然后在最近几年,Hive Metastore有其自身的可扩展性问题,它无法跟踪文件级别或类似级别的详细统计信息,因此我觉得我们需要为了使人们能够以出色的性能查询此数据并希望提供出色的可用性,我们需要要么像Hive Metastore这样的显着改进,要么像Hudi这样的新型类似系统以及为开源查询引擎抽象的类似系统,这是我看到的一个技术差距。如果没有此功能,则您的Presto查询引擎可能真的非常非常好,但是如果没有所有统计数据输入,您将无法获得与像云数据仓库这样的完全垂直集成的系统一样的性能,所以这些都是我认为我们需要改进的地方。
我要说的第三点,实际上是Hudi目标的核心,作为一个项目我们要思考的要比我们做的要远得多,我们必须想一想如何从流处理中学习并让我们的批处理作业更多,如增量运行无需过多处理,因为任何时候您都会遇到围绕数据新鲜度或查询性能的类似瓶颈,这会导致就像我们刚刚讨论过的理想数据架构面临的风险和威胁一样。从那时起人们开始采用捷径,并且喜欢在其数据体系结构方面朝着不同的方向发展,我认为这是我们应该建立的三件事。
Q8:回到Apache Hudi,您可以更深入地介绍Hudi的体系结构吗?
VC:从高层次上讲对Hudi最好的描述是它是一个数据湖平台,而且它有很多平台组件,它们围绕某种数据库核心进行了构建,并针对流处理进行了优化,所以我喜欢将其分解,Hudi是一个库,没有长期运行的服务器,用户可以使用Spark或Flink向其中写入数据。Hudi将类似的数据组织在Apache Parquet或Apache Avro文件中,并且提供了很多元数据,还跟踪有关在云存储上对该逻辑数据集进行的写入和更改的大量元数据,然后所有查询引擎(例如Hive,Spark,Presto,Impala,Trino甚至Redshift)都可以直接查询在Hudi表中写入的数据。现在如果像Hudi OSI数据层那样分解Hudi,那么您就拥有了云存储,此外还有这些开放数据文件格式,Parque,ORC,Avro文件格式以及所有内容,Hudi定义了文件组织的布局,然后再提供并发控制和事务。然后它提供了一些功能来对数据建立索引,以便您可以进行快速更新删除,另外Hudi还有一些服务(守护程序)优化存储布局,并在用户高兴地只是将数据写入格式时在后台重新索引某些内容,压缩或在后台执行多项操作,Hudi有很多这样的服务,它们可以在写入过程中同步运行或者异步运行。它们可以像优化所有表的守护程序一样运行。对于更新操作,可以先增量更新到日志中,然后再压缩它们,因此有一个压缩服务,当然用户可以改变数据存储布局,并重新对数据进行聚类以获得更好的查询性能,因此Hudi有一个Clustering服务,然后还有个Clean服务清理和清除旧文件,所有这些服务彼此协调,这是Hudi的核心设计,而不是像其他系统那样,Hudi有大量的上层服务,就像有一个提取服务一样,它可以从Kafka中获取数据,将其转换为本质上是流,而不只是在S3上的Hudi表,它可以执行检查点管理,它可以自己进行恢复。同样我们拥有一堆不同的非结构化数据格式进行转化将其提取到Hudi表中;也可以编写流式的增量ETL管道,仅从上游Hudi表中使用变更流,可以获得自某个时间点以来已插入或更新的所有记录。Hudi也可处理重复数据。另外我们提供了一些工具,可以在数据写入Hudi表时对外提供通知,我们有很多这样的服务,这就是为什么我要说我们的原则不是要建立一个数据库核心,而是要建立一套工具和服务,使人们可以简单地使用它,然后解决实际问题。
Q9:如果系统可以从Hudi受益但没有使用Hudi,它们将面临哪些挑战呢?
VC:让我们来一个没有Hudi的数据湖。包括一个上游数据库,假设您有一个Cassandra集群或Dynamo,例如某些数据库,其中包含一些即将到来的变更流,您有一个Kafka集群,您的微服务通过该集群记录事件。现在您已经拥有了我们想要构建的数据湖体系结构。但是您想构建一组原始表,然后编写一些ETL并构建一种派生表,如果没有Hudi,人们通常会这样做,那就是他们会像Spark作业那样编写代码,或者使用Kafka Connect或Camel之类的框架或者只是继续编写某些内容–就像从Kafka提取一样,将这些事件写成类似Avro文件和行存,这就是您布置原始数据的方式。然后他们将在几个小时内批量导入数据库,或者可以从这些数据库中进行更改捕获,但是他们不知道如何应用它们,因此他们需要对整个表进行批量合并,这会进行数据库的大量提取,并且它们将像事件的增量式提取那样进行。而且如果他们想每5分钟或每1分钟提取一次Kafka数据,他们就必须做更多的事情来控制文件大小和所有内容,这导致原始层中数据库数据的数据新鲜度较差,并且产生有很多小文件,或者由于它们是基于行的格式,导致分析查询性能差。同样编写ETL的作业也将延迟,通常您使用Hive或Spark编写一堆ETL,然后构建一组派生数据表,这些导出的数据表还遭受不良的数据新鲜度的困扰,原始数据的查询效率也非常非常差,因为您必须应对原始数据格式,并且必须处理许多小文件,这些文件通常会降低查询性能。如果使用Hudi之类的工具,便可以使用Hudi的增量数据流工具,如果某个Kafka集群中有任何数据,则可以增量、连续摄取,同时可以直接使该表,这意味着即使是数据库数据,数据延迟也在几分钟之内。同时还可以使用Hudi自动调整小文件功能,以便下游ETL和查询执行性能更好,因为采用列存格式。
以Uber为例说明,如果每30分钟提取一次数据,将会写入10个文件,这10个文件中的大多数将包含所有城市的数据,因为这有点像数据到达的方式。但通过类似Hudi Clustering的服务可以重组数据,使属于给定城市的所有记录彼此靠近。这样一来查询实际上扫描的数据量将大大减少。可以做很多事情来减少查询成本,提高效率,还可以很好地改善数据的新鲜度,继续到派生的数据管道,Hudi还可以提供Hudi中每个表的变更流,这意味着可以采用与流处理中相同的概念。同样您可以像Flink或Spark作业那样将变更流连接到Hudi表,它也可以作为快照与另一个Hudi表关联查询。编写增量数据管道使得它们处理较少的数据量,这意味着成本较低,并提供了更好的数据新鲜度,这是我想当初在Uber进行的一件令我着迷的事情。通常您没有机会获得可以真正降低成本并且在构建数据库时也可以更快的机会,Hudi为您提供了一个框架,使您可以实际增量地摄取和增量地执行ETL,简而言之它将为您的数据湖做好准备。
Q10:那么对于Hudi来说,由Hudi构建的所有东西都被放入内存了吗,还是Hudi完全使用磁盘?
VC:当您查询Hudi表时,它与查询Hive表或Presto表没有什么不同,或像为Hive表一样,本质上这些湖引擎所做的就是Hudi所做的。Hudi就像查询层的形式一样,只是像它们查询的表抽象一样呈现,Hudi本身会将所有数据存储在云存储之上,它没有任何长时间运行的内存组件。在执行期间它可能会在给定的事务中缓存一些内容,仅此而已。
Q11:那么应用程序所有者(例如正在查询的人)还是正在像数据科学家一样进行最终查询的人,他们是否需要了Hudi?还是对他们透明?
VC:如果他们正在执行批处理查询,例如,如果您只是查询表的快照,那么他们通常不必真正关心它是Hudi还是Delta Lake或其他任何格式,甚至是Hive,他们通常只是简单地感兴趣:"查询速度更快,数据正确",就这样。因此他们不必知道,但是如果您是写增量ETL的数据工程师,那么您需要利用非常特定于Hudi的功能,您需要了解Hudi格式是什么,因此这些人可能会意识到,如果您正在编写批处理ETL管道,您甚至都不知道它是否是Hudi表,它的行为就像其他任何Hive表一样。
Q12:当我们结束对话时,我想和您一起探索更多的东西。但是请描述现在Hudi的状态,它能做什么?您希望该项目实现的愿景是什么?距离您那个愿景有多远?
VC:广义上讲有很多原因,Hudi是我们2016年分享的愿景的体现,"所有批处理都应该是增量的",通过Hudi,我们在巩固这一目标方面取得了很大进展。当集成原始数据层的数据时需要以增量的方式进行处理,我们在Hudi中构建了许多出色的软件堆栈,它们的性能可能非常出色,并且具有许多功能可以使您做到这一点。具体地说我们有一个数据库核心和一组类似的服务,这些服务都可以水平扩展和轻松部署。如果您知道如何部署Spark作业和Flink作业,Hudi可以开箱即用。我们将来真正想投资的部分实际上正在释放真正的端到端增量ETL管道,我们应该能够编写非常复杂的ETL管道。批处理非常简单,它是无状态的。然而今天的流处理是有状态的,甚至需要像一套不同的工程师一样来编写非常好的流处理程序,因此我们实际上希望降低该标准,然后帮助人们编写复杂的增量ETL作业,并为该模型增加更多的批处理ETL工作量,就像我们希望该项目达到目标一样,另一部分是我们需要在项目中解决的另一件事,我们正在逐步进行所有工作,因为我们希望节省成本,并且希望数据新鲜度更高,但是查询引擎侧还有很多空白,云存储系统的一些基本限制可能会影响这些新数据的实时查询性能,因此这有两个方面。数据延迟我们可以通过增量ETL和增量摄取来解决,但是交互式和类似实时分析查询的性能是我们可能需要构建的东西,例如Hudi中的可变缓存,列式缓存层,它实际上可以吸收大量更新,将其保存在内存中,降低了合并成本,以便人们可以很好地对其进行查询,现在所有表统计信息都写在一个JSON文件和Avro文件中,这就像可伸缩性一样,但是用这种方式计划查询可能会花费大量时间。因此我认为一个高性能和高度可伸缩的元存储,内部有Snowflake或BigQuery或redshift之类的东西,我们需要构建类似的东西,我认为将这两者放在一起将真正释放我们的愿景,那就是所有数据都应该非常快地到达,并且也应该能够很快被查询。