使用 DuckDB 在 Pandas 上高效运行 SQL

Mark Raasveldt 和 Hannes Mühleisen
2021-05-14 · 17 分钟

总结:DuckDB,一个免费开源的分析型数据管理系统,可以直接在 Pandas DataFrame 上高效运行 SQL 查询。

最近,一篇文章发表,倡导将 SQL 用于数据分析。在 DuckDB 团队,我们是 SQL 的忠实拥护者。它是一种多功能且灵活的语言,允许用户高效地执行各种数据转换,而无需关心数据的物理表示方式或如何以最优方式执行这些数据转换。

虽然您可以非常有效地在诸如 Postgres 等外部数据库系统中执行聚合和数据转换(如果您的数据存储在那里),但在某个时候,您需要将数据转换回 PandasNumPy 格式。这些库是 Python1 中庞大数据科学库生态系统(例如 scikit-learnTensorFlow)之间数据交换的标准。

1Apache Arrow 在该领域也获得了显著的关注,并且 DuckDB 也支持 Arrow。

如果您从文件(例如 CSV 或 Parquet 文件)读取数据,您的数据通常根本不会加载到外部数据库系统,而是直接加载到 Pandas DataFrame 中。

Pandas 上的 SQL

数据转换为 Pandas DataFrame 后,通常还需要执行额外的数据整理和分析。SQL 是执行这些类型数据转换的强大工具。使用 DuckDB,可以直接在 Pandas DataFrame 上高效运行 SQL。

作为一个简短的预告,这里有一个代码片段,可以帮助您做到这一点:使用 DuckDB 直接在 Pandas DataFrame 上运行任意 SQL 查询。

# to install: pip install duckdb
import pandas as pd
import duckdb

mydf = pd.DataFrame({'a' : [1, 2, 3]})
print(duckdb.query("SELECT sum(a) FROM mydf").to_df())

在本文的其余部分,我们将更深入地探讨其工作原理和速度表现。

数据集成与 Pandas 上的 SQL

DuckDB 的核心目标之一是,访问常见格式的数据应该容易。DuckDB 完全能够直接在 Pandas DataFrame(或 Parquet/CSV 文件,或 Arrow 表等)之上并行运行查询。不需要单独的(耗时的)导入步骤。

DuckDB 还可以将查询结果直接写入到这些格式中的任何一种。您可以使用 DuckDB 通过 SQL 并行处理 Pandas DataFrame,并将结果再次转换回 Pandas DataFrame,以便您可以在其他数据科学库中使用该结果。

当您在 SQL 中运行查询时,DuckDB 将查找名称与查询中表名匹配的 Python 变量,并自动开始读取您的 Pandas DataFrame。回顾之前的示例,我们可以看到它的实际运作方式

import pandas as pd
import duckdb

mydf = pd.DataFrame({'a' : [1, 2, 3]})
print(duckdb.query("SELECT sum(a) FROM mydf").to_df())

SQL 表名 mydf 被解释为本地 Python 变量 mydf,它恰好是一个 Pandas DataFrame,DuckDB 可以直接读取和查询它。列名和类型也从 DataFrame 中自动提取。

这个过程不仅无痛,而且效率极高。对于许多查询,您可以使用 DuckDB 比 Pandas 更快地处理数据,并且总内存使用量大大降低,而无需离开 Pandas DataFrame 二进制格式(“Pandas 输入,Pandas 输出”)。与使用 Postgres 等外部数据库系统不同,输入或输出的数据传输时间可以忽略不计(详见附录 A)。

Pandas 上的 SQL 性能

为了演示 DuckDB 在 Pandas DataFrame 上执行 SQL 时的性能,我们现在展示一些基准测试。基准测试的源代码可在 Google Colab 中进行交互使用。在这些基准测试中,我们纯粹在 Pandas DataFrame 上进行操作。DuckDB 代码和 Pandas 代码都完全基于“Pandas 输入,Pandas 输出”原则运行。

基准测试设置和数据集

我们完全在 Google Colab 环境中运行基准测试。对于我们的基准数据集,我们使用著名的 TPC-H 数据集。具体来说,我们关注 lineitemorders 表,因为它们是基准测试中最大的表。总数据集大小约为 1 GB(未压缩 CSV 格式,"比例因子" 1)。

由于 DuckDB 能够使用多处理器(多线程),我们包含了单线程版本和双线程版本。请注意,尽管 DuckDB 可以支持远超两个线程,但 Google Colab 仅支持两个线程。

设置

首先我们需要安装 DuckDB。这只需要一行简单的命令。

pip install duckdb

为了设置数据集进行处理,我们使用 wget 下载了两个 Parquet 文件。之后,我们使用 DuckDB 内置的 Parquet 读取器将数据加载到 Pandas DataFrame 中。系统通过查看文件的 .parquet 扩展名,自动推断我们正在读取 Parquet 文件。

lineitem = duckdb.query(
    "SELECT * FROM 'lineitemsf1.snappy.parquet'"
).to_df()
orders = duckdb.query(
    "SELECT * FROM 'orders.parquet'"
).to_df()

未分组聚合

对于我们的第一个查询,我们将在 Pandas DataFrame 上运行一组未分组的聚合操作。以下是 SQL 查询:

SELECT
    sum(l_extendedprice),
    min(l_extendedprice),
    max(l_extendedprice),
    avg(l_extendedprice)
FROM lineitem;

Pandas 代码看起来类似

lineitem.agg(
  Sum=('l_extendedprice', 'sum'),
  Min=('l_extendedprice', 'min'),
  Max=('l_extendedprice', 'max'),
  Avg=('l_extendedprice', 'mean')
)
名称 时间 (秒)
DuckDB(1 线程) 0.079
DuckDB(2 线程) 0.048
Pandas 0.070

这项基准测试涉及一个非常简单的查询,Pandas 在此表现出色。这些简单的查询正是 Pandas 的优势所在(哈),因为它可以直接调用实现这些聚合的 NumPy 例程,这些例程效率极高。尽管如此,我们可以看到 DuckDB 在单线程情况下与 Pandas 性能相似,并且在启用多线程支持时从中受益。

分组聚合

对于我们的第二个查询,我们将运行相同的聚合集,但这次包含一个分组条件。在 SQL 中,我们可以通过向查询添加 GROUP BY 子句来实现这一点。

SELECT
    l_returnflag,
    l_linestatus,
    sum(l_extendedprice),
    min(l_extendedprice),
    max(l_extendedprice),
    avg(l_extendedprice)
FROM lineitem
GROUP BY
    l_returnflag,
    l_linestatus;

在 Pandas 中,我们在执行聚合之前使用 groupby 函数。

lineitem.groupby(
  ['l_returnflag', 'l_linestatus']
).agg(
  Sum=('l_extendedprice', 'sum'),
  Min=('l_extendedprice', 'min'),
  Max=('l_extendedprice', 'max'),
  Avg=('l_extendedprice', 'mean')
)
名称 时间 (秒)
DuckDB(1 线程) 0.43
DuckDB(2 线程)  0.32
Pandas 0.84

这个查询已经变得更加复杂,虽然 Pandas 表现尚可,但它比 DuckDB 的单线程版本慢两倍。DuckDB 拥有高度优化的聚合哈希表实现,可以在一次数据遍历中完成分组和所有聚合的计算。

带过滤条件的分组聚合

现在假设我们不想对所有数据执行聚合,而是只想选择数据的一个子集进行聚合。我们可以通过添加一个过滤子句来实现这一点,该子句会删除我们不感兴趣的任何元组。在 SQL 中,我们可以通过 WHERE 子句来完成此操作。

SELECT
    l_returnflag,
    l_linestatus,
    sum(l_extendedprice),
    min(l_extendedprice),
    max(l_extendedprice),
    avg(l_extendedprice)
FROM lineitem
WHERE
    l_shipdate <= DATE '1998-09-02'
GROUP BY
    l_returnflag,
    l_linestatus;

在 Pandas 中,我们可以通过使用选择括号创建 DataFrame 的过滤变体。

# filter out the rows
filtered_df = lineitem[
  lineitem['l_shipdate'] < "1998-09-02"]
# perform the aggregate
result = filtered_df.groupby(
  ['l_returnflag', 'l_linestatus']
).agg(
  Sum=('l_extendedprice', 'sum'),
  Min=('l_extendedprice', 'min'),
  Max=('l_extendedprice', 'max'),
  Avg=('l_extendedprice', 'mean')
)

在 DuckDB 中,查询优化器会将过滤和聚合合并为一次数据遍历,只读取相关列。然而,在 Pandas 中,我们没有这么幸运。过滤器在执行时实际上会子集化整个 lineitem 表,包括我们未使用的任何列!因此,过滤操作比其应有的耗时得多。

我们可以手动执行此优化(在数据库文献中称为“投影下推”)。为此,我们首先需要只选择与查询相关的列,然后对 lineitem DataFrame 进行子集化。我们将得到以下代码片段:

# projection pushdown
pushed_down_df = lineitem[
  ['l_shipdate',
   'l_returnflag',
   'l_linestatus',
   'l_extendedprice']
]
# perform the filter
filtered_df = pushed_down_df[
  pushed_down_df['l_shipdate'] < "1998-09-02"]
# perform the aggregate
result = filtered_df.groupby(
  ['l_returnflag', 'l_linestatus']
).agg(
  Sum=('l_extendedprice', 'sum'),
  Min=('l_extendedprice', 'min'),
  Max=('l_extendedprice', 'max'),
  Avg=('l_extendedprice', 'mean')
)
名称 时间 (秒)
DuckDB(1 线程) 0.60
DuckDB(2 线程) 0.42
Pandas 3.57
Pandas(手动下推)   2.23

尽管手动投影下推显著加快了 Pandas 中的查询速度,但对于过滤后的聚合,仍然存在显著的时间开销。为了处理过滤器,Pandas 会将整个 DataFrame(减去已过滤掉的行)的副本写回内存。当过滤器选择性不高时,此操作可能会非常耗时。

由于其全面的查询优化器和高效的查询处理器,DuckDB 在此查询上的性能显著更优。

连接

对于最后一个查询,我们将连接(Pandas 中为 merge)lineitem 表与 orders 表,并应用一个过滤器,只选择我们感兴趣状态的订单。这在 SQL 中导致以下查询:

SELECT
    l_returnflag,
    l_linestatus,
    sum(l_extendedprice),
    min(l_extendedprice),
    max(l_extendedprice),
    avg(l_extendedprice)
FROM lineitem
JOIN orders ON (l_orderkey = o_orderkey)
WHERE l_shipdate <= DATE '1998-09-02'
  AND o_orderstatus='O'
GROUP BY
    l_returnflag,
    l_linestatus;

对于 Pandas,我们必须添加一个 merge 步骤。在基本方法中,我们将 lineitem 和 orders 合并在一起,然后应用过滤器,最后应用分组和聚合。这将得到以下代码片段:

# perform the join
merged = lineitem.merge(
  orders,
  left_on='l_orderkey',
  right_on='o_orderkey')
# filter out the rows
filtered_a = merged[
  merged['l_shipdate'] < "1998-09-02"]
filtered_b = filtered_a[
  filtered_a['o_orderstatus'] == "O"]
# perform the aggregate
result = filtered_b.groupby(
  ['l_returnflag', 'l_linestatus']
).agg(
  Sum=('l_extendedprice', 'sum'),
  Min=('l_extendedprice', 'min'),
  Max=('l_extendedprice', 'max'),
  Avg=('l_extendedprice', 'mean')
)

现在我们错过了两个性能优化机会:

  • 首先,我们合并了过多的列,因为我们合并了查询其余部分不需要的列(投影下推)。
  • 其次,我们合并了过多的行。我们可以在合并之前应用过滤器,以减少需要合并的数据量(过滤下推)。

手动应用这两个优化后得到以下代码片段:

# projection & filter on lineitem table
lineitem_projected = lineitem[
  ['l_shipdate',
   'l_orderkey',
   'l_linestatus',
   'l_returnflag',
   'l_extendedprice']
]
lineitem_filtered = lineitem_projected[
  lineitem_projected['l_shipdate'] < "1998-09-02"]
# projection and filter on order table
orders_projected = orders[
  ['o_orderkey',
   'o_orderstatus']
]
orders_filtered = orders_projected[
  orders_projected['o_orderstatus'] == 'O']
# perform the join
merged = lineitem_filtered.merge(
  orders_filtered,
  left_on='l_orderkey',
  right_on='o_orderkey')
# perform the aggregate
result = merged.groupby(
  ['l_returnflag', 'l_linestatus']
).agg(
  Sum=('l_extendedprice', 'sum'),
  Min=('l_extendedprice', 'min'),
  Max=('l_extendedprice', 'max'),
  Avg=('l_extendedprice', 'mean')
)

DuckDB 的查询优化器会自动应用这两个优化。

名称 时间 (秒)
DuckDB(1 线程) 1.05
DuckDB(2 线程) 0.53
Pandas 15.2
Pandas(手动下推) 3.78

我们看到,与优化版本相比,基本方法非常耗时。这证明了自动查询优化器的实用性。即使经过优化,Pandas 代码仍然比 DuckDB 慢得多,因为它在单独的过滤和连接操作后将中间结果存储在内存中。

要点总结

使用 DuckDB,您可以利用强大且富有表现力的 SQL 语言,而无需担心数据在 Pandas 内外移动的问题。DuckDB 安装极其简单,并提供许多优势,例如查询优化器、自动多线程和超内存计算能力。DuckDB 使用 Postgres SQL 解析器,并提供许多与 Postgres 相同的 SQL 功能,包括高级功能,例如窗口函数、关联子查询、(递归)公共表表达式、嵌套类型和采样。如果您缺少某个功能,请提出 issue

附录 A:数据来回传输:将数据从 Pandas 传输到 SQL 引擎再传回

传统 SQL 引擎使用客户端-服务器范式,这意味着客户端程序通过套接字连接到服务器。查询在服务器上运行,结果随后发送回客户端。例如,从 Python 使用 Postgres 时也是如此。不幸的是,这种传输是一个严重的瓶颈。像 SQLite 或 DuckDB 这样的进程内引擎则没有这个问题。

为了展示通过套接字进行数据传输的成本有多高,我们进行了一项涉及 Postgres、SQLite 和 DuckDB 的基准测试。基准测试的源代码可在 GitHub 上找到。

在此基准测试中,我们将一个(相当小的)由 10M 个 4 字节整数(40 MB)组成的 Pandas 数据帧从 Python 复制到 PostgreSQL、SQLite 和 DuckDB 数据库。由于默认的 Pandas to_sql 相当慢,我们添加了一个单独的优化:我们指示 Pandas 将数据帧写入临时 CSV 文件,然后指示 PostgreSQL 直接将该文件中的数据复制到新创建的表中。当然,这仅在数据库服务器与 Python 在同一台机器上运行时才有效。

名称 时间 (秒)
使用 to_sql 将 Pandas 数据导入 Postgres 111.25
使用临时 CSV 文件将 Pandas 数据导入 Postgres   5.57
使用 to_sql 将 Pandas 数据导入 SQLite 6.80
Pandas 数据导入 DuckDB 0.03

虽然 SQLite 在这里表现显著优于 Postgres,但它仍然相当慢。这是因为 Pandas 中的 to_sql 函数会运行大量的 INSERT INTO 语句,这涉及将 Pandas DataFrame 的所有单个值转换为 Python 对象的行式表示,然后传递给系统。另一方面,DuckDB 直接从 Pandas 读取底层数组,这使得此操作几乎是即时的。

将查询结果或表从 SQL 系统传回 Pandas 是另一个潜在的瓶颈。使用内置的 read_sql_query 速度极慢,但即使是经过优化的 CSV 路径,对于这个微小的数据集也至少需要一秒钟。另一方面,DuckDB 也能几乎即时地执行这种转换。

名称 时间 (秒)
使用 read_sql_query 将 PostgreSQL 数据导入 Pandas 7.08
使用临时 CSV 文件将 PostgreSQL 数据导入 Pandas 1.29
使用 read_sql_query 将 SQLite 数据导入 Pandas 5.20
DuckDB 数据导入 Pandas 0.04

附录 B:与 PandaSQL 的比较

有一个名为 PandaSQL 的包,它也提供了直接在 Pandas 之上运行 SQL 的功能。然而,它是使用我们在附录 A 中看到的 to_sqlfrom_sql 基础设施构建的,而这被证明是极其缓慢的。

尽管如此,为了公平起见,我们还是在 PandaSQL 中运行了第一个未分组聚合查询以进行计时。然而,当我们第一次尝试在原始数据集上运行查询时,我们遇到了内存不足错误,导致我们的 Colab 会话崩溃。因此,我们决定使用原始数据集大小的 10%(60 万行)的样本,再次为 PandaSQL 运行基准测试。结果如下:

名称 时间 (秒)
DuckDB(1 线程) 0.023
DuckDB(2 线程)  0.014
Pandas 0.017
PandaSQL 24.43

我们可以看到,在这个简单的基准测试中,PandaSQL(由 SQLite 提供支持)比 Pandas 或 DuckDB 慢约 1000 倍。性能差异如此之大,以至于我们选择不再为 PandaSQL 运行其他基准测试。

附录 C:直接在 Parquet 文件上查询

在上述基准测试中,我们完全将 Parquet 文件读取到 Pandas 中。然而,DuckDB 也能够直接在 Parquet 文件上运行查询(并行!)。在本附录中,我们将展示这种方式与首先将文件加载到 Python 中的性能比较。

为了进行基准测试,我们将运行两个查询:最简单的查询(未分组聚合)和最复杂的查询(最终连接),并比较直接在 Parquet 文件上运行此查询与使用 read_parquet 函数将其加载到 Pandas 中的成本。

设置

在 DuckDB 中,我们可以使用以下查询在 Parquet 文件上创建一个视图。这使我们能够像在普通表上一样对 Parquet 文件运行查询。请注意,我们根本不需要担心投影下推:我们可以直接使用 SELECT *,DuckDB 的优化器会在查询时自动处理只投影所需的列。

CREATE VIEW lineitem_parquet AS
    SELECT * FROM 'lineitemsf1.snappy.parquet';
CREATE VIEW orders_parquet AS
    SELECT * FROM 'orders.parquet';

未分组聚合

设置好此视图后,我们可以运行之前运行过的相同查询,但这次是针对 lineitem_parquet 表。

SELECT sum(l_extendedprice), min(l_extendedprice), max(l_extendedprice), avg(l_extendedprice) FROM lineitem_parquet;

对于 Pandas,我们首先需要运行 read_parquet 将数据加载到 Pandas 中。为此,我们使用由 Apache Arrow 提供支持的 Parquet 读取器。之后,我们可以像之前一样运行查询。

lineitem_pandas_parquet = pd.read_parquet('lineitemsf1.snappy.parquet')
result = lineitem_pandas_parquet.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean'))

然而,我们现在再次遇到 Pandas 将完整读取 Parquet 文件的问题。为了解决这个问题,我们需要再次手动执行投影下推,通过向 read_parquet 方法提供我们想要读取的列集。

DuckDB 中的优化器会通过查看您正在执行的查询自行解决这个问题。

lineitem_pandas_parquet = pd.read_parquet('lineitemsf1.snappy.parquet', columns=['l_extendedprice'])
result = lineitem_pandas_parquet.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean'))
名称 时间 (秒)
DuckDB(1 线程) 0.16
DuckDB(2 线程) 0.14
Pandas 7.87
Pandas(手动下推) 0.17

我们可以看到,执行下推与不执行下推之间的性能差异是巨大的。当我们执行下推时,Pandas 的性能与 DuckDB 处于同一水平。然而,如果没有下推,它会从磁盘加载整个文件,包括回答查询不需要的其他 15 列。

连接

现在来看我们之前在连接部分看到的最后一个查询。回顾一下:

SELECT
    l_returnflag,
    l_linestatus,
    sum(l_extendedprice),
    min(l_extendedprice),
    max(l_extendedprice),
    avg(l_extendedprice)
FROM lineitem
JOIN orders ON (l_orderkey = o_orderkey)
WHERE l_shipdate <= DATE '1998-09-02'
  AND o_orderstatus='O'
GROUP BY
    l_returnflag,
    l_linestatus;

对于 Pandas,我们再次创建了两个版本。一个朴素版本,以及一个手动优化版本。所使用的确切代码可以在 Google Colab 中找到。

名称 时间 (秒)
DuckDB(1 线程) 1.04
DuckDB(2 线程) 0.89
Pandas 20.4
Pandas(手动下推) 3.95

我们看到,对于这个更复杂的查询,在 Pandas DataFrame 和 Parquet 文件上运行的性能微小差异消失了,DuckDB 的计时变得与我们之前看到的计时非常相似。额外的 Parquet 读取再次增加了手动对 Pandas 代码进行优化的必要性,而这在 DuckDB 中运行 SQL 时根本不需要。