使用 DuckDB 精确查询 Parquet 数据
概要:DuckDB 是一个免费开源的分析数据管理系统,可以直接在 Parquet 文件上运行 SQL 查询,并自动利用 Parquet 格式的高级特性。
Apache Parquet 是最常见的“大数据”分析存储格式。在 Parquet 文件中,数据以列式压缩的二进制格式存储。每个 Parquet 文件存储一个表。该表被分成行组,每个行组包含该表行的子集。在行组内,表数据以列式方式存储。
Parquet 格式具有许多使其适合分析用例的属性
- 列式表示意味着可以(高效地)读取单个列。无需总是读取整个文件!
- 该文件在每个行组中包含每列的统计信息(最小值/最大值,以及
NULL
值的数量)。这些统计信息允许读取器跳过不需要的行组。 - 列式压缩显著减小了格式的文件大小,从而减少了数据集的存储要求。这通常可以将大数据转化为中等数据。
DuckDB 和 Parquet
DuckDB 的零依赖 Parquet 读取器能够直接在 Parquet 文件上执行 SQL 查询,而无需任何导入或分析步骤。由于 Parquet 的自然列式格式,这非常快!
DuckDB 将以流式方式读取 Parquet 文件,这意味着您可以对不适合主内存的大型 Parquet 文件执行查询。
DuckDB 能够自动检测任何给定查询所需的列和行。这允许用户分析更大更复杂的 Parquet 文件,而无需执行手动优化或投资更多硬件。
此外,DuckDB 能够使用并行处理,并同时处理多个 Parquet 文件(使用 glob 语法)来完成所有这些工作。
作为一个简短的预告,这是一个代码片段,允许您直接在 Parquet 文件之上运行 SQL 查询。
安装 DuckDB 包:
pip install duckdb
下载 Parquet 文件:
wget https://blobs.duckdb.org/data/taxi_2019_04.parquet
然后,运行以下 Python 脚本:
import duckdb
print(duckdb.query('''
SELECT count(*)
FROM 'taxi_2019_04.parquet'
WHERE pickup_at BETWEEN '2019-04-15' AND '2019-04-20'
''').fetchall())
自动过滤器和投影下推
让我们深入研究之前的查询,以更好地理解 Parquet 格式与 DuckDB 查询优化器结合使用时的强大功能。
SELECT count(*)
FROM 'taxi_2019_04.parquet'
WHERE pickup_at BETWEEN '2019-04-15' AND '2019-04-20';
在此查询中,我们从 Parquet 文件中读取单个列 (pickup_at
)。可以完全跳过存储在 Parquet 文件中的任何其他列,因为我们不需要它们来回答我们的查询。
此外,只有 pickup_at
在 2019 年 4 月 15 日至 20 日之间的行才会影响查询结果。可以跳过任何不满足此谓词的行。
我们可以充分利用 Parquet 文件中的统计信息。可以跳过任何 pickup_at
最大值低于 2019-04-15
或最小值高于 2019-04-20
的行组。在某些情况下,这允许我们跳过读取整个文件。
DuckDB 与 Pandas
为了说明这些自动优化的有效性,我们将使用 Pandas 和 DuckDB 在 Parquet 文件之上运行一些查询。
在这些查询中,我们使用存储为 Parquet 文件的臭名昭著的纽约出租车数据集的一部分,特别是 2019 年 4 月、5 月和 6 月的数据。这些文件的大小约为 360 MB,包含大约 2100 万行和 18 列。这三个文件放在 taxi/
文件夹中。
这些示例可在 Google Colab 上的交互式 notebook 中获得。此处报告的计时来自此环境,以实现可重复性。
读取多个 Parquet 文件
首先,我们看一下数据集中的一些行。taxi/
文件夹中有三个 Parquet 文件。DuckDB 支持 globbing 语法,这允许它同时查询所有三个文件。
con.execute("""
SELECT *
FROM 'taxi/*.parquet'
LIMIT 5""").df()
pickup_at | dropoff_at | 乘客数量 | trip_distance | rate_code_id |
---|---|---|---|---|
2019-04-01 00:04:09 | 2019-04-01 00:06:35 | 1 | 0.5 | 1 |
2019-04-01 00:22:45 | 2019-04-01 00:25:43 | 1 | 0.7 | 1 |
2019-04-01 00:39:48 | 2019-04-01 01:19:39 | 1 | 10.9 | 1 |
2019-04-01 00:35:32 | 2019-04-01 00:37:11 | 1 | 0.2 | 1 |
2019-04-01 00:44:05 | 2019-04-01 00:57:58 | 1 | 4.8 | 1 |
尽管该查询从三个(相当大的)Parquet 文件中选择了所有列,但该查询立即完成。这是因为 DuckDB 以流式方式处理 Parquet 文件,并且在读取前几行后将停止读取 Parquet 文件,因为这是满足查询所需的全部内容。
如果我们尝试在 Pandas 中执行相同的操作,我们会意识到它并不那么简单,因为 Pandas 无法在一个调用中读取多个 Parquet 文件。我们首先必须使用 pandas.concat
将三个 Parquet 文件连接在一起。
import pandas
import glob
df = pandas.concat(
[pandas.read_parquet(file)
for file
in glob.glob('taxi/*.parquet')])
print(df.head(5))
以下是这两个查询的计时。
系统 | 时间 (秒) |
---|---|
DuckDB | 0.015 |
Pandas | 12.300 |
Pandas 需要更长的时间才能完成此查询。这是因为 Pandas 不仅需要完整读取三个 Parquet 文件中的每一个,而且还必须将这三个单独的 Pandas DataFrame 连接在一起。
连接到单个文件
我们可以通过从三个较小的部分创建一个大的 Parquet 文件来解决连接问题。我们可以使用 pyarrow
库来实现这一点,该库支持读取多个 Parquet 文件并将它们流式传输到单个大文件中。请注意,pyarrow
parquet 读取器与 Pandas 内部使用的 parquet 读取器完全相同。
import pyarrow.parquet as pq
# concatenate all three parquet files
pq.write_table(pq.ParquetDataset('taxi/').read(), 'alltaxi.parquet', row_group_size=100000)
请注意,DuckDB 还支持使用 COPY 语句写入 Parquet 文件。
查询大型文件
现在让我们重复之前的实验,但使用单个文件代替。
# DuckDB
con.execute("""
SELECT *
FROM 'alltaxi.parquet'
LIMIT 5""").df()
# Pandas
pandas.read_parquet('alltaxi.parquet')
.head(5)
系统 | 时间 (秒) |
---|---|
DuckDB | 0.02 |
Pandas | 7.50 |
我们可以看到 Pandas 的性能比以前更好,因为避免了连接。但是,仍然需要将整个文件读入内存,这既需要大量的时间,也需要大量的内存。
对于 DuckDB 来说,查询中需要读取多少个 Parquet 文件并不重要。
计数行数
现在假设我们想知道数据集中有多少行。我们可以使用以下代码来实现:
# DuckDB
con.execute("""
SELECT count(*)
FROM 'alltaxi.parquet'
""").df()
# Pandas
len(pandas.read_parquet('alltaxi.parquet'))
系统 | 时间 (秒) |
---|---|
DuckDB | 0.015 |
Pandas | 7.500 |
DuckDB 很快完成了查询,因为它自动识别需要从 Parquet 文件中读取的内容,并最大限度地减少了所需的读取次数。Pandas 必须再次读取整个文件,这导致它花费与上一个查询相同的时间。
对于此查询,我们可以通过手动优化来改进 Pandas 的时间。为了获得计数,我们只需要文件中的单个列。通过手动指定要读取的单个列到 read_parquet
命令中,我们可以获得相同的结果,但速度更快。
len(pandas.read_parquet('alltaxi.parquet', columns=['vendor_id']))
系统 | 时间 (秒) |
---|---|
DuckDB | 0.015 |
Pandas | 7.500 |
Pandas(优化) | 1.200 |
虽然这快得多,但这仍然需要一秒多钟,因为必须将整个 vendor_id
列作为 Pandas 列读入内存,才能计算行数。
过滤行
通常使用某种过滤谓词来仅查看数据集的有趣部分。例如,假设我们想知道 2019 年 6 月 30 日之后发生了多少次出租车行程。我们可以使用 DuckDB 中的以下查询来实现:
con.execute("""
SELECT count(*)
FROM 'alltaxi.parquet'
WHERE pickup_at > '2019-06-30'
""").df()
查询在 45ms
内完成,并产生以下结果:
count |
---|
167022 |
在 Pandas 中,我们可以使用一种天真的方法来执行相同的操作。
# pandas naive
len(pandas.read_parquet('alltaxi.parquet')
.query("pickup_at > '2019-06-30'"))
这再次将整个文件读入内存,导致此查询花费 7.5 秒。通过手动投影下推,我们可以将其降至 0.9 秒。仍然远高于 DuckDB。
# pandas projection pushdown
len(pandas.read_parquet('alltaxi.parquet', columns=['pickup_at'])
.query("pickup_at > '2019-06-30'"))
然而,pyarrow
parquet 读取器还允许我们将过滤器下推到扫描中。一旦我们添加了这个,我们最终会得到一个更具竞争力的 70ms
来完成查询。
len(pandas.read_parquet('alltaxi.parquet', columns=['pickup_at'], filters=[('pickup_at', '>', '2019-06-30')]))
系统 | 时间 (秒) |
---|---|
DuckDB | 0.05 |
Pandas | 7.50 |
Pandas(投影下推) | 0.90 |
Pandas(投影和过滤器下推) | 0.07 |
这表明此处的结果不是由于 DuckDB 的 parquet 读取器比 pyarrow
Parquet 读取器更快。DuckDB 在这些查询上表现更好的原因是,它的优化器会自动从 SQL 查询中提取所有必需的列和过滤器,然后在 Parquet 读取器中自动使用它们,而无需手动操作。
有趣的是,pyarrow
Parquet 读取器和 DuckDB 都比在物化的 DataFrame 上以 Pandas 原生方式执行此操作快得多。
# read the entire parquet file into Pandas
df = pandas.read_parquet('alltaxi.parquet')
# run the query natively in Pandas
# note: we only time this part
print(len(df[['pickup_at']].query("pickup_at > '2019-06-30'")))
系统 | 时间 (秒) |
---|---|
DuckDB | 0.05 |
Pandas | 7.50 |
Pandas(投影下推) | 0.90 |
Pandas(投影和过滤器下推) | 0.07 |
Pandas(原生) | 0.26 |
聚合
最后,让我们看一个更复杂的聚合。假设我们要计算每位乘客的行程数。使用 DuckDB 和 SQL,它看起来像这样:
con.execute("""
SELECT passenger_count, count(*)
FROM 'alltaxi.parquet'
GROUP BY passenger_count""").df()
查询在 220ms
内完成,并产生以下结果:
乘客数量 | count |
---|---|
0 | 408742 |
1 | 15356631 |
2 | 3332927 |
3 | 944833 |
4 | 439066 |
5 | 910516 |
6 | 546467 |
7 | 106 |
8 | 72 |
9 | 64 |
对于不熟悉 SQL 的人,以及作为未来博客文章的预告,DuckDB 还有一个“关系 API”,它允许以更 Python 风格的方式声明查询。这是与上面的 SQL 查询等效的,它提供了完全相同的结果和性能:
con.from_parquet('alltaxi.parquet')
.aggregate('passenger_count, count(*)')
.df()
现在作为比较,让我们以之前相同的方式在 Pandas 中运行相同的查询。
# naive
pandas.read_parquet('alltaxi.parquet')
.groupby('passenger_count')
.agg({'passenger_count' : 'count'})
# projection pushdown
pandas.read_parquet('alltaxi.parquet', columns=['passenger_count'])
.groupby('passenger_count')
.agg({'passenger_count' : 'count'})
# native (parquet file pre-loaded into memory)
df.groupby('passenger_count')
.agg({'passenger_count' : 'count'})
系统 | 时间 (秒) |
---|---|
DuckDB | 0.22 |
Pandas | 7.50 |
Pandas(投影下推) | 0.58 |
Pandas(原生) | 0.51 |
我们可以看到 DuckDB 在所有三种情况下都比 Pandas 快,而无需执行任何手动优化,也无需将 Parquet 文件完整地加载到内存中。
结论
DuckDB 可以直接在 Parquet 文件之上高效地运行查询,而无需初始加载阶段。该系统将自动利用 Parquet 的所有高级特性来加速查询执行。
DuckDB 是一个免费开源的数据库管理系统(MIT 许可)。它的目标是成为分析的 SQLite,并提供一个快速高效的数据库系统,没有外部依赖项。它不仅适用于 Python,还适用于 C/C++、R、Java 等。