使用 DuckDB 进行快速 Top N 聚合和过滤

Author Avatar
Alex Monahan
2024-10-25 · 13 分钟

TL;DR: 使用 minmaxmin_bymax_by 聚合函数中的 N 参数,可以更快速、更轻松地找到 Top N 值或过滤到最新的 N 行。

Top N 简介

分析数据时,一种常见模式是查找特定指标中最高或最低的数据行。当对整个数据集中最高或最低的 N 行感兴趣时,SQL 的标准 ORDER BYLIMIT 子句将按感兴趣的指标排序并只返回 N 行。例如,使用 TPC-H 基准测试的比例因子 1 (SF1) 数据集:

INSTALL tpch;
LOAD tpch;
-- Generate an example TPC-H dataset
CALL dbgen(sf = 1);

-- Return the most recent 3 rows by l_shipdate
FROM lineitem
ORDER BY
    l_shipdate DESC
LIMIT 3;
l_orderkey l_partkey l_shipmode l_comment
354528 6116 邮件 按照惯例醒来
413956 16402 运输 通常的模式。小心
484581 10970 卡车 账目维护。顽固的账目

这对于快速获取数据集中最旧或最新的值,或查找特定指标中的异常值非常有用。

另一种常见方法是查询一个或多个列的 min/max 汇总统计信息。这可以找到异常值,但包含异常值的行可能因每列而异,因此它回答的是一个不同的问题。DuckDB 有用的 COLUMNS 表达式允许我们计算所有列的最大值。

FROM lineitem
SELECT
    max(COLUMNS(*));

本文中的查询大量使用了 DuckDB 的 FROM 优先语法。这允许 FROMSELECT 子句互换,甚至允许完全省略后者。

l_orderkey l_partkey l_shipmode l_comment
600000 20000 卡车 谜题。狡猾地

然而,这两种方法只能回答某些类型的问题。在许多场景中,目标是了解按组分组的 Top N 值。在上面的第一个示例中,我们如何计算每个供应商的最后 10 次发货?SQL 的 LIMIT 子句无法处理这种情况。我们将这种类型的分析称为按组分组的 Top N。

这种类型的分析是探索新数据的常用工具。用例包括为每个组提取最近的几行,或查找组中最极端的几个值。回到我们的发货示例,我们可以查看每个零件号的最后 10 次发货,或者找出每个客户的 5 个最高价格订单。

传统的按组分组的 Top N

在大多数数据库中,按组分组过滤到 Top N 的方法是使用窗口函数公用表表达式 (CTE)。这种方法在 DuckDB 中也适用。例如,此查询返回每个供应商最近的 3 次发货

WITH ranked_lineitem AS (
    FROM lineitem
    SELECT
        *,
        row_number() OVER
            (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
            AS my_ranking
)
FROM ranked_lineitem
WHERE
    my_ranking <= 3;
l_orderkey l_partkey l_suppkey l_shipmode l_comment 我的排名
1310688 169532 7081 铁路 最终的除外 1
910561 194561 7081 运输 大胆的借口 2
4406883 179529 7081 铁路 论。狂怒的 3
4792742 52095 7106 铁路 讽刺,讽刺的法庭。最终存款睡眠 1
4010212 122081 7106 邮件 账目终于讽刺性地劝说 2
1220871 94596 7106 卡车 以上常规请求 3

在 DuckDB 中,这可以使用QUALIFY 子句进行简化。QUALIFY 的作用类似于 WHERE 子句,但它专门对窗口函数的结果进行操作。通过此调整,可以避免 CTE 同时返回相同的结果。

FROM lineitem
SELECT
    *,
    row_number() OVER
        (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
        AS my_ranking
QUALIFY
    my_ranking <= 3;

这无疑是一种可行的方法!但是,它的缺点是什么?即使查询只对最近的 3 次发货感兴趣,它也必须对所有发货进行排序才能检索到这 Top 3。由于 DuckDB 创新的基数排序实现,DuckDB 中的排序复杂度为 O(kn),但这仍然高于例如DuckDB 的哈希聚合O(n)。与聚合相比,排序也是一项内存密集型操作。

DuckDB 中的 Top N

DuckDB 1.1 添加了一项新功能,可大幅简化和提高 Top N 计算的性能。具体而言,函数 minmaxmin_bymax_by 现在都接受一个可选参数 N。如果 N 大于 1(默认值),它们将返回一个包含 Top 值的数组。

举个简单的例子,我们查询最近的(前 3 个)发货日期

FROM lineitem
SELECT
    max(l_shipdate, 3) AS top_3_shipdates;
前3个发货日期
[1998-12-01, 1998-12-01, 1998-12-01]

DuckDB 中按列的 Top N

借助 COLUMNS 表达式,Top N 选择变得更加有用——我们可以检索每列中 Top 3 的值。我们可以将其称为按列的 Top N 分析。尝试使用普通 SQL 进行这种分析尤其麻烦!您需要为每个单独的列使用子查询或窗口函数……在 DuckDB 中,只需

FROM lineitem
SELECT
    max(COLUMNS(*), 3) AS "top_3_\0";
前3个订单键 前3个零件键 前3个发货模式 前3个评论
[600000, 600000, 599975] [20000, 20000, 20000] [卡车, 卡车, 卡车] [谜题。狡猾地,谜题。快速大胆地,谜题。斑豆巧妙地巧妙地完成了]

DuckDB 中按组分组的 Top N

有了新的 N 参数,我们如何加速按组分组的 Top N 分析?

想直接查看最终输出吗?请随意跳过!

我们将利用 DuckDB 的三个其他 SQL 功能来实现这一点

max 函数将返回特定列的最大值(或现在的 Top N!)。相比之下,max_by 函数将找到列中的最大值,然后从同一行但不同列中检索一个值。例如,此查询将返回每个供应商最近 3 个已发货订单的 ID

FROM lineitem
SELECT
    l_suppkey,
    max_by(l_orderkey, l_shipdate, 3) AS recent_orders
GROUP BY
    l_suppkey;
l_suppkey 最近的订单
2992 [233573, 3597639, 3060227]
8516 [4675968, 5431174, 4626530]
3205 [3844610, 4396966, 3405255]
2152 [1672000, 4209601, 3831138]
1880 [4852999, 2863747, 1650084]

max_by 函数是一个聚合函数,因此它利用 DuckDB 快速的哈希聚合而不是排序。它不是按 l_shipdate 排序,而是 max_by 函数只扫描数据集一次,并跟踪 N 个最高的 l_shipdate 值。然后它返回与每个最近发货日期对应的订单 ID。DuckDB 中的基数排序必须每字节扫描数据集一次,因此只扫描一次可以显著提高速度。例如,如果按 64 位整数排序,排序算法必须循环数据集 8 次,而此方法只需 1 次!性能比较部分包含一个简单的微基准测试。

然而,这个 SQL 查询有一些缺陷。查询以 LIST 而不是独立行的形式返回结果。幸运的是,unnest 函数可以将 LIST 分割成独立行。

FROM lineitem
SELECT
    l_suppkey,
    unnest(
        max_by(l_orderkey, l_shipdate, 3)
    ) AS recent_orders
GROUP BY
    l_suppkey;
l_suppkey 最近的订单
2576 930468
2576 2248354
2576 3640711
5559 4022148
5559 1675680
5559 4976259

下一个缺陷是无法轻易看到与返回的 l_orderkey 值关联的 l_shipdate。此查询只返回一列,而通常按组分组的 Top N 分析将需要整行。

幸运的是,DuckDB 允许我们将行的全部内容视为单个列!通过引用表本身的名称(此处为 lineitem)而不是列的名称,max_by 函数可以检索所有列。

FROM lineitem
SELECT
    l_suppkey,
    unnest(
        max_by(lineitem, l_shipdate, 3)
    ) AS recent_orders
GROUP BY
    l_suppkey;
l_suppkey 最近的订单
5411 {'l_orderkey': 2543618, 'l_partkey': 105410, 'l_suppkey': 5411, …
5411 {'l_orderkey': 580547, 'l_partkey': 130384, 'l_suppkey': 5411, …
5411 {'l_orderkey': 3908642, 'l_partkey': 132897, 'l_suppkey': 5411, …
90 {'l_orderkey': 4529697, 'l_partkey': 122553, 'l_suppkey': 90, …
90 {'l_orderkey': 4473346, 'l_partkey': 160089, 'l_suppkey': 90, …

让我们通过将 STRUCT 分割成单独的列,使其看起来更友好,以匹配我们原始的数据集。

最终的按组分组的 Top N 查询

再向 UNNEST 传递一个参数将通过递归运行将其分割成单独的列。在这种情况下,这意味着 UNNEST 将运行两次:一次将每个 LIST 转换为单独的行,然后再次将每个 STRUCT 转换为单独的列。l_suppkey 列也可以排除,因为它将自动包含。

FROM lineitem
SELECT
    unnest(
        max_by(lineitem, l_shipdate, 3),
        recursive := 1
    ) AS recent_orders
GROUP BY
    l_suppkey;
l_orderkey l_partkey l_suppkey l_shipinstruct l_shipmode l_comment
1234726 6875 6876 COD代收 FOB 巧妙地仔细地完成
2584193 51865 6876 取回退回 卡车 定期地在q处充分存款
2375524 26875 6876 亲自交付 空气 不寻常的想法。忙碌的大胆存款
5751559 95626 8136 运输 ers 蓬松地唠叨着
3103457 103115 8136 取回退回 FOB y 狡猾地表达疣猪——不寻常,e
5759105 178135 8136 COD代收 卡车 es. 定期斑豆讨价还价。

这种方法对于常见的通过查找组内最新值进行去重任务也很有用。一种模式是通过返回事件表中最新事件来查找数据集的当前状态。只需使用 N 为 1 即可!

我们现在有一种使用聚合函数计算每组 Top N 行的方法!那么,它的效率提高了多少呢?

性能比较

我们将比较 QUALIFY 方法和 max_by 方法解决按组分组的 Top N 问题。我们已经讨论了这两个查询,但为方便参考,它们在下面重复。

QUALIFY 查询
FROM lineitem
SELECT
    *,
    row_number() OVER
        (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
        AS my_ranking
QUALIFY
    my_ranking <= 3;
max_by 查询
FROM lineitem
SELECT
    unnest(
        max_by(lineitem, l_shipdate, 3),
        recursive := 1
    )
GROUP BY
    l_suppkey;

当主查询运行时,我们还将启动一个后台线程,定期测量 DuckDB 的内存使用情况。这使用了内置的表函数 duckdb_memory(),并包含了内存使用和临时磁盘使用信息。用于基准测试的简单 Python 脚本包含在结果下方。用于基准测试的机器是配备 16 GB RAM 的 M1 MacBook Pro。

SF 最大内存 度量 符合条件 最大值 改进
1 默认值 总时间 0.58 秒 0.24 秒 2.4倍
5 默认值 总时间 6.15 秒 1.26 秒 4.9倍
10 36 GB 总时间 36.8 秒 25.4 秒 1.4倍
1 默认值 内存使用 1.7 GB 0.2 GB 8.5倍
5 默认值 内存使用 7.9 GB 1.5 GB 5.3倍
10 36 GB 内存使用 15.7 GB 17.1 GB 0.9倍

我们可以看到,在这些情况下,max_by 方法都更快,在某些情况下几乎快了 5 倍!然而,随着数据量的增大,max_by 方法相对于 QUALIFY 开始变弱。

在某些情况下,使用 max_by 内存使用量也显著降低。然而,随着规模的增加,max_by 方法的内存使用量变得更加显著,因为不同 l_suppkey 值的数量随比例因子线性增加。这种增加的内存使用量可能解释了性能下降,因为两种算法都接近了我的机器上的最大 RAM 量并开始交换到磁盘。

为了减少内存压力,让我们使用更少的线程(4 个线程和 1 个线程)重新运行比例因子 10 (SF10) 基准测试。我们继续使用 36 GB 的 max_memory 设置。为了参考,包含了之前使用所有 10 个线程的 SF10 结果。

SF 线程 度量 符合条件 最大值 改进
10 10 总时间 36.8 秒 25.4 秒 1.4倍
10 4 总时间 49.0 秒 21.0 秒 2.3倍
10 1 总时间 115.7 秒 12.7 秒 9.1倍
10 10 内存使用 15.7 GB 17.1 GB 0.9倍
10 4 内存使用 15.9 GB 17.3 GB 0.9倍
10 1 内存使用 14.5 GB 1.8 GB 8.1倍

max_by 方法的计算效率如此之高,即使只有一个线程,它也比使用所有 10 个线程的 QUALIFY 方法快得多!减少线程数也有效地降低了内存使用(几乎降低了 10 倍)。

那么,我们应该何时使用每种方法呢?与所有数据库事物一样,这取决于情况! 如果内存受限,max_by 也可能带来好处,特别是当调整线程数以避免溢出到磁盘时。但是,如果组的数量大约与行数相同,请考虑使用 QUALIFY,因为我们会失去 max_by 方法的一些内存效率。

Python 基准测试脚本
import duckdb
import pandas as pd
from threading import Thread
from time import sleep
from datetime import datetime
from os import remove

def check_memory(stop_function, filepath, sleep_seconds, results_dict):
    print("Starting background thread")
    background_con = duckdb.connect(filepath)
    max_memory = 0
    max_temporary_storage = 0
    while True:
        if stop_function():
            break
        # Profile the memory
        memory_profile = background_con.sql("""
            FROM duckdb_memory()
            SELECT
                tag,
                round(memory_usage_bytes / (1000000), 0)::bigint AS memory_usage_mb,
                round(temporary_storage_bytes / (1000000), 0)::bigint AS temporary_storage_mb;
            """).df()
        print(memory_profile)
        total_memory = background_con.sql("""
            FROM memory_profile
            select
                sum(memory_usage_mb) AS total_memory_usage_mb,
                sum(temporary_storage_mb) AS total_temporary_storage_mb
            """).fetchall()
        print('Current memory:', total_memory[0][0])
        print('Current temporary_storage:', total_memory[0][1])

        if total_memory[0][0] > max_memory:
            max_memory = total_memory[0][0]
        if total_memory[0][1] > max_temporary_storage:
            max_temporary_storage = total_memory[0][1]

        print('Maximum memory:', max_memory)
        print('Maximum temporary_storage:', max_temporary_storage)

        sleep(sleep_seconds)

    results_dict["max_memory"] = max_memory
    results_dict["max_temporary_storage"] = max_temporary_storage
    background_con.close()

    return

def query_and_profile(filepath, sql):
    con = duckdb.connect(filepath)
    con.sql("set max_memory='36GB'")

    results_dict = {}
    stop_threads = False
    background_memory_thread = Thread(target=check_memory,
                                      args=(lambda : stop_threads, filepath, 0.1, results_dict, ))
    background_memory_thread.start()

    print("Starting query:")
    start_time = datetime.now()
    results_df = con.sql(sql).df()
    results_dict["total_time_seconds"] = (datetime.now() - start_time).total_seconds()
    print(results_df.head(10))

    stop_threads = True
    background_memory_thread.join()
    con.close()

    return results_dict

filepath = './arg_max_check_duckdb_memory_v3.duckdb'

con = duckdb.connect(filepath)
print("Begin initial tpch load")
con.sql("""call dbgen(sf=1);""")
con.close()

sql = """
    FROM lineitem
    SELECT
        UNNEST(
            max_by(lineitem, l_shipdate, 3),
            recursive := 1
        )
    GROUP BY
        l_suppkey
;"""

max_by_results = query_and_profile(filepath, sql)

sql = """
    FROM lineitem
    SELECT
        *,
        row_number() OVER
            (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
            AS my_ranking
    QUALIFY
        my_ranking <= 3
;"""

qualify_results = query_and_profile(filepath, sql)

print('max_by_results:', max_by_results)
print('qualify_results:', qualify_results)

remove(filepath)

结论

DuckDB 现在提供了一种便捷的方式来计算 minmax 聚合函数的 Top N 值,以及它们的进阶版本 min_bymax_by。它们易于上手,并且能够实现更复杂的分析,例如计算所有列的 Top N 或按组分组的 Top N。与窗口函数方法相比,还可能带来性能优势。

我们很乐意听到您如何创造性地使用这项新功能!

祝您分析愉快!