⌘+k ctrl+k
1.3 (稳定版)
搜索快捷键 cmd + k | ctrl + k
Julia 客户端

DuckDB Julia 包为 DuckDB 提供了高性能前端。与 SQLite 类似,DuckDB 在 Julia 客户端内部以进程内方式运行,并提供 DBInterface 前端。

该包还支持多线程执行。它为此目的使用了 Julia 的线程/任务。如果您希望并行运行查询,必须在支持多线程的情况下启动 Julia(例如,通过设置 JULIA_NUM_THREADS 环境变量)。

安装

按以下方式安装 DuckDB

using Pkg
Pkg.add("DuckDB")

或者,使用 ] 键进入包管理器,并执行以下命令

pkg> add DuckDB

基础知识

using DuckDB

# create a new in-memory database
con = DBInterface.connect(DuckDB.DB, ":memory:")

# create a table
DBInterface.execute(con, "CREATE TABLE integers (i INTEGER)")

# insert data by executing a prepared statement
stmt = DBInterface.prepare(con, "INSERT INTO integers VALUES(?)")
DBInterface.execute(stmt, [42])

# query the database
results = DBInterface.execute(con, "SELECT 42 a")
print(results)

一些 SQL 语句,例如 PIVOT 和 IMPORT DATABASE,作为多个预处理语句执行,在使用 DuckDB.execute() 时会出错。相反,它们可以使用 DuckDB.query() 而不是 DuckDB.execute() 运行,并且总是返回一个物化结果。

扫描 DataFrames

DuckDB Julia 包还支持查询 Julia DataFrames。请注意,DataFrames 是由 DuckDB 直接读取的——它们不会被插入或复制到数据库本身。

如果您希望将 DataFrame 中的数据加载到 DuckDB 表中,可以运行 CREATE TABLE ... ASINSERT INTO 查询。

using DuckDB
using DataFrames

# create a new in-memory dabase
con = DBInterface.connect(DuckDB.DB)

# create a DataFrame
df = DataFrame(a = [1, 2, 3], b = [42, 84, 42])

# register it as a view in the database
DuckDB.register_data_frame(con, df, "my_df")

# run a SQL query over the DataFrame
results = DBInterface.execute(con, "SELECT * FROM my_df")
print(results)

Appender API

DuckDB Julia 包还支持 Appender API,这比使用预处理语句或单独的 INSERT INTO 语句快得多。追加操作以按行格式进行。对于每一列,都应该进行一次 append() 调用,之后通过调用 flush() 来完成该行。所有行追加完成后,应使用 close() 来完成 Appender 并清理产生的内存。

using DuckDB, DataFrames, Dates
db = DuckDB.DB()
# create a table
DBInterface.execute(db,
    "CREATE OR REPLACE TABLE data (id INTEGER PRIMARY KEY, value FLOAT, timestamp TIMESTAMP, date DATE)")
# create data to insert
len = 100
df = DataFrames.DataFrame(
        id = collect(1:len),
        value = rand(len),
        timestamp = Dates.now() + Dates.Second.(1:len),
        date = Dates.today() + Dates.Day.(1:len)
    )
# append data by row
appender = DuckDB.Appender(db, "data")
for i in eachrow(df)
    for j in i
        DuckDB.append(appender, j)
    end
    DuckDB.end_row(appender)
end
# close the appender after all rows
DuckDB.close(appender)

并发

在 Julia 进程中,只要每个任务都保持自己的数据库连接,任务就能够并发地读写数据库。在下面的示例中,一个任务被生成用于定期读取数据库,而许多任务被生成用于使用 INSERT 语句Appender API 写入数据库。

using Dates, DataFrames, DuckDB
db = DuckDB.DB()
DBInterface.connect(db)
DBInterface.execute(db, "CREATE OR REPLACE TABLE data (date TIMESTAMP, id INTEGER)")

function run_reader(db)
    # create a DuckDB connection specifically for this task
    conn = DBInterface.connect(db)
    while true
        println(DBInterface.execute(conn,
                "SELECT id, count(date) AS count, max(date) AS max_date
                FROM data GROUP BY id ORDER BY id") |> DataFrames.DataFrame)
        Threads.sleep(1)
    end
    DBInterface.close(conn)
end
# spawn one reader task
Threads.@spawn run_reader(db)

function run_inserter(db, id)
    # create a DuckDB connection specifically for this task
    conn = DBInterface.connect(db)
    for i in 1:1000
        Threads.sleep(0.01)
        DuckDB.execute(conn, "INSERT INTO data VALUES (current_timestamp, ?)"; id);
    end
    DBInterface.close(conn)
end
# spawn many insert tasks
for i in 1:100
    Threads.@spawn run_inserter(db, 1)
end

function run_appender(db, id)
    # create a DuckDB connection specifically for this task
    appender = DuckDB.Appender(db, "data")
    for i in 1:1000
        Threads.sleep(0.01)
        row = (Dates.now(Dates.UTC), id)
        for j in row
            DuckDB.append(appender, j);
        end
        DuckDB.end_row(appender);
    end
    DuckDB.close(appender);
end
# spawn many appender tasks
for i in 1:100
    Threads.@spawn run_appender(db, 2)
end

原始 Julia 连接器

鸣谢 kimmolinna 提供的 原始 DuckDB Julia 连接器