小良的分布式之路

事业常成于坚忍,毁于浮躁

最近一些工作会稍微用到 Citus 相关的产品,想起早年在实验室做数据库开发的时候也稍微了解过一些,这次稍微搜了一下,发现 Citus 团队在 21 年的时候发了一篇 SIGMOD,正好也就顺便看了一下。简单来说,PostgreSQL本身是一个单机数据库,虽然也提供主从复制的高可用方案,但是并没有原生的分库分表方案。 正好,PG 本身提供了大量的扩展开发接口,Citus 团队以 PG 扩展的方式构建了一个分库分表的方案,这就是 Citus 最开始的应用场景。

下面我们来看看论文,跟着论文一起了解一下 Citus 主要的应用场景和对应场景的解决方案。基于 Citus 论文的结构,我们这篇文章会分为一下几个模块:

  • Citus 简介
  • Citus 的应用场景
  • Citus 的整体架构实现
  • Citus 的性能评测结果

由于是 industry track,论文整体理解起来也比较简单,我们来过一下。

Citus 简介

作为一个非常著名的开源数据库,PostgreSQL应用于许多的商业场景,这也催生了将其由单机数据库扩展为多机器的分布式数据库的需求。以可扩展性为目的,Citus 基于 PostgreSQL的插件开发接口构建了分片层、分布式查询计划生成器和执行器、以及分布式事务。同时由于其完全基于扩展接口开发,它依然保留了 PG 本身的各种功能。

文章基于 Citus 本身的应用,归纳总结了四种需要扩展 PG 的应用场景,并介绍了这四种场景下对分布式数据库的功能需求。接着,文章介绍了 Citus 是如何基于 PG 的扩展接口构建一个分布式数据库系统,并介绍了它是如何解决各类应用场景下的需求。

四种典型的应用场景及其需求

扩展PG 的四种应用场景:多租户、实时分析、高性能增删改查、数据仓库。

Alt text

多租户

多租户的场景是一个很典型的需要多数据进行分片的场景,最传统的方式是对租户的数据进行手动分片,将不同租户的数据放到不同的分片下,以实现资源的隔离。(我们的数据库就是这种方式)。还有一种方式是通过 tenant ID 列来对数据进行分片,Citus 采用的就是这种方式,这种方式下,往往需要不同租户采用相同的数据表模式,但事实上用户会希望针对不同租户设置特定的数据表模式。针对这一需求,作者简易可以使用 JSONB 数据类型累进行扩展。同时,客户也需要能够控制租户数据的位置,这样可以在监控到热点的时候对数据进行迁移。

实时分析

实时分析是为了对大规模流数据进行及时的分析。主要的流数据包括 event 数据和时间序列数据,这些数据往往是用来进行系统监控、异常检测等。这种场景下,数据库需要能够承受很高的写请求的吞吐,同时还要接收每秒数百个的读请求,用于查询数据。这类场景下,查询往往是基于预先定义好的一些索引、物化视图以及其他的一些数据转换请求,数据库需要高效地将最新的数据增量更新到上述结构中。

对于这类场景,PG 本身也有很好的支持,通过 COPY 语句可以实现非常快的数据导入,它的 MVCC 允许在写入的同时服务大量的读请求。 PG 也能支持许多复杂的数据类型。其唯一的缺点在于,这类场景下,数据往往会很快达到单机的存储极限。

为了应对这类场景,数据库系统需要能够将数据分发到不同的机器上,同时支持并行的批量加载以读取这些数据。我们之前也提到,Citus 可以实现分库分表的操作,能够将写入的数据进行分发。后面也会讲到这种情况下 Citus 如何支持高效的查询,包括比较复制的跨节点 join 等操作。

高性能增删改查

高性能的CRUD(创建、读取、更新、删除)场景会涉及许多相对独立修改的对象/文档。应用程序主要通过简单的CRUD操作访问数据,但也可能对对象发出更复杂的查询。这些对象通常遵循类似JSON的非结构化数据格式。PG 可以支持针对 JSON 的查询,一台大型的PostgreSQL服务器可以处理每秒数十万次的写入操作和数百万次的读取操作。不过,这里存在一些限制,PostgreSQL的MVCC(多版本并发控制)模型需要写入一个新的副本,然后在之后通过自动清理(auto-vacuuming)来回收空间。如果自动清理无法跟上,这可能导致性能下降。另一个限制是,由于采用每个连接一个进程的架构以及进程的相对高内存开销,PostgreSQL只能处理有限数量的(空闲)连接。

数据仓库

数据仓库将来自不同来源的数据合并到单个数据库系统中,以生成即时的分析报告。该应用程序通常不需要低延迟或高吞吐量,但查询可能需要扫描非常大量的数据。数据库需要支持非常快速的扫描,并能够为涉及任意连接的手写SQL找到高效的查询计划。

一般来说,PostgreSQL在合理的时间内执行非常大的扫描时,缺乏扫描性能和并行性。另一方面,它的性能特性、全面的SQL支持和生态系统仍然使其成为分析的吸引人选择。

为了扩展数据仓库应用程序,需要通过并行、分布式SELECT和列式存储来加速扫描。应选择分布列以最大程度地增加共同分布连接的数量,但数据库还需要通过重新分配或在网络上传播数据来支持有效的非共同分布连接。查询优化器需要决定最小化网络流量的连接顺序。

Citus 架构

在Citus集群中,所有服务器都运行带有Citus扩展以及任意数量其他扩展的PG实例。Citus使用PG扩展API来以两种方式改变数据库的行为:首先,Citus将数据库对象(如自定义类型和函数)复制到所有服务器。其次,Citus添加了两种新的表类型来利用外部服务器。

Alt text

Citus 是如何修改 PG 本身的行为的

了解过 PG 扩展开发的同学多少都知道,PG 本身提供大量的编程接口,我们可以在扩展插件中编写自己的方法来调整数据库的行为。 Citus主要使用了以下几个接口:

User-defined functions (UDFs) 可以在 SQL 查询时作为事务一部分被调用,主要用于操作Citus元数据以及执行 RPC。

查询计划器和执行器的 hook 本身是一组全局函数指针,允许扩展提供替代的查询计划和执行方法。在PG解析查询后,Citus 会检查查询是否涉及Citus表。如果是的话,Citus会生成一个包含CustomScan节点的计划树,该节点封装了分布式查询计划。

CustomScan是PG查询计划中的执行节点,它保存自定义状态并通过自定义函数指针返回元组。Citus CustomScan调用分布式查询执行器,该执行器将查询发送到其他服务器,并在将结果返回给PG执行器之前收集这些结果。

Transaction callbacks在事务的生命周期中的关键点(例如预提交、后提交、中止)被调用。Citus使用这些回调来实现分布式事务。

Utility hook 在解析不经过常规查询计划器的任何命令后被调用。Citus用这些 hook 来执行Citus表相关的DDL和COPY命令。

Background workers在单独的进程中运行用户提供的代码。Citus使用此API运行维护守护进程。该守护进程执行分布式死锁检测、两阶段提交准备事务恢复和清理。

通过这些钩子,Citus可以拦截客户端和涉及 Citus 表的 PG 引擎之间的任何交互,并替换或增强PG的行为。

Citus 的整体架构

Citus部署通常包括一个coordinator和多个worker,coordinator存有分布式表的元数据,客户端通常连接到协调器。当用户通过Citus UDF添加工作节点时,一个PG服务器隐式地成为协调器。worker存储包含实际数据的分片。当集群规模较小时,协调器本身也可以用作工作节点,因此最小可能的Citus集群是一个单独的服务器。

将单个coordinator作为入口点的好处在于,PG库和工具可以与Citus集群交互,就好像它是一个普通的PG服务器。由于分布式查询的开销与查询执行相比较小,一个大型协调器节点可以处理每秒数十万次的事务或通过PG的COPY命令每秒摄入数百万行的数据。

在这种架构下,很显然 coordinator 节点很容易成为整个系统的瓶颈,Citus 也有相对应的解决方案,通过将元数据分发到 worker 节点,Citus 能够降低coordinator 的查询压力,coordinator则只负责 DDL 语句的执行,由于 DDL 语句的执行量很少,这几乎不会导致coordinator 的瓶颈问题。然而,这种方案会导致 client 端连接 citus 集群时创建更多的连接,大量的连接则会导致另一个瓶颈。

Citus 的表类型

Citus 有两种表,分布表和引用表。在创建普通的 PG 本地表之后,我们可以通过执行 Citus 定义的方法来将本地表转换为 Citus 表,之后,Citus 会接管所有和这个表相关的操作。 通过真分区键进行哈希,Citus 可以将表上的数据均匀分布到 worker 节点上。Citus 可以保证相同范围的哈希值对应的数据都在相同的 worker 节点上,因此,在执行 join 等操作时,citus 不需要进行跨节点通信。

分布表和引用表的区别在于,分布表的数据通过哈希分片到不同 worker 节点上,但引用表会在所有节点上复制同步。这样,当分布表和引用表发生 join 时,worker 只需要执行针对本地的分片来执行join 。

数据再平衡

可以想象,当单个 worker 节点的数据达到负载极限时,我们需要将数据移动到新的节点,从而实现整体的负载均衡。 Citus 提供一个 rebalancer 来执行移动数据的操作,在执行 rebalance 操作时,rebalancer 会选取一个分片和与之相关的其他数据,通过 PG 的逻辑复制进行移动,这时,分片依然可以接收读写请求。在完成所有存量和增量的复制后,Citus 会对分片加上写锁来等待所有复制完成,并执行分布表的元数据更改。这时一般会有几秒的写宕机。

分布式查询计划器

在客户端请求查询 Citus 相关的数据表时,Citus 会生成一个包含有分布式查询计划的 Custom-Scan节点,Citus 有不同的查询计划器,以应对不同场景下的查询请求。如下图所示:

Alt text

Fast path planner 快速路径规划器处理的是针对单个表的简单CRUD查询,并且该表只有一个分布列的取值。它直接从查询中的过滤条件中提取分布列的值,并确定与该值匹配的分片。然后,规划器将表名重写为分片名,以构建在工作节点上运行的查询,这可以在CPU开销极小的情况下完成。因此,快速路径规划器支持高吞吐量的CRUD工作负载。

Router planner 路由规划器处理的是可以限定在一组共同分布的分片上的任意复杂查询。路由规划器会检查或推断所有分布表是否具有相同的分布列过滤条件。如果是这样,查询中的表名将被重写为与分布列值匹配的共同分布分片的名称。路由规划器隐式支持PostgreSQL支持的所有SQL特性,因为它会将完整的查询委托给另一个PostgreSQL服务器。因此,路由规划器使多租户SaaS应用能够在最小开销下使用所有SQL特性。

Logical planner 逻辑规划器通过构建多关系代数树来处理跨分片的查询。多关系代数形式化了两个在PostgreSQL中不可用的分布式执行原语,用于收集和重新分区数据。这种差异影响了路由规划器和逻辑规划器之间的分离。

逻辑规划器的目标是在结果在coordinator上合并之前,将尽可能多的计算推送到工作节点。这里有两种不同的逻辑规划策略:

  1. 逻辑下推规划器:检测连接树是否可以完全下推。这要求所有分布表之间具有共同分布的连接,并且子查询不需要全局合并步骤(例如,GROUP BY必须包含分布列)。如果是这样,规划器可以基本上不关心连接树中使用的SQL构造,因为它们完全委托给工作节点,分布式查询计划变得非常容易并行化。
  2. 逻辑连接顺序规划器:确定涉及非共同分布连接的连接树的最佳执行顺序。它使用共同分布连接、广播连接和重新分区连接来评估分布表和子查询之间所有可能的连接顺序,并选择最小化网络流量的顺序。广播连接和重新分区连接会导致带有过滤器和投影的子计划被推送到子计划中。

对于每个查询,Citus会按照最低到最高开销的顺序遍历这四个规划器。如果某个特定规划器可以为查询生成计划,Citus就会使用它。在特定场景下,相比执行而言,查询计划的生成在时间开销上很低。

分布式查询执行器

PG 的查询计划是一个由多个执行节点组成的执行树,每个节点都有一个返回一个元组的函数。 Citus 生成的 CustomScan 就是其中的一个节点。 PG 的执行器进入 CustomScan函数后,Citus 会执行各个子计划,然后把执行移交给 adaptive executor自适应执行器。自适应执行器可以通过单工作节点多路连接的方式并行执行查询任务。这种多路实现的方式需要管理过程中建立连接,以及并行处理过程中的额外开销。

自适应执行器需要权衡并行执行的延迟与各类开销。这里提出了一个“slow start”慢启动的方案。查询开始时,执行器对每个工作节点只建立一个连接,接下来,每 10ms,执行器会给每个工作节点的连接数(n)加一。如果有 t 个等待交给某个工作节点执行的任务没有被分配到可用的连接, 那么执行器就会为这个工作节点创建 min(n, t) 个连接,并放入连接池中。这种方案的原因在于,一个简单的内存中基于索引的查询往往只需要不到 1ms 的时间,所以一般来说节点上的所有任务会在执行器尝试打开连接之前完成。此外,分析型任务往往需要数百毫秒,尽管连接建立有一定的延迟,但是在整体的时间开销里几乎可以忽略。当然,这种方案下,执行器仍然需要管理与各个节点的连接数。

在执行连接上的任务分配时,由于每个连接到分片上执行查询时访问的是不同的数据,并在多语句事务的情况下保持未提交的写入和锁定。因此,对于每个连接,Citus会跟踪已访问的分片,以确保相同的连接将在同一事务中对同一组共同分布的分片进行任何后续访问。在开始执行语句时,如果在事务中已经访问了分片,则将任务分配给对应的连接,否则将其分配给工作节点的通用池。当连接准备就绪时,执行器首先从其队列中获取一个已分配的任务,否则从通用池中获取任务。

通过结合慢启动、共享连接限制和任务分配算法,自适应执行器可以处理各种工作负载模式,即使它们在单个数据库集群上并发运行,并支持复杂的交互式事务块而不损失并行性。

分布式事务

Citus 中的事务主要分两种,一种是在 coordinator 上的事务,一种是在工作节点上的事务。对于仅仅涉及单个工作节点的事务,工作节点全权负责整个事务;对于涉及多个节点的事务则通过两阶段提交来保证原子性。单节点事务其实比较简单,我们主要看看多节点的事务如何基于两阶段提交来实现。

对于涉及多个节点的写事务,执行器在工作节点上开启事务块,并在提交时对它们执行两阶段提交(2PC)。PostgreSQL实现了准备事务状态的命令,以一种保留锁并在重新启动和恢复时保留状态的方式。这使得稍后可以提交或中止已准备好的事务。Citus使用这些命令来实现完整的2PC协议。

当协调器上的事务即将提交时,预提交回调通过所有与开启事务块的工作节点的连接发送“准备事务”命令。如果成功,协调器为每个已准备好的事务在Citus元数据中写入一个提交记录,然后本地事务提交,确保提交记录被耐久存储。在提交后和中止回调中,已准备好的事务将尽力提交或中止。

当一个或多个已准备好的事务无法提交或中止时,将使用Citus元数据中的提交记录确定事务的结果。后台守护进程定期比较每个工作节点上待处理的准备好的事务列表和本地的提交记录。如果存在已准备好的事务的提交记录(即:可见),则协调器已经提交,因此已准备好的事务也必须提交。反之,如果一个已结束的事务没有记录存在,那么已准备好的事务必须中止。当存在多个协调器时,每个协调器为其启动的事务执行2PC恢复。由于提交记录和已准备好的事务都存储在预写式日志中,这种方法对涉及的任何节点的故障是强大的。

另一个关键点在于如何处理分布式死锁,特别是在多语句事务之间。为了解决这个问题,可以使用死锁预防或死锁检测方法。死锁预防技术(例如Wound-Wait)需要一定百分比的事务重新启动。PostgreSQL具有交互式协议,这意味着在重新启动发生之前可能将结果返回给客户端,而且不希望客户端重试事务。因此,Wound-Wait对于Citus来说不太适用。为了保持与PostgreSQL的兼容性,Citus实现了分布式死锁检测,当事务陷入实际死锁时,会中止事务。

PostgreSQL已经在单节点上提供了死锁检测。Citus通过在协调器节点上运行的后台守护程序扩展了这一逻辑。该守护程序每2秒轮询所有工作节点,以获取其锁图中的边缘(进程a等待进程b),然后合并在同一分布式事务中参与的图中的所有进程。如果生成的图包含一个环路,那么将向属于环路中最年轻的分布式事务的进程发送取消命令,以中止事务。

除非存在实际死锁,否则在典型的(分布式)数据库工作负载中,只有少数事务会在等待锁。因此,分布式死锁检测的开销很小。当分布式死锁经常发生时,建议用户更改其事务中语句的顺序。

Citus中的多节点事务提供了原子性、一致性和持久性的保证,但不提供分布式快照隔离的保证。并发的多节点查询可能在一个节点上提交之前获取本地MVCC快照,而在另一个节点上提交之后获取快照。解决这个问题需要对PostgreSQL进行更改,使快照管理器可扩展。在实践中,我们在这四种工作负载模式中并没有发现对分布式快照隔离的强烈需求,客户目前也没有表达对此的需求。在多租户和CRUD应用程序中,大多数事务范围仅限于单个节点,这意味着它们在该节点上获得了隔离保证。分析应用程序之间的事务没有强依赖关系,因此对宽松的保证更具宽容性。

在某些混合场景中,分布式快照隔离可能很重要。然而,现有的分布式快照隔离技术由于需要额外的网络往返或等待时钟而具有显著的性能成本,这会增加响应时间并降低可实现的吞吐量。在同步的PostgreSQL协议的背景下,吞吐量最终受到#连接数/响应时间的限制。由于从应用程序的角度来看,建立大量的数据库连接通常是不切实际的,因此低响应时间是实现高吞吐量的唯一途径。因此,如果将来实施分布式快照隔离,我们可能会将其作为可选项。

其他场景的分布式处理

除了简单的 SELECT 语句和其他 DML 命令外,Citus 还提供其他语句的支持

  • DDL 命令在 PG 里面是在线处理的具有事务性的操作,Citus 同样通过加锁来保持相同的特性,并通过执行器将命令发送到 worker 节点
  • COPY 命令在 PG 里面可以被用来导入 CSV 格式的数据,这个过程在 PG 里面是单线程的实现,并需要更新索引、检查各类约束条件。在 citus 里面,coordinator 会在每一个分片和数据流上异步开启 COPY命令,这样写操作也可以并行执行。
  • 跨分布表的INSERT…SELECT命令往往采用以下三种步骤之一去执行:
    • 如果SELECT操作在协调器上需要执行合并步骤,则该命令在内部作为分布式SELECT执行,然后将结果COPY到目标表中。
    • 如果没有合并步骤,但源表和目标表不是共位的,则INSERT..SELECT在将SELECT结果插入目标表之前执行分布式重新分区。
    • 如果源表和目标表是共位的,则INSERT..SELECT直接在并行的共位分片上执行。
  • 在Citus中,存储过程可以基于分布参数和一个共位的分布表被委托给工作节点,以避免协调器和工作节点之间的网络往返。工作节点可以在不进行网络往返的情况下在本地执行大多数操作,但在必要时也可以在工作节点之间执行分布式事务。这种方法有助于在分布式环境中优化存储过程的性能。

高可用和备份

在Citus中,HA主要在服务器层使用现有的PostgreSQL复制进行处理。在HA设置中,集群中的每个节点都有一个或多个热备份节点,并使用同步、异步或quorum来复制其写前日志(WAL)。当一个节点失败时,集群协调器会提升一个备用节点,并更新Citus元数据、DNS记录或虚拟IP。整个故障切换过程需要20-30秒,在此期间涉及该节点的分布式事务会回滚。coordinator通常是托管服务中的控制面的一部分,但本地用户可以使用pg_auto_failover扩展来执行相同的功能。

备份也主要在服务器级别进行,通过创建周期性的磁盘快照或数据库目录的副本,并在每个服务器中将WAL持续存档到远程存储来实现。Citus支持定期创建一致的还原点,即每个节点的WAL记录。还原点是在将写操作阻塞到 coordinator上的提交记录表时创建的,这可以防止在创建还原点时进行中的两阶段提交。将所有服务器还原到相同的还原点可以保证在恢复的集群中,所有多节点事务要么完全提交要么中止,或者可以通过协调器在启动时执行2PC恢复来完成。

性能测试

Benchmark 部分,作者的实验基本围绕着不同场景下 PG 单机和 Citus 不同部署模型下的性能差距,包括 latency 、 QPS 、 TPS 等等。这里我就不详细展开了,贴几张图,感兴趣的同学可以直接看原文。

![[github_benchmark.png]]
![[data_warehouse_benchmark.png]]
![[tps_benchmark.png]]
![[ycsb_benchmark.png]]

应用案例

这部分介绍了 Citus 在微软内部的使用,这一场景主要是一个数据分析场景,数据来自全球数亿台 windows 设备,指标显示在一个名为“Release Quality View”(RQV)的实时分析仪表板上,该仪表板帮助Windows工程团队评估每个Windows版本的客户体验质量。

RQV的底层数据存储,代号为VeniceDB,由两个在Microsoft Azure上运行的超过1000核心的Citus集群提供支持,存储了超过一PB的数据。虽然对于VeniceDB评估了许多不同的分布式数据库和数据处理系统,但只有Citus能够满足与PB级VeniceDB工作负载相关的特定要求,包括:

  • 对于每天超过6百万次查询,p95下的小于一秒的响应时间
  • 每天约10TB的新数据
  • 在RQV中显示新的数据需要在20分钟内完成
  • 具有高基数group by的嵌套子查询
  • 高级二级索引(例如部分索引、GiST索引)以高效查找沿各个维度的报告
  • 高级数据类型(例如数组、HyperLogLog)以在SQL中实现复杂的分析算法
  • 通过增量聚合减少行数
  • 在节点间进行原子更新以清理错误数据

在Citus集群中,原始数据存储在名为measures的表中,该表按设备ID进行分布,并使用PostgreSQL中内置的分区功能按时间进行磁盘分区。使用COPY命令来并行化将传入的JSON数据导入分布式表。使用分布式INSERT..SELECT命令来执行设备级别的对传入数据进行预聚合,并将其放入几个具有不同索引的reports表中。reports表也按设备ID进行分布,并与measures表共位,以便Citus可以完全并行化INSERT..SELECT。

这里给出了一个典型的查询语句:

1
2
3
4
5
6
SELECT ..., avg(device_avg)
FROM (
SELECT deviceid, ..., avg(metric) as device_avg
FROM reports WHERE ...
GROUP BY deviceid, <time period> , <other dimensions> ) AS subq
GROUP BY <time period>, <other dimensions>;

这些查询通过多个维度进行过滤(例如测量、时间范围、Windows版本),以找到数据的重要子集。嵌套的子查询首先通过设备ID对报告进行聚合,这对于按设备而不是按报告数量来衡量整体平均值是必要的。每个查询可能涉及数千万台设备,这使得按deviceid进行GROUP BY的计算变得具有挑战性。由于子查询按照分布列进行分组,Citus中的逻辑推送计划器认识到它可以将整个子查询推送到所有工作节点以进行并行化。然后,工作节点使用仅索引扫描按设备ID顺序读取数据,并最小化GROUP BY的磁盘I/O和内存占用。最后,Citus通过在工作节点上计算部分聚合并在协调器上合并这些部分聚合来分发外部聚合步骤,以生成最终结果。

相关工作

这部分大体介绍了当前市面上常见的分布式数据库解决方案:

  • 针对MySQL的类似于 Citus 的方案Vitess,采取了和 Citus 相似的实现方案
  • 基于 PG 的解决方案 Greenplum and Redshift,相比 Citus 而言对分析场景具有更好的支持,比如采用列存来实现快速 scan,通过数据 shuffle 来优化 join 性能等
  • Aurora同样也对 PG 进行了支持,通过分布式存储的实现,Aurora 采用了存算分离、共享存储的方案,这种方案的好处在于调用端不需要做许多分布式场景下的决策,可以直接把 Aurora 当做单机 DB 来使用。 Citus 则需要调用方对分布方案有足够的理解和干预。
  • Spanner ,CockroachDB 和 Yugabyte 主要面向需要分布式事务支持的场景。CockroachDB 和 Yugabyte 也部分支持 PostgreSQL 协议。与 Citus 相比,这些系统的一个显著的架构差异在于它们提供了分布式快照隔离,并使用了”等待-等待”(wound-wait)而不是死锁检测。分布式快照隔离的一个优点是它避免了数据建模的约束。Citus 用户需要使用邻近数据存储和引用表,以将事务范围限制到单个节点,以获得完整的 ACID 保证。另一方面,这些技术还能实现高效的连接和外键,因此它们对于扩展复杂的关系数据库工作负载是至关重要的。
  • TimescaleDB 是一个为时间序列数据优化的 PostgreSQL 扩展。它使用与 Citus 相似的钩子来引入“超级表”(hypertable)的概念,该表会根据时间自动进行分区。按时间对表进行分区对于限制索引大小以保持时间序列工作负载的高写入性能,以及通过时间范围进行分区修剪以加速查询是有用的。由于对 PostgreSQL 钩子的冲突使用,目前 Citus 和 TimescaleDB 不兼容,但 Citus 可以与 pg_partman 一起使用,后者是一个更简单的时间分区扩展。许多使用 Citus 的实时分析应用程序也会在分布式表的基础上使用 pg_partman,在这种情况下,各个分片会被本地分区,以获得分布式表和时间分区的双重优势。

整体而言,Citus 的分布式解决方案需要用户直接介入数据切片、数据同步、数据存储等多个过程,对于使用者而言需要一定背景知识。这一实现方案的好处在于 Citus 可以快速发布支持最新版本 PG 的新版本。

一提到限流算法,大家肯定就是要聊时间窗口、漏斗桶、令牌桶之类的,今天跟着Azure的文档来看看限流器在生产环境怎么搭建和使用。

什么是Rate limiting?

假如现在有一个线上的服务,由于业务的增长或者调用方的不正确操作产生了大量的流量,如果我们直接拒绝过量的请求,那么会出现以下的情况:

以Azure Cosmos DB为例,假设我们有一个应用现在要尝试将数据写入Cosmos DB:

  1. 应用需要将10k条记录写入Cosmos DB,每条记录消耗10个请求单元,我们一共需要100k个请求单元来完成整个写入的工作
  2. 此时Cosmos DB仅有20k个请求单元可供服务
  3. 当10k的记录发送到DB时,由于负载容量有限,仅有2k条记录被成功写入,但8k条记录被拒绝
  4. 接着8k的记录被发送到DB,2k写入,6k拒绝
  5. 接着6k的记录被发送,2k写入,4k拒绝
  6. 接着4k的记录被发送,2k写入,2k拒绝
  7. 最后2k的记录被发送,全部记录写入完成

可以发现,应用总共发送了30k的记录,但是只有10k的数据被写入DB。这产生了大量额外的请求开销。

与此同时,由于大量请求被拒绝,应用中需要处理20k个错误信息,这也会带来内存、存储等多方面的开销。

由于不知道DB端的限流策略,这种单纯重试的方式也无法给调用方一个预期多久可以完成所有数据的写入。

因此我们需要速率限制器,帮助我们控制一定时间周期内发送到我们服务中的请求数量。

如何来做请求速率限制器

我们可以基于不同的指标来决定如何限制请求的速率:

  • 可以是请求的数量,比如每秒20个请求
  • 可以是数据的大小,比如每分钟3个GB
  • 也可以是操作的开销,比如每秒钟20k个请求单元

以上指标都可以用来实现速率控制器。在我们日常的场景里,往往是应用端会接收到大量的请求,因此如何使用负载有限的服务端是应用端必须解决的问题,我们可以简单地将请求缓存在本地,但这也会有进程崩溃导致请求失败的可能性。为了避免这类风险,我们可以把请求放在一个可靠的消息队列服务中,由作业执行器来以一定的速率来读取请求并发送到后端服务,这样就实现了一个速率控制器,提交请求变成了将请求写入消息队列,作业执行器则只会在能够处理请求的时候将请求从消息队列取出。

Alt text

当你发送记录时,你用于释放记录的时间周期可能比服务进行限流的周期更加精细。系统通常根据你可以轻松理解和处理的时间段来设置限流。然而,对于运行服务的计算机来说,这些时间跨度可能相对较长,与其处理信息的速度相比。例如,系统可能以每秒或每分钟为单位进行限流,但通常代码处理的时间单位是纳秒或毫秒。

虽然不是必需的,但通常建议更频繁地发送少量记录以提高吞吐量。因此,你可以比每秒或每分钟批量释放记录更加精细地处理,以保持你的资源消耗(内存、CPU、网络等)以更均匀的速率进行,从而防止由于突发请求造成的潜在瓶颈。例如,如果一个服务允许每秒处理100次操作,速率限制器的实现可以通过每200毫秒释放20次操作来平衡请求,如下图所示。

Alt text

此外,有时需要让多个不协调的进程共享一个受限制的服务。为了在这种情况下实施速率限制,你可以逻辑上将服务的容量分区,然后使用分布式互斥系统在这些分区上管理独占锁。不协调的进程可以在需要容量时竞争这些分区上的锁。对于进程持有锁的每个分区,它被授予一定数量的容量。

例如,如果受限制的系统允许每秒500个请求,你可以创建20个分区,每个分区允许每秒25个请求。如果一个进程需要发出100个请求,它可能请求分布式互斥系统的四个分区。系统可能授予两个分区的独占锁,持续10秒。然后,该进程的速率限制将为每秒50个请求,任务将在两秒内完成,然后释放锁。

实现这种模式的一种方法是使用Azure Storage。在这种情况下,你可以在容器中为每个逻辑分区创建一个0字节的blob。然后,你的应用程序可以直接针对这些blob获取短时间(例如15秒)的独占租约。对于每个获得的租约,应用程序将能够使用该分区的容量。应用程序随后需要跟踪租约时间,以便在租约到期时停止使用其被授予的容量。在实现此模式时,通常希望每个进程在需要容量时尝试租用一个随机分区,以进一步降低延迟,你可能会为每个进程分配一小部分独占容量。因此,只有在需要超出其保留容量时,进程才会尝试获得对共享容量的租约。

Alt text

在决定如何实施这种模式时,请考虑以下因素:

  1. 尽管速率限制模式可以减少限制错误的数量,但你的应用程序仍然需要正确处理可能发生的任何限制错误。

  2. 如果你的应用程序有多个工作流程访问同一个受限制的服务,你需要将它们全部整合到你的速率限制策略中。例如,你可能支持将记录批量加载到数据库中,同时也支持在同一数据库中查询记录。你可以通过确保所有工作流程都通过相同的速率限制机制进行控制,来管理容量。或者,你可以为每个工作流程保留单独的容量池。

  3. 受限制的服务可能被多个应用程序使用。在某些情况下,可以协调这种使用(如上所示)。如果你开始看到比预期更多的限制错误,这可能是多个应用程序访问服务之间的竞争的迹象。如果是这样,你可能需要考虑在其他应用程序的使用降低之前,暂时减少你的速率限制机制施加的吞吐量。

使用这种模式可以达到以下目的:

  1. 减少受限制服务引发的限制错误: 通过合理的速率限制策略,可以降低受限制服务引发的限制错误。逻辑上将服务的容量分区,并使用分布式互斥系统管理这些分区上的独占锁,可以确保请求在可用的容量内得到处理,从而减少限制错误的发生。

  2. 减少与简单错误重试方法相比的流量: 与简单的错误重试方法相比,使用速率限制模式可以减少传输的数据量。通过根据可用容量调整请求的发送频率,可以避免将大量请求同时发送到服务,从而降低了网络流量。

  3. 减少内存消耗: 速率限制模式允许在有处理能力的情况下再将记录出队,从而降低了内存的使用。只有当系统有足够的处理能力来处理记录时,才将它们从队列中移除,避免了在内存中保存大量未处理的记录,节省了内存资源。

Reference

[1] https://learn.microsoft.com/en-us/azure/architecture/patterns/rate-limiting-pattern

最近准备搞一些自建数据库相关的项目,偶然看到 TiDB的文档里面有介绍如何使用 Key-Value(键值对)来存储关系型数据库的表数据,我们跟着文档来过一遍这里的设计,也可以为我们自己的实现提供一些参考。

阅读全文 »

这篇是上周五 B 站和 deeplus 技术分享的第一个 session,直播的时候我正在上班,基本全程错过,今晚趁这个时间把第一部分先看一下,我们一步一步来。

  • 离线平台整体架构介绍
  • 存储架构改造
  • 多机房架构建设
  • 资源混部建设
  • 计算引擎改造
  • 未来展望和思考

p.s. 这是在看完视频之后第三遍回顾整个分享的内容,其实分享中的细节很少,但是信息量很大,整个分享基本上涵盖了过去几年 b 站大数据平台的技术演进过程,其中一部分利用了社区的工作,一部分做了定制化的改造或者优化。可惜我本人不是做这个方向,不过很多东西也能够印证我自己工作领域的一些思路。

阅读全文 »

为什么需要分布式锁?

在单机环境下编写多线程程序时,为了避免多个线程同时操作同一个资源,我们往往会通过加锁来实现互斥,以保证同一时间只有一个操作者对某个资源执行操作,在单机多进程的情况下,如果们想操作同一个共享资源,我们也可以通过操作系统提供的文件锁和心好凉来实现互斥,这些都是单台机器上的操作。而在分布式环境下,如果不同机器上的不同进程需要同时操作某一个共享资源,我们同样也需要这样一个统一的锁来实现互斥。这个时候,我们就需要一个平台来提供这样一个互斥的能力,通常我们会采用一些能够提供一致性的服务,比如 ZooKeeper、 etcd 来满足对一致性要求较高的场景下的互斥需求,当然,也有些服务会用数据库,比如 MySQL,来实现互斥,然而在某些高并发业务场景下,我们通常会采用 Redis来实现。

阅读全文 »