DuckDB 中的 Arrow IPC 支持
简而言之:DuckDB 现在通过 arrow
社区扩展支持消费和生成 Arrow IPC 序列化格式。
DuckDB 和 Arrow
近年来,Apache Arrow 项目在数据领域获得了广泛关注,这要归功于其列式格式,该格式允许在不同系统之间轻松交换数据——主要采用零拷贝方法。Apache Arrow 支持 DuckDB 和 Polars 之间的集成等。实际上,当 DuckDB 生成或使用 Polars DataFrame 时,它实际上是在使用底层的 Arrow 列式格式。
拥有这种格式的重要性也是 DuckDB 成为 与 Arrow 集成并实现 Arrow 数据库连接 (ADBC) 接口的先驱的主要原因之一——特别是由于 Arrow 使之成为可能,而无需额外的依赖项,这要归功于它的 C 数据接口。
但 Arrow 的 C 数据接口的一个限制是它使用指针(内存地址)交换数据。如果您想在不同的进程或系统之间交换 Arrow 数据,这会限制可能性。为了克服这个限制,Arrow 项目还指定了 Arrow IPC 格式,该格式允许用户有效地序列化 Arrow 列式数据,并在进程之间或通过网络传递它。此数据可以作为流使用,可以直接从内存缓冲区或从文件中使用。
我们很高兴地宣布,DuckDB 现在能够通过新的 arrow
社区扩展使用和生成这些 Arrow 流。在这篇文章中,我们将更详细地描述 Arrow IPC 序列化格式,展示如何安装新的 DuckDB arrow
社区扩展,并提供一个演示,展示如何使用它。
Arrow 进程间通信 (Arrow IPC)
Arrow IPC 格式提供了一种序列化(以及可选的压缩)Arrow 格式数据的方式,使您能够通过网络传输数据或将其存储在磁盘上,同时将其保留为 Arrow 格式,从而避免了将其转换为其他格式的开销。Arrow IPC 支持 LZ4 和 ZSTD 压缩,并且当作为文件存储时,它还支持文件页脚,该页脚可用于通过允许跳过部分数据来加速检索和处理(类似于 Parquet 格式使用的方法)。与 Parquet 相比,Arrow IPC 格式有两个主要优点
- 易于实现:编写一个低级 Arrow IPC 消费者/生产者比编写一个 Parquet 消费者/生产者更简单,特别是如果系统已经与 Arrow 格式集成。
- 更快的编码和解码:编码和解码(序列化和反序列化)Arrow 数据的过程比 Parquet 简单得多且速度更快。这可以缩短处理时间——特别是如果您要流式传输不需要随后存储在磁盘上的数据。
Arrow 和 Parquet 是互补的格式。Parquet 复杂的编码和压缩选项系统通常会产生更小的文件,这使得 Parquet 成为存档存储的更好选择。Arrow 消除编码和解码开销的能力通常会产生更快、更高效的数据交换,这使得 Arrow 成为查询结果传输和临时缓存的更好选择。
为了向您展示 Arrow IPC 格式有多么简单,请考虑以下说明。在 Arrow IPC 格式中,一个表被序列化为一系列记录批次(以列式布局组织的记录集合),并在其前面加上它们共享的模式
图来自 Apache Arrow Blog:Apache Arrow 格式如何加速查询结果传输。
请注意,在实际情况下,记录批次要大得多,并且上面的图为了说明目的而进行了简化。
Arrow 社区扩展
多年来,DuckDB 一直包含与 Arrow IPC 格式的集成,通过 (现在已弃用的)Arrow 核心扩展。但是,此支持的主要目的是允许 DuckDB 与 JavaScript 互操作,因此它仅设计为读取内存中的序列化缓冲区,而不是 Arrow IPC 文件。扩展的代码复杂性和可维护性非常高,因为使用 Arrow IPC 需要将整个 Arrow C++ 库作为依赖项,因为我们不想为 Arrow IPC 格式编写自己的序列化和反序列化代码。
最近,一种更小的 Arrow C++ 实现开始流行,成为与 Arrow IPC 数据交互的一种方式:nanoarrow 库。使用 nanoarrow,我们完全重新设计了旧的 DuckDB Arrow 扩展,使其具有更小的依赖项、更简洁的代码库以及扫描 Arrow IPC 文件的能力。我们还借此机会将 Arrow DuckDB 扩展从核心扩展转移到社区扩展。做出此更改有两个主要原因。首先是使 Arrow 开发人员和用户社区能够更多地参与构建和支持扩展。第二个是拥有一个不与 DuckDB 发布计划相关的发布计划。实际上,这意味着核心 Arrow 开发人员社区的成员可以决定何时发布新版本的扩展。
安装和加载新的 Arrow 扩展非常简单
INSTALL arrow FROM community;
LOAD arrow;
演示
在此演示中,我们将使用新的 Arrow DuckDB 扩展生成比例因子为 10 的 lineitem
TPC-H 表作为 Arrow IPC 文件。虽然我们的演示将侧重于存储在文件中的 Arrow IPC 数据,但该扩展本身还允许您直接使用和生成作为缓冲区的 Arrow IPC 格式。您可以在扩展的 README 中找到使用和接受参数的详细示例。
我们首先加载 arrow
扩展并生成我们的 TPC-H 表。
LOAD arrow;
CALL dbgen(sf = 10);
要生成 Arrow IPC 文件,我们可以简单地使用 COPY ... TO ...
子句,如下所示。我们使用 推荐的文件扩展名 .arrows
,因为此文件采用 Arrow IPC 流格式。
COPY lineitem TO 'lineitem.arrows';
在此演示中,为简单起见,我们将表格写入单个文件。但是,我们的 Arrow
COPY ... TO ...
子句允许我们设置chunk_size
和每个文件的row_groups
数量。这些选项允许我们生成针对您的用例实现最佳性能优化的数据。例如,较小的chunk_size
可能会降低总体性能,但有利于流式传输场景。
我们现在可以使用 read_arrow
函数直接在我们的文件上运行 TPC-H 查询 6
SELECT
sum(l_extendedprice * l_discount) AS revenue
FROM
read_arrow('lineitem.arrows')
WHERE
l_shipdate >= CAST('1994-01-01' AS date)
AND l_shipdate < CAST('1995-01-01' AS date)
AND l_discount BETWEEN 0.05
AND 0.07
AND l_quantity < 24;
它会打印
┌─────────────────┐
│ revenue │
│ decimal(38,4) │
├─────────────────┤
│ 1230113636.0101 │
│ (1.23 billion) │
└─────────────────┘
感谢 替换扫描,如果文件名以 .arrow
或 .arrows
结尾,您可以省略函数 read_arrow
。例如
SELECT count(*) FROM 'lineitem.arrows';
它会打印
┌─────────────────┐
│ count_star() │
│ int64 │
├─────────────────┤
│ 59986052 │
│ (59.99 million) │
└─────────────────┘
为简单起见,我们专注于读取单个文件的场景,但我们的阅读器支持多文件读取,其功能与 DuckDB Parquet 阅读器相当。
如果您想直接从服务器将 Arrow IPC 流提取到 DuckDB 中怎么办?为了演示这一点,我们可以在保存 lineitem.arrows
的同一目录中启动一个 HTTP 文件服务器。我们使用 Node.js 包 serve 来执行此操作(而不是 Python 的内置 http.server
),因为它支持 HTTP 范围请求
npx serve -l 8008
然后,您可以使用 DuckDB 的 httpfs
扩展通过 HTTP(S) 协议查询 Arrow 数据
INSTALL httpfs;
LOAD httpfs;
LOAD arrow;
SELECT count(*) FROM read_arrow('https://:8008/lineitem.arrows');
它会打印相同的结果
┌─────────────────┐
│ count_star() │
│ int64 │
├─────────────────┤
│ 59986052 │
│ (59.99 million) │
└─────────────────┘
或者,您可以使用像 curl
这样的工具从服务器获取 Arrow IPC 数据,并通过管道将其传输到终端中的 DuckDB
URL="https://:8008/lineitem.arrows"
SQL="LOAD arrow; FROM read_arrow('/dev/stdin') SELECT count(*);"
curl -s "$URL" | duckdb -c "$SQL"
它会打印相同的结果。有关 arrow
扩展的其他演示,请参阅我们的 arrow-ipc 演示存储库。
奖励:Arrow IPC 的酷炫用例
像上面演示中那样对 Arrow IPC 格式的数据运行 DuckDB 查询是一个非常巧妙的技巧,它效果很好,因为 DuckDB 和 Arrow 都是天然的一对,因为它们都使用列式数据布局。但是,您可能想知道 Arrow IPC 数据还可以做什么。Arrow 的主要目标之一是互操作性,通过以 Arrow IPC 格式保存我们的数据,我们为与其他工具的连接打开了许多选项。
例如,我们现在可以使用 PyArrow 来处理我们的数据
import pyarrow as pa
with open('lineitem.arrows', 'rb') as source:
stream = pa.ipc.open_stream(source)
tbl = stream.read_all()
或 Polars
import polars as pl
tbl = pl.read_ipc_stream("lineitem.arrows")
CREATE TABLE
lineitem
ENGINE MergeTree()
ORDER BY tuple()
AS
SELECT * FROM file('lineitem.arrows', 'ArrowStream');
或任何其他众多 Arrow 库(有十几种不同的语言版本)或 Arrow 兼容系统。
Arrow IPC 的优势还不止于此:Arrow IPC 也非常适合大于内存的用例。使用 PyArrow,我们可以 内存映射我们的 lineitem.arrows
文件,并在不将整个文件读入内存的情况下使用它
import pyarrow as pa
with pa.memory_map('lineitem.arrows', 'rb') as source:
stream = pa.ipc.open_stream(source)
tbl = stream.read_all()
tbl.num_rows
# => 59986052
然后,我们可以检查 PyArrow 是否不必分配任何缓冲区来保存数据,因为它都位于磁盘上
pa.total_allocated_bytes()
# => 0
现在我们可以执行与上面演示中相同的查询,并显示我们得到相同的结果
import datetime
import pyarrow.compute as pc
subset = tbl.filter(
(pc.field("l_shipdate") >= datetime.datetime(1994, 1, 1)) &
(pc.field("l_shipdate") < datetime.datetime(1995, 1, 1)) &
(pc.field("l_discount") >= 0.05) &
(pc.field("l_discount") <= 0.07) &
(pc.field("l_quantity") < 24.)
)
pc.sum(pc.multiply(subset.column("l_extendedprice"), subset.column("l_discount")))
# => <pyarrow.Decimal128Scalar: Decimal('1230113636.0101')>
而且,尽管 lineitem.arrows
超过 10 GB,但 PyArrow 只需分配一小部分内存
pa.total_allocated_bytes()
# => 201594240 (192MB)
结论 & 未来计划
在这篇博文中,我们介绍了新的 Arrow 社区扩展,该扩展使 DuckDB 用户能够与 Arrow IPC 流缓冲区和文件进行交互。特别感谢 Voltron Data 通过与 DuckDB Labs 合作来实现此扩展。下面我们列出了此扩展的未来计划
- 编写 Arrow IPC 时支持
ZSTD
和LZ4
压缩。DuckDB 目前仅支持写入未压缩的缓冲区。 - 读取 Arrow 缓冲区时支持
LZ4
压缩。读取器目前仅支持ZSTD
或未压缩的缓冲区。 - 支持编写包含文件页脚的 Arrow IPC 文件格式,并使用页脚来加速读取。
- 实施 C API DuckDB 函数以生成和使用 Arrow IPC 数据。
如果您想从事这些计划中的任何功能或提出其他功能,或者如果您发现任何错误,请随时在我们的 问题跟踪器中记录它们。祝您黑客愉快!