使用 DuckDB 进行快速 Top N 聚合和过滤
TL;DR: 使用 min
、max
、min_by
和 max_by
聚合函数中的 N
参数,可以更快速、更轻松地找到 Top N 值或过滤到最新的 N 行。
Top N 简介
分析数据时,一种常见模式是查找特定指标中最高或最低的数据行。当对整个数据集中最高或最低的 N
行感兴趣时,SQL 的标准 ORDER BY
和 LIMIT
子句将按感兴趣的指标排序并只返回 N
行。例如,使用 TPC-H 基准测试的比例因子 1 (SF1) 数据集:
INSTALL tpch;
LOAD tpch;
-- Generate an example TPC-H dataset
CALL dbgen(sf = 1);
-- Return the most recent 3 rows by l_shipdate
FROM lineitem
ORDER BY
l_shipdate DESC
LIMIT 3;
l_orderkey | l_partkey | … | l_shipmode | l_comment |
---|---|---|---|---|
354528 | 6116 | … | 邮件 | 按照惯例醒来 |
413956 | 16402 | … | 运输 | 通常的模式。小心 |
484581 | 10970 | … | 卡车 | 账目维护。顽固的账目 |
这对于快速获取数据集中最旧或最新的值,或查找特定指标中的异常值非常有用。
另一种常见方法是查询一个或多个列的 min/max 汇总统计信息。这可以找到异常值,但包含异常值的行可能因每列而异,因此它回答的是一个不同的问题。DuckDB 有用的 COLUMNS
表达式允许我们计算所有列的最大值。
FROM lineitem
SELECT
max(COLUMNS(*));
本文中的查询大量使用了 DuckDB 的
FROM
优先语法。这允许FROM
和SELECT
子句互换,甚至允许完全省略后者。
l_orderkey | l_partkey | … | l_shipmode | l_comment |
---|---|---|---|---|
600000 | 20000 | … | 卡车 | 谜题。狡猾地 |
然而,这两种方法只能回答某些类型的问题。在许多场景中,目标是了解按组分组的 Top N 值。在上面的第一个示例中,我们如何计算每个供应商的最后 10 次发货?SQL 的 LIMIT
子句无法处理这种情况。我们将这种类型的分析称为按组分组的 Top N。
这种类型的分析是探索新数据的常用工具。用例包括为每个组提取最近的几行,或查找组中最极端的几个值。回到我们的发货示例,我们可以查看每个零件号的最后 10 次发货,或者找出每个客户的 5 个最高价格订单。
传统的按组分组的 Top N
在大多数数据库中,按组分组过滤到 Top N 的方法是使用窗口函数和公用表表达式 (CTE)。这种方法在 DuckDB 中也适用。例如,此查询返回每个供应商最近的 3 次发货
WITH ranked_lineitem AS (
FROM lineitem
SELECT
*,
row_number() OVER
(PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
AS my_ranking
)
FROM ranked_lineitem
WHERE
my_ranking <= 3;
l_orderkey | l_partkey | l_suppkey | … | l_shipmode | l_comment | 我的排名 |
---|---|---|---|---|---|---|
1310688 | 169532 | 7081 | … | 铁路 | 最终的除外 | 1 |
910561 | 194561 | 7081 | … | 运输 | 大胆的借口 | 2 |
4406883 | 179529 | 7081 | … | 铁路 | 论。狂怒的 | 3 |
4792742 | 52095 | 7106 | … | 铁路 | 讽刺,讽刺的法庭。最终存款睡眠 | 1 |
4010212 | 122081 | 7106 | … | 邮件 | 账目终于讽刺性地劝说 | 2 |
1220871 | 94596 | 7106 | … | 卡车 | 以上常规请求 | 3 |
… | … | … | … | … | … | … |
在 DuckDB 中,这可以使用QUALIFY
子句进行简化。QUALIFY
的作用类似于 WHERE
子句,但它专门对窗口函数的结果进行操作。通过此调整,可以避免 CTE 同时返回相同的结果。
FROM lineitem
SELECT
*,
row_number() OVER
(PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
AS my_ranking
QUALIFY
my_ranking <= 3;
这无疑是一种可行的方法!但是,它的缺点是什么?即使查询只对最近的 3 次发货感兴趣,它也必须对所有发货进行排序才能检索到这 Top 3。由于 DuckDB 创新的基数排序实现,DuckDB 中的排序复杂度为 O(kn)
,但这仍然高于例如DuckDB 的哈希聚合的 O(n)
。与聚合相比,排序也是一项内存密集型操作。
DuckDB 中的 Top N
DuckDB 1.1 添加了一项新功能,可大幅简化和提高 Top N 计算的性能。具体而言,函数 min
、max
、min_by
和 max_by
现在都接受一个可选参数 N
。如果 N
大于 1(默认值),它们将返回一个包含 Top 值的数组。
举个简单的例子,我们查询最近的(前 3 个)发货日期
FROM lineitem
SELECT
max(l_shipdate, 3) AS top_3_shipdates;
前3个发货日期 |
---|
[1998-12-01, 1998-12-01, 1998-12-01] |
DuckDB 中按列的 Top N
借助 COLUMNS
表达式,Top N 选择变得更加有用——我们可以检索每列中 Top 3 的值。我们可以将其称为按列的 Top N 分析。尝试使用普通 SQL 进行这种分析尤其麻烦!您需要为每个单独的列使用子查询或窗口函数……在 DuckDB 中,只需
FROM lineitem
SELECT
max(COLUMNS(*), 3) AS "top_3_\0";
前3个订单键 | 前3个零件键 | … | 前3个发货模式 | 前3个评论 |
---|---|---|---|---|
[600000, 600000, 599975] | [20000, 20000, 20000] | … | [卡车, 卡车, 卡车] | [谜题。狡猾地,谜题。快速大胆地,谜题。斑豆巧妙地巧妙地完成了] |
DuckDB 中按组分组的 Top N
有了新的 N
参数,我们如何加速按组分组的 Top N 分析?
想直接查看最终输出吗?请随意跳过!
我们将利用 DuckDB 的三个其他 SQL 功能来实现这一点
max
函数将返回特定列的最大值(或现在的 Top N!)。相比之下,max_by
函数将找到列中的最大值,然后从同一行但不同列中检索一个值。例如,此查询将返回每个供应商最近 3 个已发货订单的 ID
FROM lineitem
SELECT
l_suppkey,
max_by(l_orderkey, l_shipdate, 3) AS recent_orders
GROUP BY
l_suppkey;
l_suppkey | 最近的订单 |
---|---|
2992 | [233573, 3597639, 3060227] |
8516 | [4675968, 5431174, 4626530] |
3205 | [3844610, 4396966, 3405255] |
2152 | [1672000, 4209601, 3831138] |
1880 | [4852999, 2863747, 1650084] |
… | … |
max_by
函数是一个聚合函数,因此它利用 DuckDB 快速的哈希聚合而不是排序。它不是按 l_shipdate
排序,而是 max_by
函数只扫描数据集一次,并跟踪 N
个最高的 l_shipdate
值。然后它返回与每个最近发货日期对应的订单 ID。DuckDB 中的基数排序必须每字节扫描数据集一次,因此只扫描一次可以显著提高速度。例如,如果按 64 位整数排序,排序算法必须循环数据集 8 次,而此方法只需 1 次!性能比较部分包含一个简单的微基准测试。
然而,这个 SQL 查询有一些缺陷。查询以 LIST
而不是独立行的形式返回结果。幸运的是,unnest
函数可以将 LIST
分割成独立行。
FROM lineitem
SELECT
l_suppkey,
unnest(
max_by(l_orderkey, l_shipdate, 3)
) AS recent_orders
GROUP BY
l_suppkey;
l_suppkey | 最近的订单 |
---|---|
2576 | 930468 |
2576 | 2248354 |
2576 | 3640711 |
5559 | 4022148 |
5559 | 1675680 |
5559 | 4976259 |
… | … |
下一个缺陷是无法轻易看到与返回的 l_orderkey
值关联的 l_shipdate
。此查询只返回一列,而通常按组分组的 Top N 分析将需要整行。
幸运的是,DuckDB 允许我们将行的全部内容视为单个列!通过引用表本身的名称(此处为 lineitem
)而不是列的名称,max_by
函数可以检索所有列。
FROM lineitem
SELECT
l_suppkey,
unnest(
max_by(lineitem, l_shipdate, 3)
) AS recent_orders
GROUP BY
l_suppkey;
l_suppkey | 最近的订单 |
---|---|
5411 | {'l_orderkey': 2543618, 'l_partkey': 105410, 'l_suppkey': 5411, … |
5411 | {'l_orderkey': 580547, 'l_partkey': 130384, 'l_suppkey': 5411, … |
5411 | {'l_orderkey': 3908642, 'l_partkey': 132897, 'l_suppkey': 5411, … |
90 | {'l_orderkey': 4529697, 'l_partkey': 122553, 'l_suppkey': 90, … |
90 | {'l_orderkey': 4473346, 'l_partkey': 160089, 'l_suppkey': 90, … |
… | … |
让我们通过将 STRUCT
分割成单独的列,使其看起来更友好,以匹配我们原始的数据集。
最终的按组分组的 Top N 查询
再向 UNNEST
传递一个参数将通过递归运行将其分割成单独的列。在这种情况下,这意味着 UNNEST
将运行两次:一次将每个 LIST
转换为单独的行,然后再次将每个 STRUCT
转换为单独的列。l_suppkey
列也可以排除,因为它将自动包含。
FROM lineitem
SELECT
unnest(
max_by(lineitem, l_shipdate, 3),
recursive := 1
) AS recent_orders
GROUP BY
l_suppkey;
l_orderkey | l_partkey | l_suppkey | … | l_shipinstruct | l_shipmode | l_comment |
---|---|---|---|---|---|---|
1234726 | 6875 | 6876 | … | COD代收 | FOB | 巧妙地仔细地完成 |
2584193 | 51865 | 6876 | … | 取回退回 | 卡车 | 定期地在q处充分存款 |
2375524 | 26875 | 6876 | … | 亲自交付 | 空气 | 不寻常的想法。忙碌的大胆存款 |
5751559 | 95626 | 8136 | … | 无 | 运输 | ers 蓬松地唠叨着 |
3103457 | 103115 | 8136 | … | 取回退回 | FOB | y 狡猾地表达疣猪——不寻常,e |
5759105 | 178135 | 8136 | … | COD代收 | 卡车 | es. 定期斑豆讨价还价。 |
… | … | … | … | … | … | … |
这种方法对于常见的通过查找组内最新值进行去重任务也很有用。一种模式是通过返回事件表中最新事件来查找数据集的当前状态。只需使用
N
为 1 即可!
我们现在有一种使用聚合函数计算每组 Top N 行的方法!那么,它的效率提高了多少呢?
性能比较
我们将比较 QUALIFY
方法和 max_by
方法解决按组分组的 Top N 问题。我们已经讨论了这两个查询,但为方便参考,它们在下面重复。
QUALIFY
查询
FROM lineitem
SELECT
*,
row_number() OVER
(PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
AS my_ranking
QUALIFY
my_ranking <= 3;
max_by
查询
FROM lineitem
SELECT
unnest(
max_by(lineitem, l_shipdate, 3),
recursive := 1
)
GROUP BY
l_suppkey;
当主查询运行时,我们还将启动一个后台线程,定期测量 DuckDB 的内存使用情况。这使用了内置的表函数 duckdb_memory()
,并包含了内存使用和临时磁盘使用信息。用于基准测试的简单 Python 脚本包含在结果下方。用于基准测试的机器是配备 16 GB RAM 的 M1 MacBook Pro。
SF | 最大内存 |
度量 | 符合条件 |
最大值 |
改进 |
---|---|---|---|---|---|
1 | 默认值 | 总时间 | 0.58 秒 | 0.24 秒 | 2.4倍 |
5 | 默认值 | 总时间 | 6.15 秒 | 1.26 秒 | 4.9倍 |
10 | 36 GB | 总时间 | 36.8 秒 | 25.4 秒 | 1.4倍 |
1 | 默认值 | 内存使用 | 1.7 GB | 0.2 GB | 8.5倍 |
5 | 默认值 | 内存使用 | 7.9 GB | 1.5 GB | 5.3倍 |
10 | 36 GB | 内存使用 | 15.7 GB | 17.1 GB | 0.9倍 |
我们可以看到,在这些情况下,max_by
方法都更快,在某些情况下几乎快了 5 倍!然而,随着数据量的增大,max_by
方法相对于 QUALIFY
开始变弱。
在某些情况下,使用 max_by
内存使用量也显著降低。然而,随着规模的增加,max_by
方法的内存使用量变得更加显著,因为不同 l_suppkey
值的数量随比例因子线性增加。这种增加的内存使用量可能解释了性能下降,因为两种算法都接近了我的机器上的最大 RAM 量并开始交换到磁盘。
为了减少内存压力,让我们使用更少的线程(4 个线程和 1 个线程)重新运行比例因子 10 (SF10) 基准测试。我们继续使用 36 GB 的 max_memory
设置。为了参考,包含了之前使用所有 10 个线程的 SF10 结果。
SF | 线程 | 度量 | 符合条件 |
最大值 |
改进 |
---|---|---|---|---|---|
10 | 10 | 总时间 | 36.8 秒 | 25.4 秒 | 1.4倍 |
10 | 4 | 总时间 | 49.0 秒 | 21.0 秒 | 2.3倍 |
10 | 1 | 总时间 | 115.7 秒 | 12.7 秒 | 9.1倍 |
10 | 10 | 内存使用 | 15.7 GB | 17.1 GB | 0.9倍 |
10 | 4 | 内存使用 | 15.9 GB | 17.3 GB | 0.9倍 |
10 | 1 | 内存使用 | 14.5 GB | 1.8 GB | 8.1倍 |
max_by
方法的计算效率如此之高,即使只有一个线程,它也比使用所有 10 个线程的 QUALIFY
方法快得多!减少线程数也有效地降低了内存使用(几乎降低了 10 倍)。
那么,我们应该何时使用每种方法呢?与所有数据库事物一样,这取决于情况! 如果内存受限,max_by
也可能带来好处,特别是当调整线程数以避免溢出到磁盘时。但是,如果组的数量大约与行数相同,请考虑使用 QUALIFY
,因为我们会失去 max_by
方法的一些内存效率。
Python 基准测试脚本
import duckdb
import pandas as pd
from threading import Thread
from time import sleep
from datetime import datetime
from os import remove
def check_memory(stop_function, filepath, sleep_seconds, results_dict):
print("Starting background thread")
background_con = duckdb.connect(filepath)
max_memory = 0
max_temporary_storage = 0
while True:
if stop_function():
break
# Profile the memory
memory_profile = background_con.sql("""
FROM duckdb_memory()
SELECT
tag,
round(memory_usage_bytes / (1000000), 0)::bigint AS memory_usage_mb,
round(temporary_storage_bytes / (1000000), 0)::bigint AS temporary_storage_mb;
""").df()
print(memory_profile)
total_memory = background_con.sql("""
FROM memory_profile
select
sum(memory_usage_mb) AS total_memory_usage_mb,
sum(temporary_storage_mb) AS total_temporary_storage_mb
""").fetchall()
print('Current memory:', total_memory[0][0])
print('Current temporary_storage:', total_memory[0][1])
if total_memory[0][0] > max_memory:
max_memory = total_memory[0][0]
if total_memory[0][1] > max_temporary_storage:
max_temporary_storage = total_memory[0][1]
print('Maximum memory:', max_memory)
print('Maximum temporary_storage:', max_temporary_storage)
sleep(sleep_seconds)
results_dict["max_memory"] = max_memory
results_dict["max_temporary_storage"] = max_temporary_storage
background_con.close()
return
def query_and_profile(filepath, sql):
con = duckdb.connect(filepath)
con.sql("set max_memory='36GB'")
results_dict = {}
stop_threads = False
background_memory_thread = Thread(target=check_memory,
args=(lambda : stop_threads, filepath, 0.1, results_dict, ))
background_memory_thread.start()
print("Starting query:")
start_time = datetime.now()
results_df = con.sql(sql).df()
results_dict["total_time_seconds"] = (datetime.now() - start_time).total_seconds()
print(results_df.head(10))
stop_threads = True
background_memory_thread.join()
con.close()
return results_dict
filepath = './arg_max_check_duckdb_memory_v3.duckdb'
con = duckdb.connect(filepath)
print("Begin initial tpch load")
con.sql("""call dbgen(sf=1);""")
con.close()
sql = """
FROM lineitem
SELECT
UNNEST(
max_by(lineitem, l_shipdate, 3),
recursive := 1
)
GROUP BY
l_suppkey
;"""
max_by_results = query_and_profile(filepath, sql)
sql = """
FROM lineitem
SELECT
*,
row_number() OVER
(PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
AS my_ranking
QUALIFY
my_ranking <= 3
;"""
qualify_results = query_and_profile(filepath, sql)
print('max_by_results:', max_by_results)
print('qualify_results:', qualify_results)
remove(filepath)
结论
DuckDB 现在提供了一种便捷的方式来计算 min
和 max
聚合函数的 Top N 值,以及它们的进阶版本 min_by
和 max_by
。它们易于上手,并且能够实现更复杂的分析,例如计算所有列的 Top N 或按组分组的 Top N。与窗口函数方法相比,还可能带来性能优势。
我们很乐意听到您如何创造性地使用这项新功能!
祝您分析愉快!