在优步的规模下,数千个微服务每天提供数百万次乘车和送货,产生的利润超过1亿美元几百拍字节的原始数据.在公司内部,整个工程和数据团队利用这些数据来改善Uber的体验。这些数据的应用跨越了几个用例,包括计算更高效的优步拼车路线,确保优步Eats的订单在食物还热的时候送到,以及我们基础设施中的数据所代表的真实世界事件。
要使这些数据具有可操作性,需要摄取、转换、分散和编排,以便它可以广泛应用于传统商业智能、机器学习、模型培训、可视化和报告等领域。然而,在Uber快速增长的早期,我们使用了广泛的数据工作流系统,用户必须为每个用例从几个重叠的工具中选择。虽然这个庞大的工具箱允许敏捷和响应式增长,但事实证明它很难管理和维护,要求工程师在从事不同的项目时学习重复的数据工作流系统。我们需要一个能够编写、管理、调度和部署数据工作流的中心工具。
利用Uber之前部署的各种工具,包括基于airflow的平台,我们开始开发符合Uber规模的系统。这项工作促使我们开发了Piper,这是Uber的集中式工作流管理系统,它使我们能够将Uber的数据工作流民主化,使从城市运营团队到机器学习工程师的每个人都能更快更有效地开展工作。
统一工作流管理系统之路
直到几年前,Uber的团队使用多种数据工作流系统,其中一些基于开源项目,如ApacheOozie, Apache气流,詹金斯而其他的则是用Python编写的定制解决方案Clojure.每个需要移动数据的用户都必须了解并从这些系统中进行选择,这取决于他们需要完成的特定任务。每个系统都需要额外的维护和操作负担来保持其运行,排除问题,修复错误,并教育用户。
经过一番思考后,我们决定集中在一个单一的工作流系统上。在评估业界可用的数据工作流工具时,我们权衡了每种工具的优缺点,并考虑了几个因素,例如易用性、稳定性、开源生态系统、对Hadoop生态系统的依赖性、领域特定语言(DSL)的表达性和所使用的编程语言(与我们的用户基础的语言技能相比)。
在我们的新系统中,我们寻找以下品质:
- 工作流应该很容易通过代码编写,同时具有表达性,支持动态生成工作流的能力。
- 支持工程师习惯的开发过程,包括将数据工作流开发为代码,并通过修订控制进行跟踪。
- 易于可视化和管理的工作流。
- 便于查看的日志,包括工作流的过去运行和当前运行。
在这次评估之后,我们的目标是形成一个能够支持Uber规模的单一工作流系统,于是我们确定了一个基于airflow的系统。基于airflow的DSL提供了灵活性、表现力和易用性的最佳权衡,同时也为我们广泛的用户(包括数据科学家、开发人员、机器学习专家和运营员工)提供了可访问性。
当我们向单一数据多租户工作流系统靠拢时,我们开始了对现有其他系统的退役工作。弃用这些系统显著地简化了用户编写工作流的能力,以及我们的团队管理和长期迭代改进系统的能力。
选择部署模型:集中式多租户平台
在部署我们的系统时,我们可以选择是使用单一的、集中管理的安装,还是为公司中的每个团队或组织使用一个安装。我们在行业中发现了这两种情况的例子,从亚马逊AWS数据管道的单一托管安装到谷歌Cloud Composer允许的单独的风流安装。在后一种情况下,用户或管理员负责系统的设置和配置,供应商只提供有限的支持。
为了做出这个决定,我们考虑了以下几个因素:
- 当发生变更时,系统是如何升级的?
- 当出现错误或基础结构问题时,谁负责随叫随到和系统支持?
- 当工作流数量增加时,谁负责扩展系统?
- 我们如何避免雪花集群和节点,不同配置的系统的不同版本?
- 系统是否需要使用它的团队进行任何维护或配置?
在考虑了上述因素之后,我们构建了一个集中部署的模型,即由我们的数据工作流管理团队支持的每个数据中心单独安装。对于最终用户,我们的系统实现Piper是自助的和可靠的,但我们的团队管理Piper,确保它是更新的,并能够随着整个公司的工作流增长可靠地扩展。我们还通过办公时间、功能请求、文档和培训为不需要了解系统内部工作的最终用户提供支持。
在选择中央部署模型时,我们需要确保Piper可以扩展到比单个团队部署多得多的工作流数量,在单个团队部署中,每个安装只需要有限数量的工作流。与多安装模型相比,Piper还需要提供更好的隔离和多租户功能。
系统架构
虽然我们基于最初的开源风流架构建立了Piper,但我们重新构建了系统的大部分,使其性能更佳、可用性更高,并适合Uber的基础设施。下图详细描述了我们最初的Piper架构:
最初的Piper架构由以下五个组件组成:
- Web服务器:为HTTP请求提供服务的应用程序服务器,包括那些用于UI端点和JSON API端点的请求。
- 调度程序:负责安排工作流程和任务。调度器考虑各种因素,如计划间隔、任务依赖关系、触发规则和重试,并使用这些信息计算下一组要运行的任务。一旦资源可用,它将在适当的执行器(在我们的例子中是芹菜)中排队执行任务。
- 芹菜工人:工作人员执行所有工作流任务。每个worker从队列(在我们的例子中是Redis)中提取下一个要执行的任务,并在本地执行该任务。(可执行任务由工作流ID、任务ID和执行日期标识)。
- 元数据数据库:系统中所有实体(如工作流、任务、连接、变量和xcom)的真相来源,以及工作流的执行状态。
- Python工作流:用户编写的用于定义工作流、任务和库的Python文件。
与用户代码的隔离
通过在生产环境中操作Piper,我们学到的一个重要经验是需要将用户代码与系统代码隔离开来。工作流DSL的一个优点是工作流可以被任意的Python结构定义,比如遍历磁盘上的配置文件,调用外部服务来获取配置数据,或者直接调用命令行。然而,这种灵活性与系统的稳定性和可靠性相冲突,因为用户代码可以运行任意的逻辑,执行速度较慢,并可能导致系统错误。一般来说,好的系统设计鼓励尽可能地将用户代码与系统代码隔离开来。
如下面的图2所示,原始体系结构依赖于在所有系统组件中执行用户代码,这些组件包括Scheduler、Web服务器和芹菜工作者。
Piper的目标之一是尽可能可靠和快速地调度任务。然而,用户代码在这些不同的组件中运行的事实给我们的系统稳定性带来了一些问题。在我们的环境中,工作流驱动程序可以为单个Python文件生成数千个工作流,有时需要等待外部服务检索配置,加载速度可能很慢,从而对系统可用性和性能产生负面影响。
经过考虑,我们意识到我们可以将工作流的元数据表示与Python定义解耦。一个工作流定义文件可以分解为两个单独的表示:
- 工作流和任务的元数据表示:包括工作流/任务属性以及任务之间的依赖关系图的序列化表示。这种表示可以被调度程序和web服务器等系统组件使用,它们只需要知道关于每个工作流的高级元数据,而不需要加载或执行用户管道定义文件。
- 完全实例化的工作流:完全由用户提供的符合DSL规范的Python任务和工作流表示。我们可以使用这种表示来提取工作流元数据,并在芹菜工作者上执行任务。
为了实现元数据分解,我们在系统中引入了一个新组件,它的作用是加载用户Python工作流定义、提取它们,然后将它们的序列化元数据表示存储在数据库中。然后,元数据表示可以被系统的其他组件使用,如调度程序和web服务器,而不必加载任何用户代码。通过将工作流的元数据表示从可执行表示中分离出来,我们能够将许多系统组件从必须加载Python工作流定义中分离出来,从而产生一个更可靠和性能更好的系统。
重新架构以获得高可用性和水平可伸缩性
在通过元数据序列化实现与用户代码的隔离之后,我们希望进一步提高Piper的可伸缩性和系统可用性。我们的目标是:
- 改进的系统效率和语言支持:在Uber,我们在微服务中使用Go和Java进行标准化,因此,我们选择在Piper中遵循这种语言标准化,并在保持Python中的DSL的同时提供更低的调度延迟和更好的性能。
- 高可用性和消除单点故障:Uber将服务托管在Apache Mesos/上μ部署该系统在Apache Mesos集群上的Docker容器中运行服务。这些服务必须优雅地处理容器崩溃、重新启动和容器重定位,而不会出现任何停机时间。在现有的系统架构中,调度是单点故障:如果调度器节点消失,系统将停止调度任何任务。在重新分配节点时也会发生这种情况,通常是由部署、硬件维护或资源短缺引起的。
- 调度的水平可伸缩性:现有系统只支持在任何时间运行单个调度器。随着新的工作流的添加,调度延迟倾向于随着时间的推移而增加。我们希望能够添加额外的调度程序,以自动接管活动工作流的一部分。这将提供自动故障转移、减少调度延迟和跨多个节点负载平衡作业调度的能力。
为了实现这些目标,我们应用了分布式系统概念来提高Piper的可用性和可伸缩性。我们介绍了使用分布式协调服务来提供原语我们可以用来加固系统,并使用以下所示的更改重新构建系统:
- 在Java中重写Piper的调度程序和执行器组件:自从推出Piper以来,我们从加载用户Python代码中分离了调度程序和执行程序,因此现在可以自由地使用最适合这项工作的任何平台或工具。随着Uber在Java和Go上的标准化,我们用Java重写了调度器和执行器组件,这允许我们使用Java的更高性能并发语义来提高系统效率。
- 杠杆领导选举:对于任何打算作为单例运行的系统组件,例如序列化器和执行器,我们添加了领导者选举功能。如果leader不可用,系统将自动从可用的备份节点中选举新的leader。这消除了单点故障,还减少了Apache Mesos中部署、节点重启或节点重定位期间的任何停机时间。
- 引入工作划分:回想一下,我们的目标是能够添加额外的调度程序,然后自动分配工作流的一部分并调度它们。使用分布式协调服务,我们能够为任务调度实现高效的工作划分。这种方法使得可以在任何时候向Piper添加新的调度器。随着新的调度程序上线,一组工作流会自动分配给它,它可以开始调度这些工作流。当调度程序节点联机或脱机时,工作流集会自动调整,为任务调度提供高可用性和水平可伸缩性。
一旦系统在阶段中被开发和测试,我们决定使用部分迁移策略,而不是一次性迁移所有工作流。我们首先同时部署了Python和分布式Java调度器,并具有在工作流级别切换调度模式的能力。通过这种策略,我们能够完全迁移所有的工作流,而不会对最终用户产生任何影响。通过上面的更改,我们现在获得了所有系统组件的高可用性和调度的水平可伸缩性,以及通过Java并发性改进的性能。
额外的平台增强
虽然到目前为止的主题涵盖了我们在调度程序和工作流序列化上执行的主要重新架构,但我们已经将其他几个增强功能集成到平台中,概述如下:
- 多数据中心语义:我们目前在每个数据中心运行一个Piper安装。我们添加了工作流语义,用户可以指定是在单计算模式还是双计算模式下运行工作流。在数据中心故障转移的情况下,我们的系统将自动将工作流转移到另一个数据中心,而不需要用户干预。
- 多租户:由于我们有成千上万的用户和数百个团队在使用我们的系统,我们需要使Piper成为多租户。我们为工作流、连接和变量等授权实体添加了额外的语义,以确保适当所有者的访问控制。
- 审计:用户操作(如编辑连接、编辑变量和切换工作流)被保存到审计存储区,如果需要,可以稍后搜索该存储区。
- 回填:我们实现了一个通用的回填功能,用户只需在UI中单击几下,就可以为任何现有的工作流创建和管理回填。
- 可视化创作工具:我们在Uber有好几类需要创建工作流的用户,其中一些人可能不熟悉Python开发。因此,我们提供了几种创建工作流的方法,通常是通过动态创建工作流的领域特定的ui。这些UI创作工具专门针对机器学习、仪表盘和摄取等垂直领域。我们目前也在开发一个通用的可视化拖放工具,用于在我们的系统上创建工作流。
- 工作流定义REST API我们添加了通过动态创建工作流的能力REST API调用时不需要Python代码。这与Apache类似风暴通量API.
- 持续部署:我们使用monorepo来存储我们的工作流定义代码。我们确保在不需要用户干预的情况下,将monorepo持续部署到必要的系统组件中。
- 持续集成:每个提交到工作流的用户monorepo都会通过一套单元测试运行,以确保没有引入错误,并确保工作流是有效的。
- 度量和监控:我们已经插入了度量和监视使用超级的M3而且uMonitor系统。我们还添加了金丝雀工作流来评估系统运行状况和性能,收集关于系统和基础设施统计信息,并在出现系统中断时使用这些指标向我们的团队发出警报。
- 日志:我们已经重新设计了任务日志,以适应Uber日志基础设施,并确保它是可靠的,对最终用户即时可用,而不影响我们的系统可用性或可靠性。
关键的外卖
从我们最初的系统部署到今天,我们已经从几十个工作流发展到成千上万的工作流,每天运行数十万个任务,由成千上万的用户管理。我们做到了这一点,同时保持了系统的稳定性和性能,并改进了可用性、可伸缩性和可用性。为了支持这一规模,我们遵循了以下几个原则,重新构建了系统:
- 优先考虑用户友好性和表达性编写工作流。这包括轻松管理工作流和即时访问工作流日志的能力。
- 只要有可能,在整个公司内集中使用统一的产品。整合到我们统一的系统中大大减少了我们的维护负担、随叫随到事件和用户困惑,同时将团队统一在一个可以迭代改进的单一产品周围。
- 选择中央多租户部署模型。在我们的例子中,使用这样的模型允许我们更好地支持我们的用户,并提供一个简单的升级路径,而不需要团队在建立系统的新实例时了解系统内部或配置。
- 尽可能避免在系统组件中运行用户代码。分岔工作流元数据极大地提高了我们系统的可靠性,也提供了额外的灵活性(允许我们用Java重写调度组件)。
- 消除任何单点故障以确保正常运行时间和系统可用性。
- 使用分布式系统概念例如领导者选举、故障转移和工作分区,以获得更高的可用性和改进的可伸缩性。
如果对分布式计算和数据挑战感兴趣,可以考虑申请角色加入我们的队伍!
致谢
我们要感谢工程经理Pawan Dixit和Anthony Asta,以及数据工作流团队成员Alex Kira、Ankit Mody、Atasi Panda和Prakhar Garg。






