在DuckDB中最大化你的Delta扫描性能
TL;DR: 我们发布了 delta
扩展的新版本,其中包含多项新功能和性能改进。在这篇博客文章中,我们将通过一些基准测试来全面考察 delta
扩展,并深入探讨一些新的性能相关功能。
概览
在我们之前的文章中,我们讨论了Delta Lake表格式的全部内容,以及DuckDB的delta
扩展如何利用Delta Kernel库提供原生支持。在这篇博客文章中,我们将重点介绍如何从DuckDB读取Delta表以获得最佳性能。我们将首先简要回顾Delta,然后展示过去几个版本中实现的性能提升。最后,我们将演示最新Delta版本中可用的三个关键功能,它们将确保您从Delta读取性能中获得最大收益:元数据缓存、文件跳过和分区信息下推。
Delta开放表格式
让我们先简要回顾一下Delta,以便快速了解。 Delta Lake是一种开放表格式,类似于Apache Iceberg和Apache Hudi。开放表格式最好理解为“数据和元数据文件的集合”,旨在提供数据湖的灵活性,同时提供传统数据仓库的一些一致性保证。对于Delta,该格式由用于数据存储的Parquet文件以及用于元数据存储的Parquet、JSON和二进制文件的混合组成。除了提供改进的一致性水平外,开放表格式提供的额外元数据还允许通过列统计信息和文件跳过等方式进行各种性能优化。有关更深入的解释,请参阅之前的Delta博客文章。
delta
扩展
DuckDB通过delta
扩展原生支持读取Delta表。此扩展是DuckDB核心扩展之一,每周下载量超过7万次。使用此扩展从Delta表读取非常简单。从DuckDB v1.2.0开始,delta
扩展将在首次使用时自动安装,并在调用delta_scan
函数时加载。
例如,要读取本地Delta表,只需打开任何DuckDB客户端并运行
SELECT * FROM delta_scan('./path_to_your_delta_table');
你的Delta表在别人的机器上,或许在AWS中?DuckDB也可以直接从S3查询!要让DuckDB自动加载你的AWS凭证并查询远程Delta表,请运行
CREATE SECRET (TYPE s3, PROVIDER credential_chain);
SELECT * FROM delta_scan('s3://your-bucket/your_delta_table');
对于Azure或Google Cloud等其他云提供商,请查看该扩展的文档页面。
delta
v0.1.0 与 0.3.0 之间的性能改进
尽管delta
扩展的第一个版本(v0.1.0)已经包含了各种性能相关功能,如投影下推和常量过滤下推,但此后添加的功能极大地提升了delta_scan
的性能。为了说明这一点,我们的第一个基准测试将使用行业标准的TPC-DS基准测试,采用比例因子1数据集(SF1)。
基准测试设置
对于此基准测试,我们启动了一个AWS c6id.4xlarge实例(16个vCPU,32 GB内存),并使用PySpark将TPC-DS SF1数据集写入同一区域(eu-west-1)的S3存储桶。每个基准测试共运行6次,结果取最后5次运行的中位数运行时,第一次运行被视为冷启动。汇总结果显示在下表中。
结果 | 总运行时长 | 最短运行时长 | 最长运行时长 | 中位数运行时长 | 查询超时 |
---|---|---|---|---|---|
delta 扩展 v0.1.0 |
444.76 | 0.48 | 21.31 | 3.63 | 4 |
delta 扩展 v0.3.0 |
151.06 | 0.46 | 6.15 | 1.22 | 0 |
基准测试的详细结果显示在折叠部分
分析
从结果中我们可以看到,整体性能有了显著提升。在v0.1.0中,99个查询中有4个达到了30秒的基准测试超时并被排除在结果之外。在v0.3.0中,所有99个查询都在超时时间内完成。比较总运行时长(不包括v0.1.0中超时的查询),我们发现速度提升了3倍以上!
现在,无需深入太多细节,这里速度提升的一个重要部分可归因于在PR #77中添加的**基数信息传播**。准确的基数估计对于DuckDB的查询优化器良好运行并生成高效的查询计划至关重要。具体来说,DuckDB的连接优化器利用基数估计来改变连接的执行顺序。连接顺序可以极大地影响中间元组的基数,这对查询性能有很大影响。特别是在像TPC-DS基准测试这样包含大量连接的查询工作负载中,连接顺序优化器起着关键作用。有关更多详细信息,请查阅这篇论文。
进一步优化
附加Delta表
除了基数信息传播等通用性能改进之外,delta
扩展还添加了多项与性能相关的功能。其中之一就是附加Delta表的能力。使用ATTACH
查询Delta表具有多重优势。首先,通过使用ATTACH
,当多次查询同一张表时,你的查询可以看起来更简洁,因为你无需每次都重复完整的Delta表路径。更重要的是,使用ATTACH
将允许DuckDB缓存/重用Delta元数据的某些部分,这可以提高查询性能。要附加本地Delta表,请运行
ATTACH './path_to_your_delta_table' AS your_table (TYPE delta);
附加Delta表后,你可以直接使用别名查询该表
SELECT * FROM your_table;
默认情况下,DuckDB将在同一事务中自动缓存Delta元数据。这意味着,如果在该事务中多次扫描Delta表,DuckDB可以在不同扫描之间重用部分Delta元数据。例如,以下查询将只读取Delta元数据一次
SELECT * FROM t1
UNION ALL
SELECT * FROM t1;
为了进一步提高性能,DuckDB还支持**在不同查询之间持久化此缓存的Delta元数据**。为此,可以使用PIN_SNAPSHOT
选项附加Delta表。启用此选项后,后续查询可以重用元数据,例如在以下代码块中所示
ATTACH 's3://your-bucket/your_delta_table' AS t2 (
TYPE delta,
PIN_SNAPSHOT
);
-- First scan (metadata not yet cached)
SELECT * FROM t1;
-- Second scan (metadata is now cached)
SELECT * FROM t2;
元数据缓存可以显著影响性能,尤其是在数据相对较小且延迟较高的情况下。为了说明这一点,我们将重新运行TPC-DS实验,比较三种不同的Delta表扫描方式:使用delta_scan
、使用ATTACH
以及使用ATTACH ... (PIN_SNAPSHOT)
。其余基准测试设置与上一节中的相同。
结果 | 总运行时长 | 最短运行时长 | 最长运行时长 | 中位数运行时长 |
---|---|---|---|---|
delta_scan |
151.06 | 0.46 | 6.15 | 1.22 |
ATTACH |
134.26 | 0.43 | 4.28 | 1.19 |
ATTACH (PIN_SNAPSHOT ) |
102.80 | 0.36 | 4.04 | 0.87 |
基准测试的详细结果显示在折叠部分
结果显示,对于许多TPC-DS查询,使用ATTACH
而非delta_scan
已经可以略微提高某些查询的性能,总运行时长实现了1.13倍的加速。当元数据由于PIN_SNAPSHOT
完全缓存时,我们看到了更大的1.47倍加速。然而,这带来的权衡是会错过在ATTACH
语句之后对表发生的任何更新。
仔细查看完整结果的人还会发现,在少数情况下,ATTACH
的结果实际上比直接使用delta_scan
的结果略差。我们将在关于下推 / ATTACH
相互作用的章节中解释这一点。
文件跳过
扫描Delta表的另一个关键性能功能是文件跳过。正如引言中所述,Delta表包含的元数据包含了表数据文件的各种统计信息。DuckDB等引擎可以使用这些统计信息来决定哪些Parquet文件需要扫描,哪些可以完全跳过。文件跳过是DuckDB自动完成的。文件跳过对常量过滤器和动态过滤器(在查询执行期间计算的过滤器)都有效。
-- constant filter
FROM delta_scan('...')
WHERE col_a > 'some_value';
-- dynamic filter
FROM delta_scan('...')
WHERE col_a > (SELECT max(col_z) FROM other_tbl);
在之前的基准测试中,文件跳过效果非常有限。整体数据量不够大,而且许多查询无论如何都会触及大部分数据。然而,当查询中只触及相对较小部分数据时,文件跳过可以对性能产生巨大影响。为了演示这一点,我们首先生成一些测试数据。我们将使用与之前相同的基于PySpark的测试数据生成脚本。
该表包含1亿行,模式非常基础,包括一个递增的INTEGER
类型的id
列和一个VARCHAR
类型的value
列。如果我们使用DuckDB查询数据,我们将看到类似以下内容
FROM delta_scan('s3://your-bucket/your_delta_table');
┌──────────┬──────────────┐
│ id │ value │
│ int64 │ varchar │
├──────────┼──────────────┤
│ 49950000 │ val-49950000 │
│ 49950001 │ val-49950001 │
│ 49950002 │ val-49950002 │
│ 49950003 │ val-49950003 │
│ · │ · │
│ · │ · │
│ · │ · │
│ 49996 │ val-49996 │
│ 49997 │ val-49997 │
│ 49998 │ val-49998 │
│ 49999 │ val-49999 │
├──────────┴──────────────┤
│ 100000000 rows │
│ (8 shown) │
└─────────────────────────┘
现在,假设我们只对特定范围的id
感兴趣:也许我们只想要小于100的id
。我们现在将构建两个查询。
对于第一个查询,我们将使用glob模式直接读取表中存储的所有parquet文件
FROM parquet_scan('s3://your-bucket/your_delta_table/*.parquet')
WHERE id < 100;
我们这样做是为了说明文件跳过的好处,像这样直接扫描Delta表中的原始Parquet文件仅在此处有效,因为此表中没有更新、删除或检查点。
对于第二个查询,我们直接使用delta_scan
表函数扫描表,通过WHERE
子句仅选择我们感兴趣的id
。
FROM delta_scan('s3://your-bucket/your_delta_table')
WHERE id < 100;
现在,当从位于同一区域的S3存储桶上的c6id.4xlarge AWS实例运行这些查询时,我们可以看到它们的性能差异巨大。delta_scan
仅需≈0.5秒即可完成,而parquet_scan
则需要≈17秒。那么这里到底发生了什么?
我们可以使用DuckDB的EXPLAIN ANALYZE
语句来获取更多细节。我们首先分析parquet_scan
EXPLAIN ANALYZE
FROM parquet_scan('s3://your-bucket/your_delta_table/*.parquet')
WHERE id < 100;
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││ Total Time: 17.08s ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
...
┌─────────────┴─────────────┐
│ TABLE_SCAN │
│ ──────────────────── │
│ Function: │
│ PARQUET_SCAN │
│ │
│ Projections: │
│ id │
│ value │
│ │
│ Filters: id<100 │
│ │
│ 100 Rows │
│ (262.39s) │
└───────────────────────────┘
我们可以在EXPLAIN ANALYZE
输出中看到,我们的过滤器被正确下推,并且扫描正确地只产生了100行。这一切看起来都很好,对吧?那么,让我们将其与delta_scan
的EXPLAIN ANALYZE
输出进行比较
EXPLAIN ANALYZE
FROM delta_scan('s3://your-bucket/your_delta_table');
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││ Total Time: 0.615s ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
...
┌─────────────┴─────────────┐
│ TABLE_SCAN │
│ ──────────────────── │
│ Projections: │
│ id │
│ value │
│ │
│ Filters: id<100 │
│ File Filters: id<100 │
│ │
│ Scanning Files: │
│ 1/2000 │
│ │
│ 100 Rows │
│ (0.06s) │
└───────────────────────────┘
对于delta_scan
函数的EXPLAIN ANALYZE
输出,我们可以看到两个新字段:File Filters
和Scanning Files
。这清楚地向我们展示了正在发生的事情。id<100
谓词现在用于两件事:它被下推到单个Parquet文件的扫描中,就像parquet_scan
一样,但它也作为一个文件过滤器出现,用于完全减少要扫描的文件列表!这使得要读取的Parquet元数据量**减少了2000倍**,从而带来了巨大的性能提升。
分区信息下推
最后一个DuckDB Delta性能特性是分区信息下推。分区信息下推和分区感知聚合运算符是DuckDB v1.2.0中引入的相对新的功能。在delta
扩展的v0.3.0版本中也添加了此功能,这意味着DuckDB现在可以使用分区信息来创建查询计划,从而利用已扫描数据已被分区的事实。为了展示分区信息的性能优势,我们,惊喜地,将运行另一个基准测试!这次,我们选择了比例因子为10的TPC-H数据集,并在一台32 GB的MacBook Pro M1 Max上运行实验。我们根据l_returnflag
和l_linestatus
列对lineitem
表进行了分区。然后我们运行Q1,它大致如下所示
SELECT
l_returnflag,
l_linestatus,
sum(l_quantity) AS sum_qty,
...
FROM
lineitem
...
GROUP BY
l_returnflag,
l_linestatus
...;
请注意,该查询包含一个GROUP BY
语句,其中列出了我们的数据集已经分区的精确列。让DuckDB使用分区感知运算符是自动完成的,因此在这种情况下,只需运行
ATTACH './path_to_partitioned_directory/lineitem_sf10' AS lineitem (
TYPE delta
);
PRAGMA tpch(1);
将对分区Delta数据集执行TPC-H Q1。为了检查发生了什么,我们将再次使用EXPLAIN ANALYZE
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││ Total Time: 0.477s ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
...
┌─────────────┴─────────────┐
│ PARTITIONED_AGGREGATE │
│ ──────────────────── │
│ Groups: │
│ #0 │
│ #1 │
│ │
│ Aggregates: │
│ sum(#2) │
│ sum(#3) │
│ sum(#4) │
│ sum(#5) │
│ avg(#6) │
│ avg(#7) │
│ avg(#8) │
│ count_star() │
│ │
│ 4 Rows │
│ (0.65s) │
└─────────────┬─────────────┘
...
我们可以看到DuckDB已正确检测到分区信息,并正在使用PARTITIONED_AGGREGATE
运算符高效地执行GROUP BY
操作。
现在,作为基准,我们将重新运行相同的查询,但禁用分区信息下推
ATTACH './path_to_partitioned_directory/lineitem_sf10' AS lineitem (
TYPE delta,
PUSHDOWN_PARTITION_INFO 0
);
PRAGMA tpch(1);
同样,使用EXPLAIN ANALYZE
,我们可以看到DuckDB现在将使用常规的HASH_GROUP_BY
运算符,因为在查询规划期间Delta的分区信息不可用。
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││ Total Time: 0.552s ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
...
┌─────────────┴─────────────┐
│ HASH_GROUP_BY │
│ ──────────────────── │
│ Groups: │
│ #0 │
│ #1 │
│ │
│ Aggregates: │
│ sum(#2) │
│ sum(#3) │
│ sum(#4) │
│ sum(#5) │
│ avg(#6) │
│ avg(#7) │
│ avg(#8) │
│ count_star() │
│ │
│ 4 Rows │
│ (1.37s) │
└─────────────┬─────────────┘
...
现在,查看这两个查询之间的性能差异,我们可以看到总体加速仅为适度的1.16倍,但聚合操作本身加速了2.11倍!这意味着当查询经常进行大量分组操作时,按这些列对数据进行分区绝对是性能调优工具箱中一个非常有用的工具。
关于下推 / ATTACH
性能相互作用的注意事项
虽然诸如过滤器下推和分区信息下推等功能将提高许多工作负载的性能,但需要注意,使用ATTACH
的元数据缓存机制与过滤器和分区信息的下推之间存在某种复杂的相互作用。在关于ATTACH
功能的章节末尾,我们已经看到,对于某些查询,使用ATTACH
实际上比使用原始的delta_scan
略慢。无需深入太多细节,下推过滤器和分区信息可能会对某些查询的元数据缓存效率产生负面影响。这意味着,对于某些查询,你在使用ATTACH
时,可能会(有点反直觉地)从部分禁用过滤器下推中受益。
ATTACH './your_delta_table_directory' AS dt (
TYPE delta,
PIN_SNAPSHOT,
PUSHDOWN_PARTITION_INFO 0,
PUSHDOWN_FILTERS 'none'
);
但这应被视为高级用例,仅在针对特定查询进行优化时才相关。ATTACH
的默认设置应提供最佳整体性能,并建议在大多数情况下使用。此外,DuckDB Delta使用的底层delta-kernel-rs
库正在进行工作,旨在通过公开巧妙刷新DuckDB持有的元数据对象的机制来减少这种影响。一旦这些机制可用,我们将把它们添加到DuckDB delta
扩展中,届时这些标志很可能除了测试之外都将过时。
结论
在这篇博客文章中,我们考察了DuckDB delta
扩展的最新版本,并通过一些基准测试对其进行了检验。我们运行了行业标准TPC基准测试中的查询,以展示delta
扩展过去几个版本中取得的巨大性能改进。
此外,我们还研究了在使用Delta表时可以用来进一步提高性能的三种特定技术
- 使用
ATTACH
进行元数据缓存 - 利用过滤器和数据布局减少需要扫描的文件数量
- 利用分区信息加速聚合操作
总而言之,我们认为随着delta
扩展v0.3.0版本的发布,DuckDB能够以卓越的性能读取Delta表,适用于多种不同的工作负载,强烈鼓励大家尝试最新版本!