优步利用对汇总数据的实时分析来改善我们产品的用户体验打击欺诈行为关于Uber Eats to预测需求在我们的平台上。
随着Uber的运营变得越来越复杂,我们通过我们的平台提供了更多的功能和服务,我们需要一种方法来对我们聚合的市场数据进行更及时的分析,以更好地了解我们的产品是如何被使用的。具体来说,我们需要我们的大数据堆栈来支持跨表查询和嵌套查询,这两个需求都能让我们编写更灵活的临时查询,以跟上我们业务的增长。
为了解决这些问题,我们构建了一个链接的解决方案转眼间一个支持完整ANSI SQL的查询引擎,以及黑比诺,实时OLAP(在线分析处理)数据存储。这种结合的解决方案允许用户编写特别的SQL查询,使团队能够解锁重要的分析功能。
通过在Apache Pinot上设计完整的SQL支持,我们大数据堆栈的用户现在可以编写复杂的SQL查询,以及将Pinot中的不同表与Uber其他数据存储中的表连接起来。这个新的解决方案使具有基本SQL知识的运营团队能够构建仪表板,以便快速分析和报告聚合数据,而不必花费额外的时间与工程师一起进行数据建模或构建数据管道,从而提高整个公司的效率并节省资源。
挑战
及时、特别的数据分析为Uber的数据科学家和运营团队提供了有价值的信息,以做出明智的、数据驱动的决策,实时造福我们的用户。此外,当运维团队请求一个需要跨表或其他类型连接的数据的度量时,工程师需要手动构建一个新的度量仪表板来满足这种类型的查询。为了在我们的大数据堆栈中促进这一功能,我们需要一个解决方案,可以在Apache Pinot数据存储中使用特别的ANSI SQL查询来支持查询近实时数据。
在Uber上广泛使用的Presto是一个分布式查询引擎,允许用户编写SQL查询来访问各种底层数据存储。Presto提供了完整的SQL支持,但它通常不支持有效的实时分析数据存储,而是主要查询Hadoop中的表,其中数据的新鲜度通常是几个小时前的。
Uber各团队使用Pinot以较低的查询延迟回答分析性查询。然而,Pinot查询语言(PQL)缺乏关键功能,包括嵌套查询、连接和通用UDF(例如,正则表达式和地理空间函数)。如果用户想要做更复杂的事情,他们必须花时间(最多几个小时到一周)对数据建模。根据我们单独使用这些技术的经验,我们意识到它们实际上在进行和存储临时数据分析方面可以很好地相互补充。虽然Presto支持SQL,但用户不能使用它来访问新的聚合数据,尽管Pinot可以提供秒级数据新鲜度,但它缺乏灵活的查询支持。这些发现如下图1所示:
我们的解决方案
我们设计了一个解决方案,允许Presto的引擎实时查询Pinot的数据存储,优化了低查询延迟。我们的新系统利用通用的Presto查询语法来支持连接、地理空间查询和嵌套查询以及其他请求。此外,它还支持以秒为单位查询Pinot中的数据。通过这个解决方案,我们通过启用聚合下推、谓词下推和限制下推进一步优化了查询性能,这减少了不必要的数据传输,并将查询延迟提高了10倍以上。
该解决方案为Uber的运营团队提供了更强的分析能力。现在,用户可以充分利用SQL的灵活性来表示更复杂的业务指标,并使用内部工具将查询结果呈现到仪表板中。这种能力提高了我们的运营效率,降低了运营成本。
体系结构
在设计我们的新系统时,我们首先要考虑如何修改Presto的引擎。Presto集群有两种类型的组件:协调器和它的工作者。协调器负责查询解析、计划、任务调度,并将任务分配给它的工作者组。当协调器给它的工作者分配任务时,工作者通过连接器从数据源获取数据,并将最终结果返回给客户端。
如上面的图2所示,Presto支持插入不同的存储引擎,并允许每个worker中的连接器从底层存储中获取数据。然后,由于Pinot可以用作存储,我们可以编写一个Pinot连接器,支持通过Presto worker获取Pinot数据。这个功能使得通过Presto查询Pinot数据成为可能。
在构建Pinot连接器之前,了解Pinot的工作原理非常重要。Pinot集群有三个主要组件:控制器、代理和服务器。控制器负责节点和任务管理,服务器存储和服务数据。每个服务器包含一个段列表(换句话说,就是碎片),每个段是一组行。代理接收查询,从服务器获取数据,并将最终结果返回给客户端。Pinot服务器可以从分布式实时流媒体平台Apache Kafka中摄取数据,并且可以在数据被摄取时进行查询,因此数据的新鲜度可以达到秒级。
如上面的图3所示,Pinot服务器存储数据的不同分区,在从每个服务器获取数据后,代理将数据合并并返回最终结果。这个工作流类似于Presto架构,不同的工作人员在将数据发送到聚合工作人员之前获取数据。基于Pinot的数据处理流程,在Presto中构建Pinot连接器似乎是一个可行的选择。
普雷斯托与皮诺的联姻
为了将Presto的直观界面与Pinot的快速功能相结合,我们在Presto中构建了一个新的Pinot连接器,允许Presto以最小的延迟查询数据,促进Presto SQL支持的复杂查询。
如上图4所示,我们将Pinot的分散-聚集查询模式与Presto的协调工作架构结合起来。当用户使用这个新解决方案发送Pinot数据的Presto查询时,Presto的协调器会查询Pinot代理以访问Pinot的路由表。路由表包含关于哪些Pinot段存储在哪些Pinot服务器上的信息。接下来,Presto根据路由表创建分割。分段告诉每个工作程序一个Pinot段列表,它应该从中获取数据。随后,每个Presto worker同时查询其分配的分割中的底层Pinot数据,在适用时支持聚合和谓词下推。最后,聚合工作者聚合每次分割的结果,并将最终结果返回给用户。
我们最初的Pinot连接器
Pinot连接器的初始版本将Pinot视为数据库。其他开源的Presto连接器,比如preto -Cassandra,一个允许通过Presto查询Apache Cassandra的Cassandra连接器,以及preto -Hive,一个允许通过Presto查询HDFS数据的Hive连接器,也是这样操作的。
提高查询性能
在实现最初的工作流之后,我们发现我们的系统将大部分查询执行时间花费在数据传输上,特别是随着Pinot表的数据量的增长。工人传输的大量数据被聚合工人丢弃,获取不必要的数据既增加了查询延迟,又增加了Presto工人和Pinot服务器的额外工作量。
为了解决这些问题并提高查询性能,我们对系统进行了以下更新:
谓词下推
类中的布尔值函数在哪里子句。谓词表示特定列允许的值范围。我们实现了谓词下推,这意味着当从Pinot获取数据时,Presto协调器将把谓词下推给Presto工作者,以进行最佳筛选。当Presto工人从Pinot服务器获取记录时,我们的系统会保留工人正在操作的查询的谓词。通过在Presto workers中应用用户查询中的谓词,我们的系统只从Pinot中获取必要的数据。例如,如果Presto查询的谓词是WHERE city_id = 1,使用谓词下推将确保工作人员只从city_id = 1的Pinot段中获取记录。如果没有谓词下推,它们将从Pinot获取所有数据。
限制叠加
为了进一步防止不必要的数据传输,我们还对系统实施了限制下推。通常,用户不需要查询给定表中的所有数据行,这个新功能使用户能够在更有限的(和更少的资源密集型)规模上探索数据。例如,用户可能只想查看Pinot数据的前十行;有了这个特性,用户可以在查询中添加LIMIT 10,只对10行数据进行抽样。通过应用限制下推,我们确保了当Presto查询中有限制子句(例如,limit 10)时,当Presto工人从Pinot获取数据时,我们可以应用相同的限制,防止他们获取所有记录。
总下推
由于许多用户在他们的分析查询中应用SUM/COUNT之类的聚合,我们的新系统在相关时方便了聚合下拉,允许Pinot执行各种聚合,包括COUNT、MIN、MAX和SUM。
用户发送到Presto协调器的查询已经包含聚合请求。为了提供聚合下推,我们使用聚合提示将此信息传递给连接器。这些是在查询解析后生成的,并指示每个列中请求的聚合。然后,当Presto worker从Pinot获取数据时,他们直接请求聚合值并相应地处理它们。
由于聚合下推,我们当前的系统可以:
-
- 利用Pinot的功能,使用Star-Tree以低查询延迟支持聚合查询。
- 将像COUNT和SUM这样的聚合结果作为一个条目从Pinot服务器传递给Presto worker时,将所需的行数从数千减少到1,极大地减少了查询延迟。
- 由于减少了Presto工作者和Pinot服务器之间传输的数据量,极大地提高了10倍以上的查询性能。
由于聚合下推可以减少查询延迟的好处记忆犹新,让我们更深入地研究一下如何设计我们的系统以支持常见聚合函数的聚合下推。
按下MIN/MAX/SUM
像MIN、MAX和SUM这样的聚合比较容易下推:我们只需用实际的聚合重写Pinot查询,这样就不用获取记录,只需在每个分割中请求MIN/MAX/SUM值,并在一行中获得结果。在Presto的架构中,每次分割返回一个页面,该页面表示从底层存储中获取的数据。当Presto聚合工作程序处理该页时,它将其中的每一行视为一条记录。
例如,假设Presto工作者查询一个有三条记录的Pinot段:1、10和100。假设用户想要查询这些记录的MAX。当聚合下推未启用时,Presto worker返回一个包含三条记录的页面:1、10和100。聚合工作者计算MAX为1、10和100,并将100返回给用户。通过聚合下推,Presto worker直接向Pinot请求MAX值,并返回一个记录为100的页面。聚合计算最大值为100并将结果返回给用户。
在下面的图5中,我们描述了原始Pinot连接器的工作流程,在下面的图6中,我们将其与工具的更新版本进行比较:
如图6所示,修改后的Presto工作流中的Presto工作者现在每个段只获取一行,而不是利用原始查询中关于所请求的聚合函数的更多信息获取数千行。因此,Presto工作者和Pinot服务器之间的网络传输显著减少。
下推计数
下推COUNT不像下推MIN、MAX和SUM查询那么简单。我们的解决方案的初始架构不方便这个查询,并且会得到不准确的结果。例如,如果我们的Pinot段包含三个值,1、10和100,在Pinot中按下COUNT将返回值为3的一行,这表明有三行与原始查询匹配。当Presto的聚合工作程序处理该页时,它忽略了该行中的任何值,将其视为一行,并执行COUNT,因此最终结果将是1而不是3,这是正确的答案。
为了解决这个问题,我们重构了Presto页面,使其能够表示一个聚合的页面,然后相应地重构了页面结构和处理流程。重构的体系结构不仅为Presto工作者提供了直接构建聚合页面的灵活性,而且还使我们能够下推COUNT聚合并支持Presto中其他更复杂的聚合(如GROUP BY)。
在引入聚合下推后,我们看到了巨大的查询延迟改进,这大大减少了用户等待查询结果的时间,从而提高了开发人员的效率。
我们的preto - pinot连接器表现如何
为了评估我们的新系统的工作情况,我们对Pinot数据的查询性能进行了基准测试。我们在Parquet、ORC和Pinot分段中生成了大约1亿行。
我们在同一个SSD盒上设置Presto和Pinot集群(32核Intel Xeon CPU E5-2620 v4 @ 2.10GHz, 256GB内存)。然后,我们通过preto - hive连接器运行Presto查询,从本地磁盘上的Parquet和ORC请求数据,并通过新的preto -Pinot连接器查询Pinot数据。我们还直接通过Pinot代理查询Pinot数据。
如我们所料,直接查询Pinot可以获得最佳的查询延迟。在使用Presto进行查询时,我们发现不同数据源之间没有显著的性能差异。在查询Parquet文件时,我们看到了零星的延迟峰值,除此之外,查询Pinot与通过Hive连接器查询Parquet和ORC文件具有相似的查询延迟。
我们还通过在几个不同大小的Pinot表上发送聚合查询,对聚合下推性能的提高进行了基准测试。如下面的图9所示,随着Pinot表中文档总数的增加,我们从总下推中获得的效率收益也在增长。
如图8和图9所示,preto - pinot连接器具有与现有Hive连接器相似的查询性能,同时在HDFS中的Parquet或ORC文件上提供了更好的数据新鲜度。在Pinot连接器中进一步引入聚合下推之后,我们能够利用Pinot的分析能力来执行某些常用的聚合,这增强了查询延迟。通过允许用户使用Presto中的SQL查询访问Pinot中的新数据,Pinot连接器解锁了更精确和数据驱动的业务决策。反过来,这些决策允许我们在我们的产品套件中交付更好的用户体验。
展望未来
随着preto - pinot连接器的成功,我们已经看到使用标准SQL访问新数据是多么有价值。用户无需为不同的实时数据存储系统学习不同的SQL方言,就可以获得所需的新见解并做出明智的决策。为此,我们目前正在通过整合存储解决方案和使用Presto作为我们的统一查询层来构建下一代分析平台。
确认
我们要特别感谢付翔、罗振晓和Chinmay Soman对这个项目做出的宝贵贡献。
了解更多关于Uber如何设计实时分析的信息:






