内存不足?没问题。DuckDB 中的外部聚合

Author Avatar
Laurens Kuiper
2024-03-29 · 17 分钟

TL;DR: 自 0.9.0 版本发布以来,DuckDB 的完全并行聚合哈希表能够高效地聚合比内存能容纳的更多组。

大多数分组聚合查询只产生少量输出行。例如,“过去十年中从每个欧洲首都出发的航班数量?”只会为每个欧洲首都产生一行,即使包含所有航班信息的表有数百万行。但情况并非总是如此,例如“过去十年中每位客户的订单数量?”会为每位客户产生一行,这可能达到数百万行,从而显著增加查询的内存消耗。然而,即使聚合无法完全适应内存,DuckDB 仍然可以完成查询。

对实现不感兴趣?直接跳到实验部分!

介绍

大约两年前,我们发布了关于 DuckDB 哈希聚合的第一篇博文,题为“DuckDB 中的并行分组聚合”。那么我们现在为什么要写另一篇博文呢?

与大多数数据库系统是服务器不同,DuckDB 在各种可能内存不多的环境中被使用。然而,有些数据库查询,例如具有许多唯一组的聚合,需要大量内存。我正在用这台笔记本电脑写作,它有 16 GB 的 RAM。如果一个查询需要 20 GB 怎么办?如果发生这种情况

Out of Memory Error: could not allocate block of size X (Y/Z used)

查询将被中止。遗憾的是,我们无法下载更多 RAM。但幸运的是,这台笔记本电脑还有一个 1 TB 存储空间的快速 SSD。在许多情况下,我们不需要同时将所有 20 GB 数据都放入内存,我们可以暂时将一些数据放入存储中。如果需要时再加载回来,我们仍然可以完成查询。我们必须注意节约使用存储,因为尽管现代 SSD 速度很快,但它们仍然比内存慢得多。

简而言之,这就是这篇文章要讲的内容。自 0.9.0 版本发布以来,DuckDB 的哈希聚合可以通过将数据卸载到存储来处理比内存能容纳的更多唯一组。在这篇文章中,我们将解释其工作原理。如果你想了解哈希聚合是什么、哈希冲突是如何解决的,或者 DuckDB 的哈希表是如何构建的,请查看我们关于哈希聚合的第一篇博文

内存管理

大多数数据库系统将持久数据存储在“页”上。根据请求,这些页可以从存储中的数据库文件中读取,放入内存,并在必要时再写回。普遍的经验法则是使所有页大小相同:这允许页交换,并避免内存和存储中的碎片化。数据库启动时,会分配并保留一部分内存用于这些页,称为“缓冲区池”。负责管理缓冲区池的数据库组件恰当地称为“缓冲区管理器”。

剩余内存保留用于短期存在,即临时内存分配,例如用于聚合的哈希表。这些分配方式不同,这是好的,因为如果有很多唯一组,哈希表可能需要非常大,所以我们无论如何也无法为此使用固定大小的页。如果我们的临时数据比内存能容纳的更多,聚合等操作符必须决定何时有选择地将数据写入存储中的临时文件

……至少,这是传统做法。这对 DuckDB 来说意义不大。我们为什么要如此不同地管理持久数据和临时数据呢?区别在于持久数据应该被持久化,而临时数据不应该。为什么缓冲区管理器不能两者都管理呢?

DuckDB 的缓冲区管理器并非传统。大多数持久数据和临时数据都存储在固定大小的页上,并由缓冲区管理器管理。缓冲区管理器会努力最大限度地利用你的内存。这意味着我们不会为缓冲区池保留一部分内存。这使得 DuckDB 可以将所有内存用于持久数据,如果这对你的工作负载最有利,而不是只用一部分。如果你正在进行需要大量内存的大型聚合,DuckDB 可以将持久数据从内存中逐出,为大型哈希表腾出空间。

因为 DuckDB 的缓冲区管理器管理所有内存,包括持久数据和临时数据,它在选择何时将临时数据写入存储方面比聚合等操作符做得好得多。将卸载的责任留给缓冲区管理器,也省去了我们在每个需要处理不适合内存的数据的操作符中实现读写数据到临时文件的精力。

为什么其他数据库系统中的缓冲区管理器不管理临时数据?存在两个问题:内存碎片无效引用

内存碎片

哈希表和查询操作符中使用的其他数据结构不像用于持久数据的页面那样具有固定的尺寸。我们也不希望在内存中同时存在许多可变大小的页面和固定大小的页面,因为这会导致内存碎片。

理想情况下,我们应该对所有内存分配都使用固定大小,但这并不是一个好主意:有时,最有效的查询处理方式需要分配,例如,一个大型数组。所以,我们决定对几乎所有分配都使用固定大小。这些短暂的分配在使用后会立即解除分配,不像用于持久数据的固定大小页面那样被保留。这些分配不会相互导致碎片化,因为 DuckDB 在可能的情况下使用 jemalloc 进行内存分配,jemalloc 通过大小分类对分配进行分类,并为其维护独立的竞技场。

无效引用

临时数据通常不能按原样写入存储,因为它通常包含指针。例如,DuckDB 实现了 Umbra 提出的字符串类型,它具有固定宽度。长度超过 12 个字符的字符串不存储在字符串类型内部,而是存储在其他地方,并存储一个指向这个“其他地方”的指针。

当我们想要将数据卸载到存储时,这会产生一个问题。假设那些存储长度超过 12 个字符的字符串的“其他地方”是缓冲区管理器可以随时卸载到存储以释放内存的页面之一。如果该页面被卸载然后重新加载,它很可能会被加载到内存中的不同地址。那些指向长字符串的指针现在是无效的,因为它们仍然指向以前的地址!

将包含指针的数据写入存储的通常方法是先进行序列化。当将其读回内存时,必须再次进行反序列化(反)序列化可能是一个昂贵的操作,因此存在像 Arrow Flight 这样的数据格式,它们试图将成本降到最低。然而,我们不能在这里使用 Arrow,因为 Arrow 是列式布局,而行式布局对哈希表更有效

我们可以创建 Arrow Flight 的行式版本,但我们可以完全避免(反)序列化:我们创建了一种专门的行式页面布局,它实际上使用旧的无效指针在数据读回内存后重新计算新的有效指针。

页布局将固定大小的行和可变大小的数据(如字符串)放置在不同的页上。行的尺寸在查询中是固定的:发出 SQL 查询后,DuckDB 会创建并执行查询计划。因此,甚至在执行所述计划之前,我们已经知道我们需要哪些列、它们的类型以及这些类型的宽度。

如下图所示,需要少量“元数据”来重新计算指针。固定大小的行存储在“行页”中,可变大小的行存储在“变量页”中。

DuckDB's spillable page layout

请记住,固定大小的行内有指向可变大小数据的指针。元数据描述了哪些固定大小的行指向哪个变量页,以及该变量页的最后已知地址。例如,元数据 1 描述了存储在行页 1 偏移量 0 处的 5 行,其可变大小数据存储在变量页 1 中,该页的地址是 0x42

假设缓冲区管理器决定卸载变量页 1。当我们再次请求变量页 1 时,它被加载到地址 0x500。那 5 行中的指针现在是无效的。例如,其中一行包含指针 0x48,这意味着它存储在变量页 1 的偏移量 0x48 - 0x42 = 6 处。我们可以通过将偏移量添加到页的新地址来重新计算指针:0x500 + 6 = 0x506。指针重新计算是针对其字符串存储在相同行页和变量页的行进行的,因此每当行页或变量页满时,我们都会创建一个新的元数据。

指针重新计算相对于(反)序列化的优点是它可以延迟执行。我们可以通过比较元数据中的指针与页面的当前指针来检查变量页是否被卸载。如果它们相同,我们就不必重新计算指针。

外部聚合

既然我们已经解决了如何处理临时数据的问题,现在终于可以谈谈哈希聚合了。第一个巨大的挑战是如何并行执行聚合。

DuckDB 使用 Morsel-Driven Parallelism 来并行化查询执行,这本质上意味着查询操作符(例如聚合)必须感知并行性。这与计划驱动并行性不同,后者让操作符不感知并行性。

简要总结一下我们关于聚合的第一篇博文:在 DuckDB 中,所有活动线程都有自己的线程局部哈希表,它们将输入数据沉入其中。这将使线程一直忙碌,直到所有输入数据都被读取。多个线程很可能在它们的哈希表中拥有完全相同的组。因此,必须合并线程局部哈希表以完成分组聚合。这可以通过分区哈希表并将每个线程分配到合并每个分区的数据来并行完成。在大多数情况下,我们仍然使用相同的方法。你将在下图中看到这一点,它说明了我们的新实现。

DuckDB's external hash aggregation

我们将第一阶段称为线程本地预聚合。输入数据是数据块,大约 100,000 行的块。这些数据块被分配给活动线程,线程将其沉入其线程局部哈希表,直到所有输入数据都被读取。我们使用线性探测来解决冲突,并使用盐值来减少处理冲突的开销。这在我们关于聚合的第一篇博文中有所解释,所以这里不再赘述。

现在我们已经解释了哪些地方没有改变,我们可以谈谈哪些地方已经改变了。与上次相比,第一个不同是我们分区的方式。以前,如果例如有 32 个线程,每个线程会创建 32 个哈希表,每个分区一个。这总共是惊人的 1024 个哈希表,在更多线程活跃时扩展性不佳。现在,每个线程有一个哈希表,但每个哈希表内的数据是分区的。数据也存储在我们之前介绍的专用页面布局上,以便可以轻松地卸载到存储。

第二个区别是哈希表在线程本地预聚合期间不会调整大小。我们保持哈希表的大小较小,从而减少此阶段的缓存未命中次数。这意味着哈希表会在某个时候满载。当它满载时,我们将其重置并重新开始。我们可以这样做,因为我们将在第二阶段完成聚合。当我们重置哈希表时,我们会“解除固定”存储实际数据的页面,这会告诉我们的缓冲区管理器,当它需要释放内存时,可以将它们写入存储。

这两个变化共同导致了第一阶段对内存的低需求。每个线程只需要在内存中保留一个小的哈希表。我们可能通过多次填充哈希表来收集大量数据,但如果需要,缓冲区管理器几乎可以卸载所有这些数据。

对于第二阶段,即分区聚合,线程本地分区数据进行交换,每个线程将单个分区的数据合并到一个哈希表中。此阶段与之前基本相同,只是我们现在有时创建的分区数量比线程数多得多。为什么?一个分区的哈希表可能适合内存,但 8 个线程可能同时合并一个分区,我们可能无法在内存中容纳 8 个分区。解决这个问题的简单方法是过度分区。如果我们创建比线程更多的分区,例如 32 个分区,那么分区的大小会更小,并且 8 个线程将只同时合并 32 个分区中的 8 个,这不会需要几乎那么多的内存。

实验

只产生少数唯一组的聚合可以轻松地适应内存。为了评估我们的外部哈希聚合实现,我们需要具有许多唯一组的聚合。为此,我们将使用 H2O.ai 类似数据库操作的基准测试,我们已将其复活,现在正在维护。具体来说,我们将使用 G1_1e9_2e0_0_0.csv.zst 文件,该文件未压缩时为 50 GB。H2O.ai 基准测试的源代码可以在 GitHub 上找到。你可以从 https://blobs.duckdb.org/data/G1_1e9_2e0_0_0.csv.zst 下载该文件(压缩后为 18.8 GB)。

我们使用基准测试中的以下查询来加载数据

SET preserve_insertion_order = false;
CREATE TABLE y (
    id1 VARCHAR, id2 VARCHAR, id3 VARCHAR,
    id4 INTEGER, id5 INTEGER, id6 INTEGER,
    v1 INTEGER, v2 INTEGER, v3 FLOAT);
COPY y FROM 'G1_1e9_2e0_0_0.csv.zst' (FORMAT csv, AUTO_DETECT true);
CREATE TYPE id1ENUM AS ENUM (SELECT id1 FROM y);
CREATE TYPE id2ENUM AS ENUM (SELECT id2 FROM y);
CREATE TABLE x (
    id1 id1ENUM, id2 id2ENUM, id3 VARCHAR,
    id4 INTEGER, id5 INTEGER, id6 INTEGER,
    v1 INTEGER, v2 INTEGER, v3 FLOAT);
INSERT INTO x (SELECT * FROM y);
DROP TABLE IF EXISTS y;

H2O.ai 聚合基准测试包含 10 个查询,它们在唯一组的数量上有所不同

-- Query 1: ~100 unique groups
CREATE OR REPLACE TABLE ans AS
SELECT id1, sum(v1) AS v1
FROM x
GROUP BY id1;
-- Query 2: ~10,000 unique groups
CREATE OR REPLACE TABLE ans AS
SELECT id1, id2, sum(v1) AS v1
FROM x
GROUP BY id1, id2;
-- Query 3: ~10,000,000 unique groups
CREATE OR REPLACE TABLE ans AS
SELECT id3, sum(v1) AS v1, avg(v3) AS v3
FROM x
GROUP BY id3;
-- Query 4: ~100 unique groups
CREATE OR REPLACE TABLE ans AS
SELECT id4, avg(v1) AS v1, avg(v2) AS v2, avg(v3) AS v3
FROM x
GROUP BY id4;
-- Query 5: ~1,000,000 unique groups
CREATE OR REPLACE TABLE ans AS
SELECT id6, sum(v1) AS v1, sum(v2) AS v2, sum(v3) AS v3
FROM x
GROUP BY id6;
-- Query 6: ~10,000 unique groups
CREATE OR REPLACE TABLE ans AS
SELECT
    id4,
    id5,
    quantile_cont(v3, 0.5) AS median_v3,
    stddev(v3) AS sd_v3
FROM x
GROUP BY id4, id5;
-- Query 7: ~10,000,000 unique groups
CREATE OR REPLACE TABLE ans AS
SELECT id3, max(v1) - min(v2) AS range_v1_v2
FROM x
GROUP BY id3;
-- Query 8: ~10,000,000 unique groups
CREATE OR REPLACE TABLE ans AS
SELECT id6, v3 AS largest2_v3
FROM (
    SELECT id6, v3, row_number() OVER (
          PARTITION BY id6
          ORDER BY v3 DESC) AS order_v3
    FROM x
    WHERE v3 IS NOT NULL) sub_query
WHERE order_v3 <= 2;
-- Query 9: ~10,000 unique groups
CREATE OR REPLACE TABLE ans AS
SELECT id2, id4, pow(corr(v1, v2), 2) AS r2
FROM x
GROUP BY id2, id4;
-- Query 10: ~1,000,000,000 unique groups
CREATE OR REPLACE TABLE ans AS
SELECT id1, id2, id3, id4, id5, id6, sum(v3) AS v3, count(*) AS count
FROM x
GROUP BY id1, id2, id3, id4, id5, id6;

基准测试页面上的结果是使用 c6id.metal AWS EC2 实例获得的。在此实例上,所有查询都可以轻松地适应内存,并且拥有许多线程也不会损害性能。DuckDB 仅需 8.58 秒即可完成最大的查询(查询 10),该查询返回 10 亿个唯一组。然而,许多人不会使用如此强大的机器来处理数据。在我的笔记本电脑上(一台 2020 款 MacBook Pro),一些较小的查询(如查询 1)可以适应内存,但查询 10 肯定不能。

下表是所用硬件的摘要。

规格 c6id.metal 笔记本电脑 比率
内存 256 GB 16 GB 16 倍
CPU 核心数 64 8 8 倍
CPU 线程数 128 8 16 倍
每小时成本 $6.45 $0.00

尽管 AWS EC2 实例的 CPU 核心与我的笔记本电脑的 CPU 核心不能直接比较,但该实例显然拥有更多的计算能力和内存。尽管硬件差异巨大,DuckDB 仍然可以毫无问题地完成所有 10 个查询

查询 c6id.metal 笔记本电脑 比率
1 0.08 0.74 9.25 倍
2 0.09 0.76 8.44 倍
3 8.01 156.63 19.55 倍
4 0.26 2.07 7.96 倍
5 6.72 145.00 21.58 倍
6 17.12 19.28 1.13 倍
7 6.33 124.85 19.72 倍
8 6.53 126.35 19.35 倍
9 0.32 1.90 5.94 倍
10 8.58 264.14 30.79 倍

查询的运行时间以秒为单位报告,是通过在我的笔记本电脑上使用 DuckDB 0.10.1 运行 3 次取中位数获得的。c6id.metal 实例的结果取自基准测试网站。尽管我的笔记本电脑的内存无法容纳所有唯一组,但 DuckDB 能够计算所有唯一组并返回它们。最大的查询(查询 10)需要将近 4.5 分钟才能完成。这比使用强大的 c6id.metal 实例慢 30 多倍。巨大的差异当然可以用硬件的巨大差异来解释。有趣的是,这仍然比 c6id.metal 实例上的 Spark 快,Spark 需要 603.05 秒!

结论

DuckDB 正在不断改进其大于内存的查询处理能力。在这篇博文中,我们展示了 DuckDB 用于从存储中溢出和加载数据的一些技巧。这些技巧已在 DuckDB 的外部哈希聚合中实现,自 0.9.0 版本发布以来。我们在 H2O.ai 基准测试中对哈希聚合进行了测试,DuckDB 可以在只有 16 GB 内存的笔记本电脑上完成所有 50 GB 的查询。

有兴趣阅读更多内容吗?阅读我们关于外部聚合的论文