随着存储格式的演变Apache拼花而且Apache兽人查询引擎转眼间而且Apache黑斑羚Hadoop生态系统有潜力成为一个通用的、统一的服务层,可以容忍几分钟的延迟。然而,为了实现这一点,它需要在Hadoop分布式文件系统(HDFS)中高效、低延迟的数据摄取和数据准备。
为了解决这个问题,我们在优步建立了Hudi(读作hoodie),一个增量处理框架,以低延迟和高效率为所有业务关键数据管道提供动力。事实上,我们最近有开源供他人使用和构建。但在深入研究Hudi之前,让我们先退一步,讨论一下为什么考虑Hudi是一个好主意Hadoop作为统一服务层。
动机
λ架构是一种常见的数据处理架构,提出了流处理层和批处理层的双重计算。每隔几个小时,启动一个批处理来计算准确的业务状态,批量更新被批量加载到服务层。同时,流处理层对相同的状态进行计算和服务,以避免上述几个小时的延迟。但是,在被更精确的批处理计算状态覆盖之前,此状态只是一个近似状态。由于状态略有不同,所以需要为批处理和流提供不同的服务层,在顶部的抽象中合并,或者是一个相当复杂的服务系统(如德鲁伊),它在记录级更新和批量批量加载方面表现相当不错。
质疑单独的批处理层的必要性,Kappa架构认为流处理引擎可以是计算的通用解决方案。在一般意义上,所有的计算都可以描述为生成元组流的操作符和迭代多个输入元组流的消费者(即。火山迭代器模型)。此功能将使流层能够通过增加并行性和资源来重新进行计算,从而处理业务状态的再处理。对于能够有效地检查和存储大量流状态的系统,流层中的业务状态不再是近似值;该模型已经通过许多摄取管道获得了一些吸引力。尽管在这个模型中消除了批处理层,但是仍然存在两个不同的服务层的问题。
今天许多真正的流处理系统都以记录级运行,因此应该优化高速服务系统以进行记录级更新。通常,这些系统也不能为分析扫描进行优化,除非系统在内存中有大量数据(例如Memsql)或积极的索引(如ElasticSearch).这些快速服务系统牺牲了可伸缩性和成本,以优化摄取和扫描性能。由于这个原因,这些服务系统中的数据保留通常是有限的,这意味着它们可以持续30到90天或存储高达几tb的数据。对旧历史数据的分析通常被重定向到HDFS上的查询引擎,在那里数据延迟不是问题。
在数据摄取延迟、扫描性能、计算资源和操作复杂性之间的基本权衡是不可避免的。但是对于可以容忍大约10分钟延迟的工作负载,如果有更快的方法在HDFS中摄取和准备数据,就不需要单独的“speed”服务层。这统一了服务层,并显著降低了整体复杂性和资源使用。
然而,HDFS要成为统一的服务层,它不仅需要存储变更集日志(记录系统),还需要支持压缩、去重复的业务状态,这些状态由有意义的业务度量划分。这种统一服务层需要具备以下特性:
- 快速的能力应用突变到大型HDFS数据集
- 数据存储选项是优化分析扫描(柱状文件格式)
- 能力链接和传播更新高效地建模数据集
压缩的业务状态通常不能避免突变,即使业务分区字段是事件发生的时间。由于延迟到达的数据和之间的差异,摄取仍然可能导致对许多旧分区的更新事件和处理次了。即使分区键是处理时间,仍然可能需要更新,因为出于审计遵从性或安全原因需要清除数据。
介绍海蒂:嗨,海蒂!
输入Hudi,一个增量式框架它支持上一节中概述的上述需求。总之,Hudi (HadoopUpsertDelete和我ncremental)是一种分析性的、扫描优化的数据存储抽象,它可以在几分钟的时间内对HDFS中的数据应用突变,并链接增量处理。
Hudi数据集与当前的Hadoop生态系统集成(包括Apache蜂巢, Apache Parquet, Presto,和Apache火花)通过一个自定义的InputFormat,使框架无缝为最终用户。
的数据流模型根据延迟和完整性保证来描述数据管道。下面的图4展示了优步工程公司的管道是如何分布在这个范围内的,以及每个管道通常应用的处理风格:
对于少数真正需要1分钟延迟的用例和带有简单业务指标的仪表板,我们依赖于记录级流处理。对于传统的批处理用例,如机器学习和实验有效性分析,我们依赖于擅长于繁重计算的批处理。为用例当需要在接近实时的延迟下进行复杂的连接或重要的数据处理时,我们依靠Hudi及其增量处理原语来获得两全其美的效果。要了解更多关于Hudi支持的用例,您可以查看我们的文档Github.
存储
Hudi将数据集组织到一个分区的目录结构下basepath,类似于传统的Hive表。数据集被分解为多个分区,这些分区是包含该分区的数据文件的目录。每个分区由它的partitionpath相对于basepath.在每个分区中,记录被分布到多个数据文件中。每个数据文件都由唯一的文件标识以及生成文件的提交。在更新的情况下,多个数据文件可以共享相同的文件文件标识在不同的提交中编写。
每个记录由一个记录键唯一标识,并映射到文件标识。记录键和之间的映射文件标识将记录的第一个版本写入文件后,将是永久的。简而言之,文件标识标识包含一组记录的所有版本的一组文件。
沪迪存储由三个不同的部分组成:
- 元数据: Hudi将在数据集上执行的所有活动的元数据维护为时间轴,这使得数据集的瞬时视图成为可能。的元数据目录下存储basepath.下面我们概述了时间轴中的操作类型:
- 提交:单个提交捕获关于将一批记录原子写入数据集的信息。提交由一个单调递增的时间戳标识,表示写入操作的开始。
- 清洗:后台活动,用于清除数据集中不再用于运行查询的旧版本文件。
- 件:协调Hudi内部差异数据结构的后台活动(例如,将更新从基于行的日志文件移动到柱状格式)。
- 指数: Hudi维护一个索引来快速地将输入的记录键映射到文件标识如果记录键已经存在。索引实现是可插拔的,以下是当前可用的选项:
- 存储在每个数据文件页脚中的Bloom过滤器:首选的默认选项,因为它不依赖于任何外部系统。数据和索引始终保持一致。
- Apache HBase:高效查找小批量的键。这个选项可能会在标记索引时节省几秒钟的时间。
- 数据: Hudi将所有摄入的数据存储为两种不同的存储格式。实际使用的格式是可插入的,但从根本上需要以下特征:
- 扫描优化的柱状存储格式(ROFormat).默认是Apache拼花.
- 写优化的行存储格式(WOFormat).默认是Apache Avro.
图5:Hudi存储内部。上面的Hudi Storage图描述了YYYYMMDDHHMISS格式的提交时间,可以简化为HH:SS。
优化
Hudi存储针对HDFS的使用模式进行了优化。压缩是将数据从写优化格式转换为扫描优化格式的关键操作。因为压缩的基本并行单位是重写单个文件标识, Hudi确保所有数据文件都被写入HDFS块大小的文件,以平衡压缩并行性、查询扫描并行性和HDFS文件总数。压缩也是可插拔的,它可以扩展到缝合较旧的、更新频率较低的数据文件,以进一步减少文件总数。
摄取路径
Hudi是一个Spark库,旨在作为流摄取作业运行,并以小批量(通常在一到两分钟的顺序)摄取数据。但是,根据延迟需求和资源协商时间,也可以将摄取作业作为计划任务使用Apache Oozie或Apache气流.
下面是默认配置下Hudi摄取的写路径:
- Hudi装载布隆过滤器索引所涉及分区中的所有parquet文件(即从输入批中扩展的分区),并通过将传入的键映射到用于更新的现有文件,将记录标记为更新或插入。这里的连接可以根据输入批处理大小、分区分布或分区中的文件数量而倾斜。它是通过在连接键上进行范围分区和子分区来自动处理的,以避免臭名昭著的2 gb的限制用于Spark中的远程shuffle块。
- Hudi对每个分区的插入进行分组,并分配一个新的文件标识,并追加到相应的日志文件中,直到日志文件达到HDFS的块大小。一旦达到块大小,Hudi就会创建另一个文件标识并对该分区中的所有插入重复此过程。
- 调度程序每隔几分钟启动一个有时间限制的压缩过程,该过程生成一个按优先级排序的压缩列表,并压缩一个文件的所有avro文件文件标识使用当前拼花文件创建该拼花文件的下一个版本。
- 压缩异步运行,锁定正在压缩的特定日志版本并写入新的更新文件标识到一个新的日志版本。锁是在动物园管理员.
- 压缩是根据被压缩的日志数据的大小来确定优先级的,并且可以通过压缩策略插入。在每次压缩迭代中,具有最大日志量的文件首先进行压缩,而较小的日志文件最后进行压缩,因为重写parquet文件的成本并不取决于对文件的更新次数。
- 的更新文件标识如果日志文件不存在,则创建相应的日志文件。
- 如果摄取作业成功,则在Hudi元时间轴中记录提交,该时间轴自动将飞行文件重命名为提交文件,并写出关于分区和文件标识版本创建。
优化
如前所述,Hudi努力使文件大小与底层块大小一致。根据柱状压缩的效率和分区中要压缩的数据量,压缩仍然可以创建小的拼花文件。这最终在摄取的下一个迭代中被自动更正,因为对分区的插入被打包为对现有小文件的更新。最终,文件大小将增长到压缩时的底层块大小。
故障恢复
如果摄取作业由于间歇错误而失败,Spark会重新计算RDD并自动解析。如果失败次数超过maxRetries在Spark中,则摄取作业失败,下一次迭代将再次尝试摄取同一批。以下是两个重要的区别:
- 摄取失败可以在日志文件中写入部分avro块。
- 这是通过在提交元数据中存储关于块的开始偏移量和日志文件版本的元数据来处理的。在读取日志时,跳过不相关的、有时部分写入的提交块,并在avro文件上适当地设置查找位置。
- 压缩失败可能会写入部分拼字文件。
- 这由查询层处理,该层根据提交元数据过滤文件版本。查询层只会选择最后一次完成压缩的文件。下一次压缩迭代将回滚失败的压缩并再次尝试。
查询路径
提交元时间轴支持HDFS中相同数据的读取优化视图和实时视图;这些视图允许客户端在数据延迟和查询执行时间之间进行选择。Hudi为这些视图提供了自定义InputFormat,并包含一个Hive注册模块,该模块将这两个视图注册为Hive亚矿表。这两种输入格式都可以理解文件标识投入时间,并过滤文件,只选择最近提交的文件。然后,Hudi在这些数据文件上生成分割,以运行查询计划。InputFormat详情如下:
- HoodieReadOptimizedInputFormat:提供一个扫描优化的视图,过滤掉所有日志文件,只选择压缩parquet文件的最新版本。
- HoodieRealtimeInputFormat:提供更实时的视图,除了选择压缩拼花文件的最新版本外,还提供RecordReader在扫描期间将日志文件与其对应的拼花文件合并。
这两个InputFormats扩展MapredParquetInputFormat而且VectorizedParquetRecordReader因此,为读取拼字板文件所做的所有优化仍然适用。Presto和SparkSQL在Hive亚metastore表上开箱即用,如果需要的话hoodie-hadoop-mr库在类路径中。
增量处理
如前所述,建模的表需要在HDFS中处理和服务,以便HDFS成为统一的服务层。构建低延迟的建模表需要HDFS数据集的链式增量处理能力。由于Hudi维护关于提交时间和为每次提交创建的文件版本的元数据,增量更改集可以在开始时间戳和结束时间戳中从特定于Hudi的数据集中提取。
此过程的工作方式与普通查询大致相同,不同之处在于,将选择在查询时间范围内的特定文件版本,而不仅仅是最新版本,并且将有关提交时间的附加谓词推到文件扫描中,以仅检索在请求持续时间内更改的记录。可以获得更改集的持续时间取决于可以保留多少个未清理的数据文件版本。
这使得流到流的连接可以使用水印和流到数据集的连接来计算和upsert HDFS中的建模表。
接下来会发生什么?
本文中描述的大多数技术都是指Hudi的当前一代(称为Merge-on-Read)积极开发.在接下来的几个月里,Hudi计划取代Uber使用的上一代(称为写时复制)存储。的上一代简化了体系结构通过消除日志文件和减少延迟。这已经为Uber的数据摄取和建模表提供了几个月的支持。
随着Hudi继续推动延迟边界,使HDFS的摄取更快,当我们向外扩展时,不可避免地会有一些识别瓶颈的迭代。我们打算解决的几个潜在瓶颈与使用嵌入式全局不可变索引加速索引和设计自定义可索引日志存储格式有关,以便在合并时使用磁盘寻径进行优化。因此,我们欢迎反馈,并鼓励您做出贡献我们的项目。
编者按:这篇文章的原始版本用它的昵称Hoodie来指代Hudi。我们更新了这篇文章,使用了其开源项目的名称,Hudi,以避免混淆。
Prasanna Rajaperumal是数据基础设施团队的高级软件工程师,Vinoth Chandar是移动平台团队的普通软件工程师。这两家公司都位于优步的旧金山工程办公室。
我们的团队将在接下来的会议上讨论HudiStrata+Hadoop世界会议2017年3月16日,星期四,加利福尼亚州圣何塞;参观我们的展位进行工作演示。如果您需要离线联系我们,请使用帽衫邮寄列表或电邮至hoodie-users@googlegroups.com.
图片标题:Conor Myhrvold的《大象在傍晚喷水》,奥卡万戈三角洲,博茨瓦纳。






