duckplyr:由 DuckDB 驱动的 dplyr
太长不看:新的 R 软件包 duckplyr 将 dplyr API 转换为 DuckDB 的执行引擎。
有关 duckplyr 文档,请访问
duckplyr.tidyverse.org
。
背景
将表格数据整理成适合分析的形式可能是一项具有挑战性的任务。不知何故,每个数据集的创建方式都不同。数据集之间的差异存在于信息在行和列中的逻辑组织,或更具体的选择中,例如日期、货币、分类值、缺失数据等的表示方式。对于诸如小数分隔符使用何种字符之类的琐碎问题缺乏全球共识,也并未简化这项任务。为了获得新的见解,我们通常还需要合并来自多个来源的信息,例如通过使用通用标识符连接两个数据集。然而,有一些常见重复操作,在为分析重塑数据方面被发现是普遍有用的。例如,结构化(英语)查询语言,简称 SQL(“See-Quel”),描述了一组可应用于表格数据的常见操作,如选择、投影、连接、聚合、排序、窗口化等。SQL 被证明取得了巨大成功,尽管存在诸多缺点和多次尝试取而代之,它仍然是事实上的数据转换语言,背后有一个庞大的产业。
library("DBI")
con <- dbConnect(...)
df <- dbGetQuery(con, "SELECT something, very, complicated FROM some_table JOIN another_table BY (some_shared_attribute) GROUP BY group_one, group_two ORDER BY some_column, and_another_column;")
一种不太便捷地将数据导入 R 的方式
对于在 R 或 Python 等交互式编程环境(可能在 RStudio 或 Jupyter Notebooks 等 IDE 中)中的数据分析师而言,使用 SQL 重塑数据从未真正成为自然的选择。当然,有时需要使用 SQL 从操作性系统拉取数据,如上所示,但如果有选择,分析师更倾向于使用这些语言提供的更符合人体工程学的数据重塑功能。R 从一开始就将数据整理作为语言的一部分,通过 data.frame 类来表示表格数据。后来,在 2014 年,Hadley Wickham 定义了所谓 “整洁的”数据 的表格数据逻辑结构,并发布了 dplyr 包的第一个版本,该包旨在统一和简化以前笨拙的 R 命令,将数据重塑为单一、统一且一致的 API。在 Python 领域,广受欢迎的 Pandas 项目 扩展了 Python,提供了事实上的表格数据表示以及关系型操作符,尽管未尝试实现“整洁性”。
然而,在某个时候,R 和 Python 的数据处理功能开始在人们希望分析的日益增长的数据集重压下不堪重负。数据集迅速增长到数百万行。例如,早期需要特殊处理的数据集之一是美国社区调查数据集,因为美国人口众多。但是,Pandas 和 dplyr 等工具是为了方便而设计的,不一定是为了效率。例如,它们缺乏在现在常见的多核处理器上并行处理数据重塑任务的能力。
尽管当时有一整套新兴的“大数据”工具,但从交互式数据分析环境中使用它们被证明是一种糟糕的开发体验,例如由于数秒的作业启动时间以及远远超出大多数数据分析师技能范围的非常复杂的设置过程。然而,与此同时,关系型数据管理系统领域并未停滞不前。在通过 SQL 提高分析性数据分析效率方面取得了巨大进展:围绕列式数据表示、高效查询解释甚至编译以及自动高效并行化的创新将查询处理效率提高了几个数量级。遗憾的是,由于社区之间缺乏沟通以及创新被隔离到企业、商业和闭源产品中,这些创新未能进入数据分析师的工具包——即使数十年过去了。
摆脱这种不幸局面有两种可能的方法
- 通过总体效率提升、优化和并行化,提高 R 和 Python 的数据分析能力,使其能够处理更大的数据集;
- 以某种方式将现有最先进的技术集成到交互式数据分析环境中。
方法一的主要问题是,从头开始构建一个具有竞争力的分析查询引擎需要数百万美元的投入,以及一支高度专业化的查询引擎构建专家团队。其中有许多高度复杂的活动部件,都需要良好地协同工作。查询引擎中有些看似显而易见的问题,却需要攻读数据管理系统博士学位才能解决。在一个工具通常由志愿者利用业余时间构建并免费发布的空间中,收回如此巨大的投资是具有挑战性的。话虽如此,在这个领域也有一些值得称赞的项目,例如 data.table 或最近的 pola.rs,它们比旧工具提供了大幅提升的性能。
方法二也并非没有挑战:最先进的查询引擎技术往往隐藏在不兼容的架构背后。例如,数据管理系统运行在专用数据库服务器上,客户端应用程序使用客户端协议与该服务器交互的二层架构,与交互式分析相当不兼容。设置和维护一个单独的数据库“服务器”——即使在同一台计算机上——仍然很痛苦。在分析环境和数据库服务器之间来回移动数据已被证明相当昂贵。不幸的是,这些架构决策深刻影响着查询引擎的权衡,因此事后难以更改。
然而,在这个领域有所发展:DuckDB 的既定目标之一是通过其进程内(in-process)架构将最先进的分析数据管理技术从系统架构中解放出来。简单来说,这意味着没有独立的数据库服务器,DuckDB 而是在“宿主”进程内运行。这个宿主可以是任何需要数据管理能力的应用,或者仅仅是一个交互式数据分析环境,如 Python 或 R。在宿主环境中运行还有一个巨大优势:在宿主和 DuckDB 之间来回移动数据非常便宜。对于 R 和 Python,DuckDB 可以直接在分析环境中的数据帧上运行复杂查询,无需任何导入或转换步骤。反之,DuckDB 的查询结果可以直接转换为数据帧,大大减少了与用于绘图、进一步分析或机器学习的下游库集成的开销。DuckDB 能够高效执行任意复杂的关系型查询,包括递归查询和关联查询。DuckDB 能够处理大于内存的数据集,无论是在读取和写入时,还是在处理大型中间结果时,例如数百万个组的聚合结果。DuckDB 拥有一个精密的完整查询优化器,消除了以前常见的手动优化步骤。DuckDB 还提供持久性,表格数据存储在磁盘文件中。这些文件中的表也可以更改——同时保持事务完整性。这些在交互式数据分析中是闻所未闻的功能,它们是分析数据系统数十年研究和工程的成果。
然而,一个问题依然存在,DuckDB 使用 SQL 语言。尽管 SQL 是一种流行的语言,但并非所有分析师都希望用 SQL 来表达他们的数据转换。这里的主要问题之一是,查询通常在 R 或 Python 脚本中以字符串形式表达,并以不透明的方式发送到数据库系统。这意味着这些查询具有全有或全无的语义,并且调试问题可能具有挑战性(“您的 SQL 语法有错误;请查阅手册……”)。像 dplyr 这样的 API 对用户来说通常更方便,它们允许 IDE 支持函数、变量名等的自动补全。此外,dplyr API 的累加性质允许以小步构建一系列数据转换,与编写百行 SQL 查询相比,这大大降低了分析师的认知负荷。曾有一些早期实验性尝试重载 R 的原生数据帧 API 以映射到 SQL 数据库,但这些方法被发现通用性太差,令用户感到意外,并且通常过于脆弱。需要一种更好的方法。
duckplyr R 软件包
为了解决这些问题,我们与 Posit(前身为 RStudio)和 cynkra 的 dplyr 项目团队合作开发了 duckplyr。duckplyr 是 dplyr 的即插即用型替代品,由 DuckDB 提供性能支持。Duckplyr 在交互式分析领域实现了多项创新。首先,安装 duckplyr 就像安装 dplyr 一样简单。DuckDB 已作为独立 R 软件包打包,其中包含整个数据管理系统代码以及 R 的封装器。DuckDB R 软件包和 duckplyr 均可在 CRAN 上获得,使得在所有主要平台上的安装都变得简单
install.packages("duckplyr")
动词(操作函数)
在底层,duckplyr 将某种程度上的关系型dplyr 操作(“动词”)转换为 DuckDB 的关系查询处理引擎。除了命名上的一些混淆之外,dplyr 的动词(如 select、filter、summarise 等)与 DuckDB 的 project、filter 和 aggregate 操作符之间存在一种大多直接的映射。与以往方法的一个关键区别是,duckplyr 不通过 DuckDB 的 SQL 接口来创建查询计划。相反,duckplyr 使用 DuckDB 所谓的“关系型”API 直接构建逻辑查询计划。此 API 允许完全绕过 SQL 解析器,大大降低了困扰 dbplyr 等其他方法的运算符、标识符、常量和表名转义的难度。
我们已将 C++ 级别的关系型 API 暴露给 R,从而可以直接从 R 构建 DuckDB 查询计划。此低级 API 不应直接使用,但 duckplyr 利用它将 dplyr 动词转换为 DuckDB 关系型 API,进而转换为查询计划。下面是一个例子
library("duckplyr")
as_duckplyr_df(data.frame(n=1:10)) |>
mutate(m=n+1) |>
filter (m > 5) |>
count() |>
explain()
┌───────────────────────────┐
│ PROJECTION │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ n │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ UNGROUPED_AGGREGATE │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ count_star() │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ FILTER │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│(+(CAST(n AS DOUBLE), 1.0) │
│ > 5.0) │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ EC: 10 │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ R_DATAFRAME_SCAN │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ data.frame │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ n │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ EC: 10 │
└───────────────────────────┘
我们可以看到 dplyr 动词 mutate、filter 和 count 的序列是如何“神奇地”转换为由扫描、过滤、投影和聚合组成的 DuckDB 查询计划的。我们可以在最底部看到一个 R_DATAFRAME_SCAN
运算符被添加。这个运算符直接读取 R 数据帧,就像它是 DuckDB 中的一个表一样,无需实际数据导入。新动词 explain()
会导致 DuckDB 的逻辑查询计划被打印出来,这样我们就可以根据 duckplyr 的动词序列来预测 DuckDB 将要执行什么。
表达式
表达式是数据转换中经常被忽视但至关重要的组成部分。表达式(概念上)是对常量和数据列的标量转换,可用于例如生成派生列或将实际列值转换为布尔值以用于过滤器。例如,人们可以编写一个像 (amount - discount) * tax
这样的表达式来计算实际开票金额,而无需该金额实际存储在列中,或者在过滤表达式中使用像 value > 42
这样的表达式来删除所有值小于或等于 42
的行。Dplyr 依赖基础 R 引擎评估表达式,并进行一些小修改以将变量名解析为输入数据中的列。当将表达式评估转移到 DuckDB 时,过程变得稍微复杂一些。DuckDB 拥有自己的独立表达式系统,由内置函数集(例如 min
)、标量值和类型组成。为了将 R 表达式转换为 DuckDB 表达式,我们使用 R 中一个有趣的特性来从函数参数中捕获未求值的抽象语法树。通过遍历该树,我们可以将 R 标量值转换为 DuckDB 标量值,将 R 函数调用转换为 DuckDB 函数调用,并将 R 级别变量引用转换为 DuckDB 列引用。显然,这种转换不可能完美:R 中存在 DuckDB 根本不支持的函数,例如来自无数贡献软件包的函数。虽然我们正在努力扩展支持的表达式集,但总会有一些无法翻译。然而,对于无法翻译的表达式,我们仍然能够向用户返回结果。为了实现这一点,我们实现了一个透明的回退机制,当表达式无法转换为 DuckDB 的表达式语言时,该机制会使用现有的 R 级别表达式评估方法。例如,以下转换 m = n + 1
可以被翻译:
as_duckplyr_df(data.frame(n=1:10)) |>
mutate(m=n+1) |>
explain()
┌───────────────────────────┐
│ PROJECTION │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ n │
│ m │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ R_DATAFRAME_SCAN │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ data.frame │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ n │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ EC: 10 │
└───────────────────────────┘
而使用内联 lambda 函数的以下转换尚不能(目前)
as_duckplyr_df(data.frame(n=1:10)) |>
mutate(m=(\(x) x+1)(n)) |>
explain()
┌───────────────────────────┐
│ R_DATAFRAME_SCAN │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ data.frame │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ n │
│ m │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ EC: 10 │
└───────────────────────────┘
虽然有点难以看出(我们正在努力改进这一点),但 explain()
的输出在两个 mutate 表达式之间明显不同。在第一种情况下,DuckDB 将 + 1
作为投影运算符的一部分进行计算;在第二种情况下,转换失败并使用了回退,导致计算在 R 引擎中进行。自动回退的好处是“一切正常运行”。缺点是回退通常会导致性能下降,例如由于缺乏自动并行化。我们计划添加一个调试模式,用户可以在其中检查翻译过程并了解翻译失败的原因。
及时物化与惰性物化
Dplyr 和 Pandas 遵循一种被称为“及时物化”的执行策略。每当在数据帧上调用一个操作时,该操作会立即执行并在内存中创建结果。这可能存在问题。考虑以下示例:一个千万行的数据集通过向一列添加 1 进行修改。然后,调用 top_n
操作只检索前十行。由于及时物化,加法操作会在千万行上执行,结果会在内存中创建,但几乎所有结果都会立即被丢弃,因为只请求了前十行。Duckplyr 通过使用一种所谓的“惰性物化”策略来解决这个问题,即最初不执行任何操作,而是捕获用户的意图。这意味着向千万行添加 1 的操作不会立即执行。系统能够优化所请求的计算,并且只对前几行执行加法。同样重要的是,加法运算的中间结果从未实际在内存中创建,大大减轻了内存压力。
然而,惰性计算带来了一个可能的集成问题:惰性计算的结果必须是某种惰性计算占位符对象,该对象可以传递给另一个惰性操作或被强制求值,例如通过特殊的打印方法。然而,这将破坏与 dplyr 的向后兼容性,因为在 dplyr 中,每个 dplyr 操作的结果本身都是一个完全物化的数据帧。这意味着这些结果可以直接传递给下游操作,如绘图,而无需绘图包知道 duckplyr 结果对象的“惰性”。为了解决这个问题,我们创造性地使用了 R 中一个被称为 ALTREP 的特性。ALTREP 允许 R 对象具有不同的内存表示,并且每当访问这些对象时都可以执行自定义代码。Duckplyr 的结果是惰性占位符对象,没错,但它们同时看起来是标准 R 数据帧。R 数据帧本质上是带特殊 row.names 属性的命名类型向量列表。由于 DuckDB 的惰性查询计划已经知道结果表的名称和类型,我们可以将名称导出到惰性数据帧中。然而,我们尚不知道行数及其内容。因此,我们将实际数据向量和包含数据帧长度的行名向量都设为惰性向量。这些向量携带一个回调函数,每当下游代码(例如绘图代码)触及这些向量时,R 引擎将调用该回调。该回调将实际触发整个管道的计算并将结果转换为 R 数据帧。Duckplyr 自己的操作将避免触及这些向量,而是继续惰性地使用存储在惰性数据帧中的特殊惰性计算对象。这种方法使得 duckplyr 能够同时具有惰性而非惰性,从而允许完全无缝替代及时求值的 dplyr,同时保持惰性求值,这对于 DuckDB 能够对各种转换步骤进行完整查询优化至关重要。
以下是使用 R 的 inspect()
方法演示 duckplyr 操作结果两面性的示例
dd <- as_duckplyr_df(data.frame(n=1:10)) |> mutate(m=n+1)
.Internal(inspect(dd))
@12daad988 19 VECSXP g0c2 [OBJ,REF(2),ATT] (len=2, tl=0)
@13e0c9d60 13 INTSXP g0c0 [REF(4)] DUCKDB_ALTREP_REL_VECTOR n (INTEGER)
@13e0ca1c0 14 REALSXP g0c0 [REF(4)] DUCKDB_ALTREP_REL_VECTOR m (DOUBLE)
ATTRIB:
@12817a838 02 LISTSXP g0c0 [REF(1)]
TAG: @13d80d420 01 SYMSXP g1c0 [MARK,REF(65535),LCK,gp=0x4000] "names" (has value)
@12daada08 16 STRSXP g0c2 [REF(65535)] (len=2, tl=0)
@13d852ef0 09 CHARSXP g1c1 [MARK,REF(553),gp=0x61] [ASCII] [cached] "n"
@13e086338 09 CHARSXP g1c1 [MARK,REF(150),gp=0x61] [ASCII] [cached] "m"
TAG: @13d80d9d0 01 SYMSXP g1c0 [MARK,REF(56009),LCK,gp=0x4000] "class" (has value)
@12da9e208 16 STRSXP g0c2 [REF(65535)] (len=2, tl=0)
@11ff15708 09 CHARSXP g0c2 [MARK,REF(423),gp=0x60] [ASCII] [cached] "duckplyr_df"
@13d892308 09 CHARSXP g1c2 [MARK,REF(1513),gp=0x61,ATT] [ASCII] [cached] "data.frame"
TAG: @13d80d1f0 01 SYMSXP g1c0 [MARK,REF(65535),LCK,gp=0x4000] "row.names" (has value)
@13e0c9970 13 INTSXP g0c0 [REF(65535)] DUCKDB_ALTREP_REL_ROWNAMES
我们可以看到数据帧的内部结构确实反映了一个数据帧,但我们也可以看到特殊的向量 DUCKDB_ALTREP_REL_VECTOR
隐藏了未求值的数据向量,以及 DUCKDB_ALTREP_REL_ROWNAMES
隐藏了数据帧的真实维度尚不明确的事实。
基准测试:TPC-H Q1
我们以快速演示 duckplyr 的性能改进来结束。我们使用来自知名 TPC-H 基准测试的数据生成器,该生成器已作为 DuckDB 扩展提供,非常有用。将“比例因子”设置为 1,以下 DuckDB/R 一行代码将生成一个包含略超过 600 万行的数据集,并将其存储在名为“lineitem”的 R 数据帧中
lineitem <- duckdb:::sql("INSTALL tpch; LOAD tpch; CALL dbgen(sf=1); FROM lineitem;")
我们已将 TPC-H 基准测试查询 1 从其原始 SQL 形式转换为 dplyr 语法
tpch_01 <- function() {
lineitem |>
select(l_shipdate, l_returnflag, l_linestatus, l_quantity, l_extendedprice, l_discount, l_tax) |>
filter(l_shipdate <= !!as.Date("1998-09-02")) |>
select(l_returnflag, l_linestatus, l_quantity, l_extendedprice, l_discount, l_tax) |>
summarise(
sum_qty = sum(l_quantity),
sum_base_price = sum(l_extendedprice),
sum_disc_price = sum(l_extendedprice * (1 - l_discount)),
sum_charge = sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)),
avg_qty = mean(l_quantity),
avg_price = mean(l_extendedprice),
avg_disc = mean(l_discount),
count_order = n(),
.by = c(l_returnflag, l_linestatus)
) |>
arrange(l_returnflag, l_linestatus)
}
我们现在可以使用 dplyr 和 duckplyr 执行此函数,并观察计算结果所需的时间。“原生”dplyr 在我的 MacBook 上执行此查询大约需要 400 毫秒,而 duckplyr 仅需约 70 毫秒。同样,这个时间包括了将 dplyr 动词序列转换为关系运算符树、优化该树、将输入 R 数据帧即时转换为 DuckDB 中间数据,以及将(诚然较小的)结果转换回 R 数据帧的所有“魔法”。当然,这里使用的数据集仍然相对较小,查询也不是很复杂,本质上是一个简单的分组聚合。对于更大数据集上更复杂的转换,差异将更加明显。duckplyr 还可以直接访问存储上的大量 Parquet 文件等,并将过滤器下推到这些扫描中,这也可以大幅提高性能。
结论
适用于 R 的 duckplyr 软件包将 DuckDB 最先进的分析查询处理技术封装在与 dplyr 兼容的 API 中。尽管我们将执行范式从及时切换到惰性,并且必须将表达式翻译到不同的环境中,但我们仍不遗余力地确保兼容性。我们将继续努力扩展 duckplyr 的功能,并期待听到您的使用体验。
以下是去年 posit::conf 上的两段录音,我们在其中介绍了用于 R 的 DuckDB 和 duckplyr