保持优步平台在全球市场的可靠性和实时性是一项24/7的业务。在旧金山,人们可能会去睡觉,但在巴黎,人们正在为上班做准备,请求优步(Uber)的司机合作伙伴搭车。与此同时,在地球的另一端,孟买的居民可能正在通过Uber Eats点餐。
我们在优步上促进这些互动大数据平台,使用我们的市场匹配骑手和司机伙伴;食客、餐馆和送餐伙伴;还有卡车司机和托运人。对这些交互的数据驱动的洞察帮助我们构建产品,为全球用户提供有回报和有意义的体验。
由于食客希望他们的食物及时送达,而乘客希望在最短的时间内等待取货,我们的数据必须尽可能及时地反映现场的事件。然而,由于数据从各种来源进入我们的数据湖,在这种规模下保持数据新鲜是一个主要挑战。
虽然现有的解决方案为许多公司提供了24小时的新鲜工作,但对于Uber的实时需求来说,它太过时了。此外,Uber的数据量和运营规模阻碍了这种解决方案的可靠工作。
为了满足这些需求,我们开发了DBEvents,这是一种变更数据捕获系统,旨在提高数据质量和新鲜度。变更数据捕获系统(疾病预防控制中心)可以用来确定哪些数据已逐步更改,以便可以采取措施,如摄入或复制.DBEvents有助于引导、吸收现有表的快照和增量流更新。
补充优步开发的其他软件,比如马尔马雷而且Hudi, DBEvents从MySQL、Apache Cassandra和Schemaless等数据源捕获数据,更新我们的Hadoop数据湖。该解决方案管理pb级的数据,并在全球范围内运行,帮助我们为内部数据客户提供尽可能好的服务。
快照数据摄取
从历史上看,Uber的数据摄取始于我们确定要摄取的数据集,然后运行一个大型处理作业,使用MapReduce和Apache Spark等工具从源数据库或表中以高度并行的方式读取数据。接下来,我们将把这个作业的输出传输到离线数据湖,比如HDFS或Apache Hive。这个过程被称为快照,根据数据集的大小,通常需要几分钟到几个小时,这对于我们内部客户的需求来说不够快。
每当一个作业开始摄取数据时,它就展开并行任务,建立到上游表(如MySQL)的并行连接,并提取数据。从MySQL读取大量数据会给MySQL的实时应用程序流量带来很大的压力,将其降低到不可接受的水平。减少这种压力的策略包括使用专用服务器提取、转换和加载(ETL),但这也带来了数据完整性方面的其他问题,并增加了备份数据库服务器的额外硬件成本。
获取数据库或表快照的时间随着数据量的增加而增加,并且在某个时刻它将无法满足业务的需求。由于大多数数据库每天只更新部分数据,只添加有限数量的新记录,所以这个快照过程还会导致计算和存储资源的低效利用,导致对整个表数据(包括未更改的行)的反复读写。
DBEvents需求
由于优步需要更新鲜、更快的洞察,我们需要设计一种更好的方式,将数据吸收到我们的数据湖中。当我们开始设计dbevent时,我们为最终的解决方案确定了三个业务需求:新鲜度、质量和效率。
新鲜
数据的新鲜度指的是它更新的时间。考虑在t1时刻对MySQL表中的一行进行更新。输入作业在时间t1 + 1开始运行,需要N个单位的时间来接收该数据。数据在t1 + 1 + N时刻对用户可用。这里,数据的新鲜度延迟等于N + 1,这是数据实际更新到数据湖可用之间的延迟。
Uber有很多用例都要求N+1尽可能小,最好是几分钟。这些用例包括欺诈检测在美国,即使是最轻微的延迟也会影响客户体验。由于这些原因,我们在dbevent中将数据新鲜度列为高优先级。
质量
如果我们不能描述或理解数据湖中的数据,那么数据湖中的数据就毫无用处。想象一下,不同的上游服务对不同的表有不同的模式。尽管每个表都是用一个模式创建的,但是这些模式会随着用例的变化而变化。如果没有为所摄入的数据定义和发展模式的一致方法,数据湖可能很快变成数据沼泽,即无法理解的大量数据的集合。
此外,随着表模式的发展,重要的是要说明添加新字段或弃用现有字段背后的原因。如果不理解列代表什么,就无法理解数据。因此,确保高质量的数据是dbevent的另一个优先事项。
效率
在Uber,我们有数千个微服务,负责业务逻辑的不同部分以及不同的业务线。在大多数情况下,每个微服务都有一个或多个用于存储非临时数据的后台数据库。可以想象,这将导致数百或数千个表可以被吸收,需要大量的计算和存储资源。
因此,我们为dbevent定义的第三个目标是使系统高效。通过优化存储和计算等资源的使用,我们最终降低了数据中心使用和工程时间方面的成本,并使将来添加更多源变得更容易。
设计DBEvents
考虑到这三个需求,我们构建了DBEvents (Uber的变更数据捕获系统),以增量方式捕获和吸收数据中的变更,从而改善我们平台上的体验。
数据集的摄取可分为两个过程:
- 引导:表的一个时间点快照表示。
- 增量的摄入:增量地摄取和应用表中的更改(发生在上游)。
引导
我们开发了一个源可插入库,通过我们的摄入平台Marmaray将Cassandra、Schemaless和MySQL等外部源引导到数据湖中。这个库提供了有效引导数据集所需的语义,同时提供了可插入的体系结构来添加任何源。每个外部源都将其原始数据的快照备份到HDFS中。
快照备份过程完成后,Marmaray调用库,库反过来读取备份数据,对其进行模式化,并将其作为Marmaray可用的Spark Resilient Distributed Dataset (RDD)提供。Marmaray在执行可选的重复数据删除、部分行合并和各种其他操作之后,将RDD持久化到Apache Hive中。
为了提高摄取真正大的表的效率和可靠性,引导过程是增量的。我们既可以为数据集定义批处理大小,也可以增量地(可能是并行地)引导数据集,从而避免过大的作业。
MySQL引导示例
创建MySQL数据库的备份通常涉及到在文件系统上复制数据,并使用本机文件格式将其存储在另一个存储引擎中。这种逐位复制文件的备份类型称为物理备份。由于索引的存在,被复制的所谓物理文件通常包含重复的数据,这大大增加了磁盘上数据集的大小。
作为DBEvents体系结构的一部分,我们开发并开源了一个名为StorageTapper,它从MySQL数据库读取数据,将其转换为schemalized版本,并将事件发布到不同的目的地,如HDFS或Apache Kafka。这种在目标存储系统上生成事件的方法允许我们创建逻辑备份。逻辑备份不是在数据集的直接副本上使用,而是依靠StorageTapper根据原始数据库创建的事件在目标系统上重新创建数据集。
除了比物理备份效率更高外,逻辑备份还具有以下优点:
- 由于数据采用标准的、可用的格式,它们很容易由原始存储服务以外的系统处理。
- 它们不依赖于MySQL的特定版本,因此提供了更好的数据完整性。
- 它们非常紧凑,因为它们不复制重复的数据。
实现新鲜
为了使我们的数据尽可能新鲜,我们需要以小批量的增量方式使用数据集并对其应用更改。我们的数据湖使用HDFS(一个只追加的系统)来存储pb级的数据。我们的大多数分析数据都是用Apache Parquet文件格式编写的,这种格式适用于大型柱状扫描,但不能更新。不幸的是,由于HDFS是只能追加的,而Apache Parquet是不可变的,用户不能在不批量重写整个数据集的情况下对数据集应用更新,或者,在Hive的情况下,重写数据集的大分区。
为了快速获取数据,我们使用ApacheHudi,一个由Uber创建的开源库,用于管理HDFS中的所有原始数据集,它减少了对我们的不可变数据湖执行upsert的时间。Apache Hudi在数据集上提供原子上插入和增量数据流。
MySQL增量摄入示例
除了引导之外,我们还可以使用StorageTapper从MySQL源执行增量摄取。在我们的用例中,StorageTapper从MySQL二进制日志中读取事件,记录对数据库的更改。二进制日志包括所有INSERT、UPDATE、DELETE和DDL操作,我们将这些操作称为二进制日志事件。将这些事件写入日志的顺序与向数据库提交更改的顺序相同。
StorageTapper读取这些事件,将它们编码到Apache Avro格式,并将它们发送到Apache Kafka。每个二进制日志事件在Kafka中是一个消息,每个消息对应一个完整的表数据行。由于发送到Apache Kafka的事件反映了对原始数据库进行更改的顺序,所以当我们将Apache Kafka的消息应用到另一个数据库时,我们得到了原始数据的精确副本。这个过程比直接将数据从MySQL传输到另一个数据库使用更少的计算资源。
执行质量
为了确保高质量的数据,我们首先需要使用模式在数据湖中定义数据集的结构。Uber使用了一种名为schema - service的内部模式管理服务,该服务确保数据湖上的每个数据集都有一个相关联的模式,并且该模式的任何演化都遵循特定的演化规则。这些进化规则保证了模式的向后兼容性,以避免破坏此类数据集的消费者。
schema - service使用Apache Avro格式存储模式并执行模式演化。该模式通常是上游表模式的1:1表示。自助服务工具允许内部用户发展模式,只要这些更改被接受为向后兼容。一旦以Apache Avro格式完成模式更改,将对表应用数据定义语言(DDL)语句以更改实际的表模式。
模式编码是将每个数据进行模式化的过程。模式实施库(热管)对每个数据进行模式化或编码,很像可以对每个数据执行模式检查的瘦客户机。模式强制库还将元数据添加到每个更改日志中,使其具有全局标准化,而不考虑数据来自什么源或数据打算写入哪个接收器。确保我们的所有数据都遵循一个模式,并且我们的模式是最新的,这意味着我们可以找到并使用摄入到数据湖中的所有数据。
MySQL模式实施示例
如上所述,用户可以通过schema - service请求对MySQL的模式进行更改,该服务将验证更改并确保它们向后兼容。如果请求成功,则可以使用模式的新版本。每当StorageTapper在MySQL二进制日志中读取ALTER TABLE语句时,它都会检测到这些模式更改。这将触发StorageTapper开始使用新的模式来处理进一步的事件。
有效利用资源
我们发现了旧管道的一些低效之处:
- 计算使用:大型作业对整个表进行快照并以一定的节奏重新加载它们是非常低效的,特别是在可能只更新了少量记录的情况下。
- 上游稳定性:由于经常需要加载整个表,作业会给源程序带来压力,比如在MySQL表的大量读取期间。
- 数据正确性:没有预先执行数据质量检查,导致数据湖用户体验欠佳,数据质量较低。
- 延迟:从源表中发生突变的时间到可以在数据湖上查询它的时间之间的延迟很大,从而降低了数据的新鲜度。
Hudi通过只使用和应用上游表的更新行和变更日志,提高了dbevent驱动的管道的效率。Hudi通过允许增量更新而不是快照,使用更少的计算资源,改善了我们发现的许多低效率问题。通过读取变更日志,Hudi不需要加载整个表,从而减少了上游源的压力。
下面的图4清楚地描述了这些解决方案如何在DBEvents增量体系结构中协同工作。在优步,我们从各种不同的来源获取数据。每个源都有一个自定义实现来读取变更日志事件并提供增量更改。例如,MySQL的变更日志是通过StorageTapper跟踪并推到Apache Kafka的,如上所述,而Cassandra的变更日志是通过名为变更数据捕获(CDC)的Apache Cassandra特性以及特定于uber的集成提供的。
马尔马雷是Uber的开源、通用的数据采集和传播库。在较高的级别上,Marmaray为我们的DBEvents管道提供了以下功能,从而提高了体系结构效率:
- 通过我们的模式管理库和服务生成高质量的模式化数据。
- 将来自多个数据存储的数据导入Apache Hadoop数据湖。
- 使用Uber的内部工作流编排服务构建管道,以处理和处理摄入的数据,并在HDFS和Apache Hive中存储和计算基于这些数据的业务指标。
单个摄入管道执行相同的操作有向无环图作业(DAG),而不考虑数据源。这个过程在运行时根据特定的源确定摄入行为,类似于策略设计模式.
标准化变更日志事件
我们的目标之一是以一种可以被其他内部数据使用者(如流作业和定制管道)使用的方式标准化变更日志事件。
在标准化dbevent中的变更日志之前,我们需要解决几个问题:
- 在设计有效负载时,我们如何发现错误?我们应该如何处理它们?
- 有效载荷可能是复杂的,只有完整有效载荷的一部分可能被更新。我们怎么知道更新了什么?
- 如果这个有效负载是上游数据库或表中行突变的更改日志,那么行的主键是什么?
- 既然我们使用Apache Kafka作为消息总线来发送和接收变更日志,我们如何强制使用事件的单调递增时间戳呢?
为了回答DBEvents用例的这些问题,我们定义了一组Apache Hadoop元数据头,可以将其添加到每个Apache Kafka消息中。在这种设计中,元数据和数据都通过热管编码(使用Apache Avro),并通过Apache Kafka传输。这使我们能够标准化这类事件的所有消费者使用的全局元数据集。此元数据单独描述每个更新,以及这些更新在某种程度上如何与以前的更新流相关。元数据也被写入Apache Hive的一个名为MetadataStruct的特殊列中,该列遵循一个元数据模式。然后,用户可以轻松地查询MetadataStruct并查找关于行状态的进一步详细信息。
下面,我们重点介绍了一些跨事件标准化的关键元数据字段:
| Hadoop元数据字段 | |
| 元数据字段 | 描述 |
| 行键 | Row Key字段是每个源表的唯一键,用于标识源表行,并根据结果合并部分变更日志。 |
| 参考关键 | 参考关键字字段是接收到的变更日志的版本,必须单调递增。此键用于确定数据是否表示特定行的最近更新。 |
| 列的更新日志 | “更改日志列”字段是一个array |
| 源 | Source字段反映用于生成此更改日志的源表的类型。一些例子包括Apache Kafka、Schemaless、Apache Cassandra和MySQL。 |
| 时间戳 | Timestamp字段以毫秒为单位标记事件的创建时间。时间戳属性有多种用途,但最重要的是监视延迟和完整性。(我们将事件的创建称为数据示意图服务(例如StorageTapper)在将事件推入Apache Kafka之前实际对事件进行示意图化的时间。) |
| isDeleted | (真/假)。这是一个布尔值,支持删除row_key在Hive表中。 |
| 错误异常 | Error Exception是一个字符串,捕获发送当前更改日志消息时面临的异常或问题(如果没有错误,则为空)。如果源数据出现任何模式问题,Error Exception将反映接收到的异常,稍后可用于跟踪源问题或修复/重新发布消息。 |
| 错误源数据 | 错误源数据是一个包含实际源数据错误的字符串(如果没有错误则为null)。在任何有问题的消息的情况下,我们不能将此字符串摄取到主表中,而是将其移动到相关的错误表中。我们可以使用这些数据与制作人合作进行修复。 |
| ForceUpdate | (真/假)。ForceUpdate是一个布尔值,它确保这个更新日志应用于现有数据之上。在许多情况下,aref_key比上次见到的还要老ref_key被认为是副本并被跳过。设置了此标志之后,更改日志将被应用,而不管hadoop_ref_key字段。 |
| 数据中心 | 数据中心字段指事件发生的原始数据中心。这个字段对于跟踪消息和调试任何潜在问题非常有帮助,特别是对于active-active或全active体系结构。热管根据发布消息的数据中心自动填充此值。 |
如上表所示,标准化的元数据使我们的体系结构非常健壮和通用。此元数据提供充分的信息,以完全理解每个事件的状态。例如,如果对事件的模式化或解码有任何问题,错误字段将填充,如下面的图5所示,我们可以决定采取什么操作。在Uber,我们将错误与导致问题的实际有效负载一起写入错误表。
错误表有很多用途:
- 此数据的生产者可以找到没有通过模式检查的数据,然后进行修复并发布更新。
- 数据操作和工具可以使用错误表来查找和调试缺失的数据。
- 写入错误表可以确保我们在系统中不会有任何数据丢失,并且每条消息都在实际表或错误表中进行了说明。
下一个步骤
通过dbevent可获得的增量更改流,我们能够向数据湖提供更快、更新鲜和高质量的数据。使用这些数据,我们可以确保Uber的服务,如拼车和Uber Eats,尽可能有效地运行。
未来,我们打算在以下方面加强这个项目:
- 自助服务集成:我们想让数据集登陆到Apache Hive非常容易和简单。为此,我们需要对DBEvents框架进行一些增强,以便每个源实现能够无缝地触发引导和增量摄入。这需要源系统和摄取之间的集成,以及与源监控框架的集成。
- 延迟和完整性监视:虽然我们有提供这些信息的构建块,但我们只有Apache Kafka数据源类型的实现。我们希望为所有数据源类型添加增强功能。
如果对分布式计算和数据挑战感兴趣,可以考虑申请角色加入我们的队伍!
确认
特别感谢Reza Shiftehfar, Evan Richards, Yevgeniy Firsov, Shriniket Kale, Basanth Roy, jintguan和其他团队成员的所有贡献!





