由于优步持续扩展,我们的系统不断地生成更多事件,InterService消息和日志。这些数据需要通过Kafka进行处理。我们的平台如何实时审核所有这些消息?
为了监视Kafka管道的运行状况和每条消息的传递情况,我们依赖于名为Chaperone的审计系统。自2016年1月以来,Chaperone一直是优步工程多数据中心基础设施的关键部分,目前每天处理约1万亿条信息。以下是它的工作原理和我们建造它的原因。
Uber的Kafka管道概述
在Uber,服务以双活模式运行在多个数据中心。Apache卡夫卡,特别是uReplicator,是我们连接优步工程生态系统不同部分的信息总线:
对于许多下游消费者来说,几乎瞬间就能运营像Uber这样规模的Kafka是很困难的。我们积极地使用批处理,并尽可能地依赖于异步处理以获得高吞吐量。服务使用内部客户端库来发布消息卡夫卡代理,哪个批次并将其转发给区域Kafka集群。一些Kafka主题直接从区域集群中消耗,而许多其他数据中心将数据与其他数据中心的数据相结合,用于使用Ureplicator进行缩放流或批处理的聚合Kafka集群。
优步的Kafka管道有四层跨越一些数据中心。KAFKA代理及其客户是前两层。它们是每个数据中心内的区域Kafka集群的下一层的网关。某些数据可以从区域群集复制到聚合群集中,这是流水线的最后一层。
Kafka管道中的数据遵循一个批处理和包装的路径(发送确认):
从代理客户端到Kafka broker的Uber数据流经历了几个阶段:
- 应用程序通过调用produce函数向代理客户端发送消息。
- 代理客户端将消息放入客户端缓冲区并返回给应用程序。
- 代理客户端对缓冲区中的消息进行批处理,并将它们刷新到代理服务器。
- 代理服务器将消息放入生产者缓冲区和ACK到代理客户端。然后将批次分区并按主题名称的相应缓冲区进行分区并放置在相应的缓冲区中。
- 代理服务器批量缓冲区中的消息并刷新到区域代理。
- 区域代理将消息附加到本地日志中,并向代理服务器(使用ack = 1).
- uReplicator从区域代理获取消息并将其刷新到聚合代理。
- 聚合代理将消息追加到本地日志,并将ack追加到uReplicator(使用ack = 1).
我们的Kafka设置为高吞吐量进行了优化,这引入了权衡。数以千计的微服务使用Kafka来处理成千上万的并发访问(并且还在增长),这极大地带来了潜在的问题。伴侣的目的是摄取每个消息的主题和记录计算在给定的时间范围内,在每个阶段的数据管道,早期检测和量化的数据丢失,延迟或重复沿着路径数据需要在超级。
伴侣概述
Chaperone由四个组件组成:AuditLibrary、ChaperoneService、ChaperoneCollector和WebService。
这审计实现审计算法,定时聚合和输出时间窗统计信息。因此,这个库被其他三个组件用于审计。输出模块是可插拔的(Kafka, HTTP等)。在代理客户端,审计指标被发送到Kafka代理。在其他层,指标被直接发送到专用的Kafka主题。
审计库的关键是审计算法;伴侣使用10分钟翻滚(时间)窗户连续聚合每个主题的消息。这是消息内部的事件时间,用于决定将消息置于哪个窗口。对于消息窗口,伴侣伴侣计算统计数据,如总数和p99延迟。周期性地,伴侣源将每个窗口的统计数据包装为审计消息,并将其发送到已插入的后端,该后端可以是kafka代理或kafka broker。
审计消息中的层字段对于查找审计发生的位置以及消息是否已到达此位置。通过比较特定时段的不同层的消息计数,我们可以确定在查询周期期间生成的消息是否已成功传递。
ChaperoneService是最大的工作主组件,忠实饥饿。它消耗了来自Kafka的每个消息,并记录了一个时间戳进行审计。陪伴伴侣使用HelixKafkaconsumer建成uReplicator,它已经证明了自己比Kafka高级消费者(从Kafka 0.8.2)更好的可靠性和更容易的操作。ChaperoneService会定期向一个专用的Kafka主题生成审计消息来记录状态。
ChaperoneCollector监听Kafka主题来获取所有的审计消息并将它们存储到数据库中。听,听!同时,它也填充多个仪表板:
在上图中,我们通过跨所有数据中心聚合计数来查看每个层的主题的总消息数。当没有数据丢失时,所有线路都可以完全一致。只要在层之间删除消息时,差距会出现。例如,如在底部的那样,Kafka代理丢弃了一些消息。然而,在这一层之后没有发生损失。使用此仪表板,可以轻松确定丢失窗口,以便采取相关的动作。
每个层的留言计数也有一个延迟,所以我们知道新的消息是如何以及一层是否正在延迟它们。用户在一个仪表板中获取其主题“健康的端到端可见性,而不是导航Kafka Broker或Ureplicator仪表板,而不是导航kafka broker或ureplicator仪表板,如下所示:
最后,网络服务是一个REST web前端,方便查询指标收集的Chaperone。它能让我们做一些事情,比如准确地量化损失。一旦我们知道损失的时间窗,我们查询Chaperone以获得准确的计数:
我们对伴侣的两个设计要求
在设计伴侣中,我们专注于两个必须做的任务,以实现准确的审计结果:
1)每条信息准确计算一次
为了确保每条消息只被审计一次,ChaperoneService使用了提前写日志(WAL)。每次ChaperoneService准备发出捕获的Kafka统计,它组成一个审计消息,并标记它的UUID。这个消息,连同相关的偏移量,在发送到Kafka之前被持久化到WAL中。一旦被Kafka确认,WAL中的条目就被标记为完成。这样,如果ChaperoneService崩溃,它可以使用WAL重新发送任何未标记的审计消息,并找出最后被审计的偏移量来开始使用。WAL确保每个Kafka消息只审计一次,每个审计消息至少交付一次。
接下来,ChaperoneCollector使用ChaperoneService标记的UUID来删除任何重复项。将UUID和WAL结合在一起,我们可以确保只进行一次审计。由于低开销需求,在代理客户端和服务器中很难实现类似的保证。我们依靠它们优雅的关闭来清除状态。
2)使用一致的时间戳审核跨层的消息
因为Chaperone在多个层中查看相同的Kafka消息,所以消息中嵌入时间戳是很重要的。没有它们,我们就会看到计数的时间发生了变化。在Uber,产生给Kafka的大部分数据要么用类似avro的模式编码,要么以JSON形式发送。对于使用模式编码的消息,可以在常量时间内提取事件时间。但是必须对JSON消息进行解码以提取事件时间。为了加快速度,我们实现了一个基于流的JSON解析器,它可以扫描时间戳,而无需支付解码整个消息的前期成本。这个有效的解析器在ChaperoneService中使用,但是在代理客户端和服务器中使用仍然过于昂贵。因此,我们在这两层中使用处理时间戳。但是由于不一致的时间戳,不同层之间的消息计数的差异可能会触发数据丢失的假阳性警报。我们正在努力解决时间戳不一致的问题,并计划就我们的解决方案发表一篇后续文章。
监护人在Uber的两个主要用途
1.检测数据丢失
在Chaperone建立之前,数据丢失的第一个指标是消费者抱怨数据丢失。到那时已经太晚了,我们也不知道是哪部分管道造成了损失。有了Chaperone,我们建立了一个损失检测作业,定期从Chaperone轮询度量,并在它看到层之间的计数差异时立即发出警报。警报为Kafka管道提供了端到端的覆盖,揭示了每个管道组件的系统度量很难暴露的问题。作业自动发现新的主题,您可以根据数据的重要性和丢失阈值配置不同的警报规则。丢失通知通过各种渠道发送——如寻呼系统、企业聊天或电子邮件——让你很快意识到。
2.读取超出Kafka中可用偏移量的数据
在我们生产中的大多数集群中,我们仍然使用Kafka 8,它本身并没有时间戳到偏移量的索引支持。因此,我们建立了自己的Chaperone。索引增强了我们的时间范围查询Kafka消息,所以你不受限于读取偏移量;您可以使用Chaperone提供的时间戳读取数据。
尽管KAFKA保持有限,但我们备份较旧的数据并保持了完整的消息偏移。备份主题配对由CHAPERONE创建的索引允许用户使用相同接口上的时间范围查询在Kafka中当前存在的读取数据。通过此功能,Kafka用户可以在主题的任何一段时间内检查消息,以调试其服务的问题并在必要时回填消息。当来自下游系统的审计结果与来自陪伴的人之间存在差异时,可以将特定的消息集倾倒出用于定位根本原因的细粒度比较。
概括
我们构建了陪伴伴侣以回答以下类型的问题:
- 是否有数据丢失?如果是的话,有多少和在管道中?
- 什么是端到端延迟?如果有滞后,它从哪里开始?
- 是否存在数据重复?
陪伴伴侣不仅为我们提供了一个很好的系统健康图片;它还提醒我们在数据丢失的事件中。例如,当经纪人响应意外错误时,我们的Ureplicator有一个死循环错误。既不是触发警报的Ureplicator和Kafka Broker都没有警报,但数据丢失检测作业踢入以快速公开错误。
如果你有兴趣了解更多,你可以自己尝试一下——我们有开源的Chaperone和源代码可以在Github上获得。
这篇文章是李小兵和Ankur Bansal一起写的。他们是软件工程师在我们的核心基础设施组织的流媒体平台上。






