每天在我们的平台上连接世界各地的用户,每天都需要大量的数据管理。当您考虑到数百个运营和数据科学团队分析大量匿名聚合数据,使用各种不同的工具来更好地理解和维护动态市场的健康时,这个挑战就更加令人生畏了。
三年前,优步采用开源Apache Hadoop框架作为其数据平台,使得跨计算机集群管理千兆字节的数据成为可能。然而,考虑到我们有许多团队、工具和数据源,我们需要一种方法在整个平台上可靠地吸收和分散数据。
Marmaray是Uber的开源通用Apache Hadoop数据摄取和分发框架和库。Marmaray是由我们的Hadoop平台团队构建和设计的基于插件的框架构建在Hadoop生态系统之上。用户可以通过使用Apache Spark添加对从任何来源摄取数据并将数据分散到任何接收器的支持。这个名字,马尔马雷,来自一个土耳其隧道连接欧亚。类似地,我们将Uber内部的Marmaray设想为一条管道,根据客户的偏好将数据从任何来源连接到任何接收器。
数据湖通常保存质量参差不齐的数据。Marmaray确保所有摄入的原始数据都符合适当的源模式,保持高质量,从而使分析结果可靠。数据科学家可以花时间从这些数据中提取有用的见解,而不是处理数据质量问题。
在Uber, Marmaray将一系列系统和服务以一种连贯的方式连接起来,完成以下工作:
- 通过我们的模式管理库和服务生成高质量的模式数据。
- 通过Marmaray摄取从多个数据存储中摄取数据到我们的Hadoop数据湖。
- 使用Uber的内部工作流编排服务构建管道,以处理吸收的数据,并基于这些数据存储和计算业务指标蜂巢.
- 将Hive处理后的结果提供给在线数据存储,内部客户可以通过马尔马雷扩散查询数据并获得近乎即时的结果。
虽然Marmaray实现了我们对任何来源到任何接收器的数据流的愿景,但我们也发现需要建立一个完全自助服务的平台,为来自不同背景、团队和技术专长的用户提供无障碍的入行体验。
Hadoop的开源特性允许我们将其集成到我们的平台中进行大规模数据分析。当我们构建Marmary以促进Hadoop上的数据摄入和传播时,我们觉得它也应该移交给开源社区。我们希望马尔马雷将服务于其他组织的数据需求,开源开发人员将扩展其功能。
大规模的摄入挑战
Uber的业务产生大量原始数据,并将其存储在各种来源中,如Kafka、Schemaless和MySQL。反过来,我们需要将这些数据吸收到Hadoop数据湖中,用于业务分析。数据摄取的规模随着优步许多垂直业务的增长呈指数级增长。对大规模可靠性的需求使得我们必须重新构建我们的摄入平台,以确保我们能够跟上我们的增长步伐。
我们以前的数据架构需要运行和维护多个数据管道,每个管道对应不同的生产代码库,随着时间的推移,随着数据量的增加,这被证明是很麻烦的。数据来源,例如MySQL,卡夫卡,无模式包含原始数据,需要被输入到Hive中,以支持公司各个团队的不同分析需求。每个数据源都需要理解不同的代码库及其相关的复杂性,以及一组不同且唯一的配置、图表和警报。添加新的摄取源变得非常重要,维护开销要求我们的大数据生态系统支持所有这些系统。随叫随到的负担可能会令人窒息,有时每周要发出200多次警报。
随着Marmaray的引入,我们将我们的摄取管道合并为一个与源代码无关的管道和代码库,这将被证明更具可维护性和资源效率。
不管源数据存储在哪里,这个单一的摄取管道都将执行相同的有向无循环图作业(DAG),在运行时,摄取行为将根据特定的源而变化(类似于策略设计模式)来编排摄取过程,并使用适用于处理未来不同需求和用例的通用灵活配置。
优步需要分散员工
我们的很多内部数据客户,比如Uber Eats和米开朗基罗机器学习平台团队,使用Hadoop与其他工具一起构建和训练他们的机器学习模型,以产生有价值的衍生数据集,以提高效率和改善用户体验。为了最大化这些派生数据集的有用性,需要将这些数据分散到在线数据存储中,这些在线数据存储通常具有比Hadoop生态系统低得多的延迟语义。
在我们引入马尔马雷之前,每个团队都在建立自己的临时分散系统。这种重复的工作和创造深奥的、非通用的特性通常会导致工程资源的低效使用。Marmaray被设想、设计并最终于2017年底发布,以满足对一个灵活、通用的分散平台的需求,通过提供将Hadoop数据传输到任何在线数据存储的手段来完成Hadoop生态系统。
跟踪端到端数据交付
我们的许多内部用户需要保证从源生成的数据能够高度可靠地交付到目标接收器。这些用户还需要完整性度量,涵盖数据交付到最终接收器的可靠性。理论上,这意味着已经交付了100%的数据,但在实践中,我们的目标是交付99.99到99.999%的数据来完成任务。当记录数量非常少时,很容易对源系统和接收系统运行查询,以验证数据是否已交付。
在优步,我们每天要接收数pb的数据和超过1000亿条消息,所以运行这些查询是不可能的。在这种规模下,我们需要一个能够跟踪数据传递而不会相应显著增加延迟的系统。Marmaray使用一个系统,通过Spark中定制的累加器来存储记录,让用户以最小的开销监控数据传输。
马尔马雷和戈布林
马尔马雷设计的许多基本构件和抽象概念都受到了他的启发GobblinLinkedIn也开发了一个类似的项目。LinkedIn团队非常友好地分享了他们的知识,并提供了关于他们的项目和架构的演示,这是非常感谢的。
Gobblin和Marmaray之间有一些根本的区别。Gobblin是Hadoop的通用数据摄取框架,Marmaray可以利用Apache Spark将数据摄取到Hadoop中,也可以从Hadoop中分散数据。另一方面,Gobblin利用Hadoop MapReduce框架来转换数据,而Marmaray目前不提供任何转换功能。这两个框架都提供了易于使用的自助服务功能,可以处理作业和任务调度以及元数据管理。
我们选择Apache Spark作为我们的主要数据处理引擎,而不是Hadoop MapReduce,原因有很多:
- Spark处理数据的速度要快得多,这得益于它的内存处理语义。这种接近实时的处理能力对于我们的服务水平协议(sla)至关重要。
- Spark的弹性分布式数据集(rdd)允许我们对数据应用操作和其他转换,如过滤、映射和分组,以及数据重新分区和其他优化,以确保我们的数据流最大限度地发挥Hadoop集群的处理能力。
- 我们可以进一步利用Spark来执行多个数据转换,而不需要将中间数据存储到HDFS。
- 我们可以利用Spark易于使用和熟悉的api来操作半结构化数据。
- Spark的原生容错能力保证了随机节点故障和网络分区不会影响生产能力。
- 考虑到Spark的延迟比MapReduce低,我们最终将支持Spark火花流用例为我们的客户提供更严格的sla。
- 数据处理dag的惰性计算允许我们利用更有效的优化技术,从而减少资源需求,特别是当我们将来添加DataFrame支持时。
- Hadoop数据存储为Hudi这是一个建立在Spark之上的存储抽象库。
当然,对于任何设计决策,都必须进行权衡。MapReduce和Spark是互补的框架,各自都更适合特定的需求。MapReduce可以处理比Spark大得多的数据量,这实际上使Gobblin非常适合我们的规模。然而,在实践和生产中,这并不是一个问题。对于Uber的SLA要求,我们发现Spark更适合Uber。
马尔马雷用例
超级吃
让我们来看看Marmaray是如何帮助改善优步外卖应用程序向用户提供的餐厅推荐的。Uber Eats的机器学习模型可以基于食客的点单历史推荐其他餐厅吃的人也会喜欢。
我们的预测模型使用的Uber Eats订单的原始数据通过Marmaray被输入Hive。然后将机器学习模型应用于这些原始数据之上,以生成推荐餐厅的派生数据集,这些数据集也持久化在Hive中。如果我们从Hive本身访问这些数据,并在客户启动应用程序时显示这些建议,这个过程将在几秒钟到几分钟的时间内完成,从而导致客户体验不佳。通过使用Marmaray将这些数据分散到像Cassandra这样的低延迟存储系统中,我们现在能够在几毫秒内访问这些信息,从而获得更快、更无缝的用户体验。
在我们扩大增长的过程中,构建应用统计技术从原始数据中提取情报的技术的能力对优步至关重要。然而,同样重要的是,数据存储在一个能够被需要它的客户及时访问的存储中。由于原产地商店往往不能满足客户的需求,马尔马雷的框架对于确保这种可达性的规模至关重要。
超级运费
优步货运利用优步的平台将托运人与承运人及其司机联系起来。托运人可以邮寄他们需要移动的货物,司机可以挑选他们想要的货物。作为一家将大部分运营数据存储在MySQL中的新公司,Uber Freight缺乏运行分析的方法。
使用Marmaray, Uber Freight能够利用大量的外部数据,包括近期、相关性、车道信息和里程,并将其吸收到Freight Services中。马尔马雷将数百万数据点输入Hadoop,并将数据分布到多个团队使用的工具中,帮助优步货运的业务迅速扩大。
马尔马雷架构
下面的架构图说明了Marmaray中的基本构建块和抽象,这些构建块和抽象支持其整体工作流程。这些通用组件促进了向Marmaray添加扩展的能力,使其支持新的源和接收器。
DataConverters
摄取和分散作业主要对来自源的输入记录执行转换,以确保在将数据写入目标接收器之前是所需的格式。马尔马雷将转换器串在一起,根据需要执行多个转换,还可以向多个接收器写入数据。
DataConverters的一个次要但关键的功能是为每个转换生成错误记录。出于分析目的,在将所有原始数据输入Hadoop数据湖之前,必须确保所有原始数据都符合一个模式。任何格式错误、缺少必要字段或被认为有问题的数据都将被过滤掉并写入错误表,以确保高水平的数据质量。
WorkUnitCalculator
马尔马雷以可配置大小的小批量移动数据。为了计算要处理的数据量,我们引入了WorkUnitCalculator的概念。在非常高的级别上,WorkUnitCalculator将查看输入源的类型和先前存储的检查点,然后计算下一个工作单元或批工作。例如,一个工作单元可以是Kafka的偏移范围,也可以是Hive/HDFS源文件的集合。
当计算下一批要处理的数据时,WorkUnitCalculator还可以考虑节流信息,例如,要读取的最大数据量或要从Kafka读取的消息数量。这种节流信息是每个用例都可配置的,并提供了最大的灵活性,从而确保工作单元的大小是适当的,并且不会压倒源系统或接收器系统。
元数据管理器
所有Marmaray作业都需要一个持久化存储(我们称之为元数据管理器)来缓存作业级别的元数据信息。作业可以在执行期间更新其状态,并且只有在作业的当前执行成功时,作业才会替换旧保存的状态,否则,对状态的任何修改都将被拒绝。在Uber,我们使用元数据管理器来存储检查点信息(或Kafka中的分区偏移量)、平均记录大小和平均消息数量等元数据。但是,元数据存储被设计为通用的,可以存储任何相关的指标,这些指标可以根据用例和用户需求来跟踪、描述或收集作业的状态。
ForkOperator和ForkFunction
Marmaray的ForkOperator抽象使用ForkFunction将记录的输入流分割成多个输出流。ForkOperator的典型用例是为符合有效模式的记录和错误记录分别提供一个输入流,然后可以以单独和独立的方式适当地处理它们。
ISource和ISink
ISource包含了来自源数据的所有必要信息,用于适当请求的工作单元,而ISink包含了关于如何写入接收器的所有必要信息。例如,Cassandra接收器可能包含关于数据应该驻留在何处的集群、表、分区键和集群键的信息。Kafka源将包含主题名称、要读取的最大消息、集群信息和偏移初始化策略等元数据。
数据模型和工作流程
马尔马雷架构的核心组件是我们所说的AvroPayload,一个包装器Avro的GenericRecord二进制编码格式,包括我们数据处理需要的相关元数据。
Avro数据(GenericRecord)的主要优点之一是它在内存和网络使用方面都是高效的,因为与JSON相比,二进制编码的数据可以以最小的模式开销通过网络发送。使用运行在Spark架构之上的Avro数据意味着我们还可以利用Spark的数据压缩和加密功能。这些好处可以帮助我们的Spark作业更有效地处理大规模数据。
为了支持我们的“任意源到任意接收器”体系结构,我们要求所有摄取源定义从它们的模式格式到Avro的转换器,所有分散接收器定义从Avro模式到本机接收器数据模型的转换器(例如,用于卡珊德拉).
要求所有转换器要么将数据转换为AvroPayload格式,要么将数据转换为AvroPayload格式,这允许在我们的数据模型中实现松散和有意的耦合。一旦定义了源及其关联的转换,理论上就可以将源分散到任何受支持的接收器,因为所有接收器都与源无关,只关心数据是否采用中间的AvroPayload格式。Marmaray的数据模型如下图8所示:
下面的图11给出了Marmaray作业如何编排的高级流程,它独立于特定的源或接收器。
在此过程中,为每个源和接收器定义特定属性的配置将编排下一个作业的每个步骤。这包括计算出我们需要处理的数据量(即它的工作单元),应用fork函数将原始数据分割为“有效”和“错误”记录以确保数据质量,将数据转换为适当的接收器格式,更新元数据,并报告指标以跟踪进度。在Uber,所有Marmaray作业都在Apache Spark上运行,使用YARN作为资源管理器。
自助平台
由于我们的许多数据平台用户不熟悉我们在堆栈中使用的语言(例如Python和Java),因此对我们的团队来说,构建一个自助服务平台非常重要,在这个平台上,用户可以简单地单击并建立一个端到端管道,确保来自所需源的数据最终进入正确的接收器,以支持他们的分析工作和查询。
在我们的系统投入生产的七个月里,超过3300个工作已经通过我们的自助服务平台进入我们的系统。
数据删除
在Uber,所有的Kafka数据都以数据级分区的仅追加格式存储。任何特定用户的数据可以跨越多个数据分区,并且每个分区通常有许多Kafka记录。如果底层存储不包括内置索引和更新支持,则扫描和更新所有这些分区以纠正、更新或删除用户数据会非常耗费资源。的拼花Hadoop使用的数据存储不支持索引,而且我们根本无法在适当的位置更新Parquet文件。为了方便索引和更新支持,Marmaray使用Hadoop更新和增量(Hudi),也是Uber开发的一个开源库,用于管理大型分析数据集的存储,以将原始数据存储在Hive中。
在高层,数据生产者使用Hive扫描表,识别要删除的记录,并将它们发布到Kafka集群,删除或屏蔽用户特定的信息。Marmaray的Kafka摄取管道反过来从Kafka集群中读取它们,其中有新的和更新的(待删除的)记录。然后,Marmaray使用Hudi的批量插入功能摄取纯新记录,保持摄取延迟较低,并使用Hudi的upsert功能处理更新的记录,用更新的修改替换旧的Kafka记录。
马尔马雷的下一个篇章
Marmaray对任何源到任何接收器的数据管道的普遍支持适用于Hadoop生态系统中的广泛用例(主要是在使用Hive时)以及数据迁移。在这方面,我们做到了向开源社区发布了Marmary,并期待收到反馈和贡献,以便我们不断改进马尔马雷平台。
与此同时,今年我们正在弃用遗留管道,并将所有工作流程都移植到Marmaray上,以简化我们的整体数据架构,并确保随着数据需求的增加,我们能够尽可能轻松地扩展。
如果你对大规模解决数据工程挑战感兴趣,可以考虑申请我们团队中的一个角色.对于优步的其他工程机会,点击这里.
订阅我们的通讯以跟上优步工程公司的最新创新。






