DuckDB的AsOf连接:模糊时间查找

Author Avatar
Richard Wesley
2023-09-15 · 20分钟阅读

总结:DuckDB支持AsOf连接——一种匹配附近值的方式。它们在时间序列分析中,用于搜索事件表时特别有用。

你是否有时间序列数据需要连接,但时间戳却无法完全匹配?或者你想通过另一张表中的时间来查找随时间变化的值?你是否最终编写了复杂(且缓慢)的不等式连接来获取结果?那么,这篇文章就是为你准备的!

什么是AsOf连接?

时间序列数据并非总是完美对齐的。时钟可能略有偏差,或者因果之间可能存在延迟。这使得连接两组有序数据变得具有挑战性。AsOf连接是解决此问题及其他类似问题的工具。

AsOf连接解决的问题之一是查找特定时间点上某个变化属性的值。这种用例非常常见,以至于AsOf这个名称由此而来:给我这个属性截至此时间的值。

然而,更普遍地说,AsOf连接体现了一些常见的时间分析语义,这些语义在标准SQL中实现起来可能繁琐且缓慢。

投资组合示例

让我们从一个具体的例子开始。假设我们有一个包含时间戳的股票价格 (prices)

股票代码 时间 价格
APPL 2001-01-01 00:00:00 1
APPL 2001-01-01 00:01:00 2
APPL 2001-01-01 00:02:00 3
MSFT 2001-01-01 00:00:00 1
MSFT 2001-01-01 00:01:00 2
MSFT 2001-01-01 00:02:00 3
GOOG 2001-01-01 00:00:00 1
GOOG 2001-01-01 00:01:00 2
GOOG 2001-01-01 00:02:00 3

我们还有另一个表,其中包含投资组合在不同时间点的持仓 (holdings)

股票代码 时间 股数
APPL 2000-12-31 23:59:30 5.16
APPL 2001-01-01 00:00:30 2.94
APPL 2001-01-01 00:01:30 24.13
GOOG 2000-12-31 23:59:30 9.33
GOOG 2001-01-01 00:00:30 23.45
GOOG 2001-01-01 00:01:30 10.58
数据 2000-12-31 23:59:30 6.65
数据 2001-01-01 00:00:30 17.95
数据 2001-01-01 00:01:30 18.37

我们可以通过使用AsOf连接,找到持仓时间戳之前最近的价格,从而计算出该时间点上每项持仓的价值

SELECT h.ticker, h.when, price * shares AS value
FROM holdings h ASOF JOIN prices p
  ON h.ticker = p.ticker
 AND h.when >= p.when;

这会将该时间点上持仓的价值附加到每一行

股票代码 时间 价值
APPL 2001-01-01 00:00:30 2.94
APPL 2001-01-01 00:01:30 48.26
GOOG 2001-01-01 00:00:30 23.45
GOOG 2001-01-01 00:01:30 21.16

它实质上是执行一个函数,该函数通过查找prices表中的附近值来定义。另请注意,缺失的ticker值没有匹配项,也不会出现在输出中。

外部AsOf连接

由于AsOf在右侧最多产生一个匹配,因此左侧表不会因连接而增长,但如果右侧缺少时间,它可能会缩小。为了处理这种情况,你可以使用一个外部AsOf连接

SELECT h.ticker, h.when, price * shares AS value
FROM holdings h ASOF LEFT JOIN prices p
  ON h.ticker = p.ticker
 AND h.when >= p.when
ORDER BY ALL;

正如你可能预期的那样,当没有股票代码或者时间早于价格开始时间时,这会产生NULL价格和值,而不是删除左侧的行。

股票代码 时间 价值
APPL 2000-12-31 23:59:30  
APPL 2001-01-01 00:00:30 2.94
APPL 2001-01-01 00:01:30 48.26
GOOG 2000-12-31 23:59:30  
GOOG 2001-01-01 00:00:30 23.45
GOOG 2001-01-01 00:01:30 21.16
数据 2000-12-31 23:59:30  
数据 2001-01-01 00:00:30  
数据 2001-01-01 00:01:30  

窗口函数替代方案

标准SQL可以实现这种连接,但你需要使用窗口函数和不等式连接。这些都可能是相当昂贵的操作,但查询会是这样的

WITH state AS (
    SELECT
        ticker,
        price,
        "when",
        lead("when", 1, 'infinity')
            OVER (PARTITION BY ticker ORDER BY "when") AS end
    FROM prices
)
SELECT h.ticker, h.when, price * shares AS value
FROM holdings h
INNER JOIN state s
        ON h.ticker = s.ticker
      AND h.when >= s.when
      AND h.when < s.end;

infinity的默认值用于确保最后一行有一个可供比较的结束值。这是我们示例中state CTE的样子:

股票代码 价格 时间 结束
APPL 1 2001-01-01 00:00:00 2001-01-01 00:01:00
APPL 2 2001-01-01 00:01:00 2001-01-01 00:02:00
APPL 3 2001-01-01 00:02:00 无限
GOOG 1 2001-01-01 00:00:00 2001-01-01 00:01:00
GOOG 2 2001-01-01 00:01:00 2001-01-01 00:02:00
GOOG 3 2001-01-01 00:02:00 无限
MSFT 1 2001-01-01 00:00:00 2001-01-01 00:01:00
MSFT 2 2001-01-01 00:01:00 2001-01-01 00:02:00
MSFT 3 2001-01-01 00:02:00 无限

在没有等式条件的情况下,规划器将不得不使用不等式连接,这可能会非常昂贵。即使在等式条件的情况下,生成的哈希连接也可能出现长链的相同ticker键,这些键都将匹配并需要修剪。

为什么选择AsOf?

如果SQL已经可以计算AsOf连接,为什么我们还需要一种新的连接类型?主要有两个原因:表达能力和性能。窗口函数替代方案比AsOf语法更冗长且难以理解,因此,使其更容易表达你的意图有助于他人(甚至是你自己!)理解正在发生的事情。

这种语法也使得DuckDB更容易理解你的意图并更快地生成结果。窗口函数和不等式连接版本失去了区间不重叠这一有价值的信息。它还阻止了查询优化器移动连接,因为SQL坚持窗口操作发生在连接之后。通过将操作视为具有已知数据约束连接,DuckDB可以为了性能而移动连接,并使用定制的连接算法。我们使用的算法是对右侧表进行排序,然后与左侧值进行一种合并连接。但与标准合并连接不同,AsOf在找到第一个匹配项时就可以停止搜索,因为它最多只有一个匹配项。

状态表

你可能想知道为什么WITH子句中的公共表表达式被称为state(状态)。这是因为prices表实际上是时间分析中所谓的事件表的一个例子。事件表的行包含时间戳和该时间发生的事情(即事件)。prices表中的事件是股票价格的变化。事件表的另一个常见例子是结构化日志文件:日志的每一行都记录了某事“发生”的时间——通常是系统某个部分的更改。

事件表很难处理,因为每个事实只包含开始时间。为了知道该事实是否仍然为真(或在特定时间为真),你还需要结束时间。包含开始时间和结束时间的表被称为状态表。将事件表转换为状态表是常见的时间数据准备任务,上面的窗口化CTE展示了如何使用SQL普遍地做到这一点。

哨兵值

窗口方法的一个限制是,如果排序类型不支持infinity,则需要有一个可用的哨兵值,它可以是未使用的值或NULL

这两种选择都可能存在问题。在第一种情况下,可能不容易确定一个上限哨兵值(假设排序是一个字符串列?)。在第二种情况下,你需要将条件写成h.when < s.end OR s.end IS NULL,并且在连接条件中使用这样的OR会使比较变慢且难以优化。此外,如果排序列已经使用NULL来指示缺失值,则此选项不可用。

对于大多数状态表,都有合适的选择(例如,大日期值),但AsOf的优点之一是,如果分析任务不需要状态表,它可以避免设计状态表。

事件表变体

到目前为止,我们一直在使用一种标准类型的事件表,其中时间戳被假定为状态转换的开始。但现在AsOf可以使用任何不等式,这使得它能够处理其他类型的事件表。

为了探讨这一点,我们使用两个非常简单的、没有等式条件的表。构建侧将只有四个整数“时间戳”以及字母值:

时间
1 a
2 b
3 c
4 d

探测表将只有时间值加上中点,我们可以创建一个表来显示每个探测时间在“大于或等于”条件下的匹配值:

探测 >=
0.5  
1.0 a
1.5 a
2.0 b
2.5 b
3.0 c
3.5 c
4.0 d
4.5 d

这表明探测值匹配的区间是半开区间[Tn, Tn+1)

现在让我们看看如果使用“严格大于”作为不等式会发生什么:

探测 >
0.5  
1.0  
1.5 a
2.0 a
2.5 b
3.0 b
3.5 c
4.0 c
4.5 d

现在我们可以看到,探测值匹配的区间是半开区间(Tn, Tn+1]。唯一的区别是区间在末尾是闭合的,而不是在开头。这意味着对于这种不等式类型,时间不属于该区间。

如果不等式方向相反,比如“小于或等于”呢?

探测 <=
0.5 a
1.0 a
1.5 b
2.0 b
2.5 c
3.0 c
3.5 d
4.0 d
4.5  

同样,我们有半开区间,但这次我们匹配的是前一个区间(Tn-1, Tn]。一种解释是构建表中的时间是区间的结束,而不是开始。此外,与“大于或等于”不同,该区间在末尾是闭合的,而不是在开头。将此与我们对“严格大于”的发现结合起来,我们可以将此解释为:当使用非严格不等式时,查找时间是区间的一部分。

我们可以通过查看最后一个不等式来验证这一点:“严格小于”

探测 <
0.5 a
1.0 b
1.5 b
2.0 c
2.5 c
3.0 d
3.5 d
4.0  
4.5  

在这种情况下,匹配区间是[Tn-1, Tn)。这是一个严格不等式,因此表时间不在此区间内;它是一个小于关系,因此时间是区间的结束。

综上所述,完整列表如下:

不等式 间隔
> (Tn, Tn+1]
>= [Tn, Tn+1)
<= (Tn-1, Tn]
< [Tn-1, Tn)

现在我们对不等式的含义有两种自然的解释:

  • 大于(或小于)不等式表示时间是区间的开始(或结束)。
  • 严格(或非严格)不等式表示时间被排除在(或包含在)区间内。

因此,如果我们知道时间是事件的开始还是结束,以及时间是否包含在内或排除在外,我们就可以选择合适的AsOf不等式。

用法

到目前为止,我们一直在明确指定AsOf的条件,但SQL也为两表中列名相同这种常见情况提供了一种简化的连接条件语法。这种语法使用USING关键字来列出应进行等值比较的字段。AsOf也支持这种语法,但有两个限制:

  • 最后一个字段是不等式
  • 不等式是>=(最常见的情况)

我们的第一个查询可以写成:

SELECT ticker, h.when, price * shares AS value
FROM holdings h
ASOF JOIN prices p
    USING(ticker, "when");

请注意,如果你没有在SELECT中明确列出列,排序字段的值将是探测值,而不是构建值。对于自然连接,这不成问题,因为所有条件都是等式;但对于AsOf,必须选择一侧。由于AsOf可以被视为一个查找函数,因此返回“函数参数”比返回函数内部更自然。

幕后原理

AsOf连接的真正作用是允许你将事件表视为状态表进行连接操作。通过了解连接的语义,它可以避免创建完整的状态表,并且比一般的不等式连接更高效。

让我们从了解窗口化版本如何工作开始。请记住,我们使用此查询将事件表转换为状态表:

WITH state AS (
    SELECT
        ticker,
        price,
        "when",
        lead("when", 1, 'infinity')
            OVER (PARTITION BY ticker ORDER BY "when") AS end
    FROM prices
);

状态表CTE是通过在ticker上对表进行哈希分区、在when上排序,然后计算另一个只是when向下偏移一位的列来创建的。然后,连接通过在ticker上进行哈希连接以及在when上进行两次比较来实现。

如果没有ticker列(例如,价格仅针对单个项目),那么连接将使用我们的不等式连接运算符实现,该运算符将实例化并排序两侧,因为它不知道这些范围是不相交的。

AsOf运算符使用所有三个运算符管道API来合并和收集行。在sink(下沉)阶段,AsOf对右侧进行哈希分区和排序,以生成一个临时状态表。(实际上,它使用了与Window相同的代码,但没有不必要地实例化结束列。)在operator(操作)阶段,它会过滤掉(或返回)由于谓词表达式中存在NULL值而无法匹配的行,然后将剩余的行哈希分区并排序到缓存中。最后,在source(源)阶段,它匹配哈希分区,然后对每个哈希分区内的排序值进行合并连接。

基准测试

由于AsOf连接可以使用标准SQL查询以各种方式实现,因此基准测试实际上是关于比较各种替代方案。

一种替代方案是为AsOf提供一个调试PRAGMA,名为debug_asof_iejoin,它使用Window和IEJoin来实现连接。这使我们能够轻松地在不同实现之间切换并比较运行时。

其他替代方案结合了等值连接和窗口函数。等值连接用于实现相等匹配条件,而窗口则用于选择最近的不等式。我们现在将研究两种不同的窗口化技术并比较它们的性能。如果你想跳过本节,最重要的是,虽然它们有时会稍微快一些,但AsOf连接在所有算法中具有最一致的行为。

将窗口作为状态表

第一个基准测试将哈希连接与状态表进行比较。它使用自连接探测一个500万行(5M row)的值表,该表由10万个时间戳(100K timestamps)和50个分区键(50 partitioning keys)构建,其中只有50%的键存在,并且时间戳已偏移到原始时间戳的中间

CREATE OR REPLACE TABLE build AS (
    SELECT k, '2001-01-01 00:00:00'::TIMESTAMP + INTERVAL (v) MINUTE AS t, v
    FROM range(0, 100_000) vals(v), range(0, 50) keys(k)
);

CREATE OR REPLACE TABLE probe AS (
    SELECT k * 2 AS k, t - INTERVAL (30) SECOND AS t
    FROM build
);

build表如下所示:

k t v
0 2001-01-01 00:00:00 0
0 2001-01-01 00:01:00 1
0 2001-01-01 00:02:00 2
0 2001-01-01 00:03:00 3

探测表如下所示(k只包含偶数):

k t
0 2000-12-31 23:59:30
0 2001-01-01 00:00:30
0 2001-01-01 00:01:30
0 2001-01-01 00:02:30
0 2001-01-01 00:03:30

基准测试只执行连接并对v列求和。

SELECT sum(v)
FROM probe
ASOF JOIN build USING(k, t);

调试PRAGMA不允许我们使用哈希连接,但我们可以再次在CTE中创建状态表并使用内连接

-- Hash Join implementation
WITH state AS (
    SELECT k, 
      t AS begin, 
      v, 
      lead(t, 1, 'infinity'::TIMESTAMP) OVER (PARTITION BY k ORDER BY t) AS end
    FROM build
)
SELECT sum(v)
FROM probe p
INNER JOIN state s 
        ON p.t >= s.begin
       AND p.t < s.end
       AND p.k = s.k;

这之所以有效,是因为规划器假定等式条件比不等式更具选择性,并生成带有过滤器的哈希连接。

运行基准测试,我们得到如下结果:

算法 5次运行的中位数
AsOf 0.425 秒
IEJoin 3.522 秒
状态连接 192.460 秒

AsOf相对于IEJoin在此处的运行时改进约为9倍。哈希连接糟糕的性能是由于哈希表中存在长(10万条)的桶链造成的。

第二个基准测试了探测侧比构建侧小约10倍的情况:

CREATE OR REPLACE TABLE probe AS
    SELECT k, 
      '2021-01-01T00:00:00'::TIMESTAMP +
          INTERVAL (random() * 60 * 60 * 24 * 365) SECOND AS t,
    FROM range(0, 100_000) tbl(k);

CREATE OR REPLACE TABLE build AS
    SELECT r % 100_000 AS k, 
      '2021-01-01T00:00:00'::TIMESTAMP +
          INTERVAL (random() * 60 * 60 * 24 * 365) SECOND AS t,
      (random() * 100_000)::INTEGER AS v
    FROM range(0, 1_000_000) tbl(r);

SELECT sum(v)
FROM probe p
ASOF JOIN build b
  ON p.k = b.k
 AND p.t >= b.t

-- Hash Join Version
WITH state AS (
  SELECT k, 
    t AS begin, 
    v, 
    lead(t, 1, 'infinity'::TIMESTAMP)
        OVER (PARTITION BY k ORDER BY t) AS end
  FROM build
)
SELECT sum(v)
FROM probe p
INNER JOIN state s
        ON p.t >= s.begin
       AND p.t < s.end
       AND p.k = s.k;
算法 5次运行的中位数
状态连接 0.065 秒
AsOf 0.077 秒
IEJoin 49.508 秒

现在,AsOf相对于IEJoin的运行时改进巨大(约500倍),因为它能够利用分区来消除几乎所有的等值不匹配。

哈希连接的实现在这里表现得好得多,因为优化器注意到探测侧更小,并在“探测”表上构建哈希表。此外,这里的探测值是唯一的,因此哈希表链是最小的。

带有排名的窗口

使用窗口运算符的另一种方式是:

  • 根据等值谓词连接表
  • 过滤构建时间在探测时间之前的对
  • 在等值键探测时间戳上对结果进行分区
  • 按构建时间戳降序对分区进行排序
  • 过滤掉除排名为1的所有值(即,最大的构建时间 <= 探测时间)

查询如下所示:

WITH win AS (
    SELECT p.k, p.t, v,
        rank() OVER (PARTITION BY p.k, p.t ORDER BY b.t DESC) AS r
    FROM probe p INNER JOIN build b
      ON p.k = b.k
    AND p.t >= b.t
    QUALIFY r = 1
) 
SELECT k, t, v
FROM win;

这种窗口化查询的优点是它不需要哨兵值,因此可以适用于任何数据类型。缺点是它会创建更多的分区,因为它包含了两个时间戳,这需要更复杂的排序。此外,因为它在连接之后应用窗口,它会产生巨大的中间结果,可能导致外部排序和昂贵的内存不足操作。

对于此基准测试,我们将使用三个构建表和两个探测表,所有表都包含1万个整数等值键。探测表每个键有1个或15个时间戳。

CREATE OR REPLACE TABLE probe15 AS
    SELECT k, t
    FROM range(10_000) cs(k), 
         range('2022-01-01'::TIMESTAMP, '2023-01-01'::TIMESTAMP, INTERVAL 26 DAY) ts(t);

CREATE OR REPLACE TABLE probe1 AS
    SELECT k, '2022-01-01'::TIMESTAMP t
    FROM range(10_000) cs(k);

构建表大得多,其条目数量大约是15元素表的10/100/1000倍。

-- 10:1
CREATE OR REPLACE TABLE build10 AS
    SELECT k, t, (random() * 1000)::DECIMAL(7, 2) AS v
    FROM range(10_000) ks(k), 
         range('2022-01-01'::TIMESTAMP, '2023-01-01'::TIMESTAMP, INTERVAL 59 HOUR) ts(t);

-- 100:1
CREATE OR REPLACE TABLE build100 AS
    SELECT k, t, (random() * 1000)::DECIMAL(7, 2) AS v
    FROM range(10_000) ks(k), 
         range('2022-01-01'::TIMESTAMP, '2023-01-01'::TIMESTAMP, INTERVAL 350 MINUTE) ts(t);

-- 1000:1
CREATE OR REPLACE TABLE build1000 AS
    SELECT k, t, (random() * 1000)::DECIMAL(7, 2) AS v
    FROM range(10_000) ks(k), 
         range('2022-01-01'::TIMESTAMP, '2023-01-01'::TIMESTAMP, INTERVAL 35 MINUTE) ts(t);

AsOf连接查询如下:

-- AsOf/IEJoin
SELECT p.k, p.t, v
FROM probe p ASOF JOIN build b
  ON p.k = b.k
 AND p.t >= b.t
ORDER BY 1, 2;

-- Rank
WITH win AS (
    SELECT p.k, p.t, v,
        rank() OVER (PARTITION BY p.k, p.t ORDER BY b.t DESC)  AS r
    FROM probe p INNER JOIN build b
      ON p.k = b.k
    AND p.t >= b.t
    QUALIFY r = 1
)
SELECT k, t, v
FROM win
ORDER BY 1, 2;

结果如下所示:

Rank Benchmark Results

(5次运行的中位数,Rank/15/1000除外)。

  • 对于所有15个探测的比率,AsOf的性能最佳。
  • 对于15个探测的小比率,Rank优于IEJoin(两者都使用窗口函数),但到100:1时,它开始“爆炸性”增长。
  • 对于单元素探测,Rank最有效,但即使在这种情况下,它在规模上相对于AsOf的优势也只有大约50%。

这表明AsOf可能仍有改进空间,但预测改进发生的位置将很棘手,而且如果判断错误,将付出巨大代价。

未来工作

DuckDB现在可以以合理的性能执行所有不等式类型的AsOf连接。在某些情况下,与标准SQL版本相比,性能提升达到数个数量级——即使我们已经有了快速的不等式连接运算符。

尽管当前的AsOf运算符是完全通用的,但这里仍可以应用一些规划优化。

  • 当存在选择性等值条件时,使用哈希连接并对具象化状态表进行过滤可能会显著更快。如果我们能够检测到这一点并有合适的哨兵值可用,规划器可以选择使用哈希连接而不是默认的AsOf实现。
  • 还有一些用例,其中探测表远小于构建表,并且存在等值条件,此时对探测表执行哈希连接可以带来显著的性能提升。

尽管如此,请记住SQL的优点之一是它是一种声明性语言:
你指定你想要什么,而由数据库来决定如何实现。既然我们已经定义了AsOf连接的语义,用户就可以编写查询来表达你想要什么——而我们可以自由地不断改进如何实现!

连接愉快!

在DuckDB上工作最有趣的部分之一是它扩展了传统的SQL无序数据模型。DuckDB使得查询有序数据集(如数据帧和parquet文件)变得容易,当你拥有这样的数据时,你期望能够进行有序分析!实现快速排序、快速窗口化和快速AsOf连接正是我们实现这一期望的方式。