⌘+k ctrl+k
1.4 (LTS)
搜索快捷键 cmd + k | ctrl + k
多个 Python 线程

本页面演示了如何在多个 Python 线程中同时向 DuckDB 数据库插入数据并进行读取。这在数据持续流入且需要定期重新运行分析的场景中非常有用。请注意,这完全在单个 Python 进程内完成(有关 DuckDB 并发性的详细信息,请参阅 常见问题解答)。欢迎参考此 Google Colab 笔记本 进行练习。

设置

首先,导入 DuckDB 以及 Python 标准库中的几个模块。注意:如果使用 Pandas,请同时在脚本顶部添加 import pandas(因为它必须在多线程之前导入)。然后连接到一个基于文件的 DuckDB 数据库,并创建一个示例表来存储插入的数据。该表将记录完成插入操作的线程名称,并使用 DEFAULT 表达式自动插入数据发生时的时刻。

import duckdb
from threading import Thread, current_thread
import random

duckdb_con = duckdb.connect('my_peristent_db.duckdb')
# Use connect without parameters for an in-memory database
# duckdb_con = duckdb.connect()
duckdb_con.execute("""
    CREATE OR REPLACE TABLE my_inserts (
        thread_name VARCHAR,
        insert_time TIMESTAMP DEFAULT current_timestamp
    )
""")

读写函数

接下来,定义将由写入线程和读取线程执行的函数。每个线程都必须使用 .cursor() 方法,基于原始连接创建一个指向同一个 DuckDB 文件的线程本地连接。这种方法同样适用于内存中的 DuckDB 数据库。

def write_from_thread(duckdb_con):
    # Create a DuckDB connection specifically for this thread
    local_con = duckdb_con.cursor()
    # Insert a row with the name of the thread. insert_time is auto-generated.
    thread_name = str(current_thread().name)
    result = local_con.execute("""
        INSERT INTO my_inserts (thread_name)
        VALUES (?)
    """, (thread_name,)).fetchall()

def read_from_thread(duckdb_con):
    # Create a DuckDB connection specifically for this thread
    local_con = duckdb_con.cursor()
    # Query the current row count
    thread_name = str(current_thread().name)
    results = local_con.execute("""
        SELECT
            ? AS thread_name,
            count(*) AS row_counter,
            current_timestamp
        FROM my_inserts
    """, (thread_name,)).fetchall()
    print(results)

创建线程

我们定义要使用的写入线程和读取线程的数量,并定义一个列表来跟踪将要创建的所有线程。然后,先创建写入线程,再创建读取线程。接着,将它们打乱顺序,以便它们以随机顺序启动,从而模拟并发的写入和读取。请注意,此时线程尚未执行,仅完成了定义。

write_thread_count = 50
read_thread_count = 5
threads = []

# Create multiple writer and reader threads (in the same process)
# Pass in the same connection as an argument
for i in range(write_thread_count):
    threads.append(Thread(target = write_from_thread,
                            args = (duckdb_con,),
                            name = 'write_thread_' + str(i)))

for j in range(read_thread_count):
    threads.append(Thread(target = read_from_thread,
                            args = (duckdb_con,),
                            name = 'read_thread_' + str(j)))

# Shuffle the threads to simulate a mix of readers and writers
random.seed(6) # Set the seed to ensure consistent results when testing
random.shuffle(threads)

运行线程并展示结果

现在,启动所有线程并行运行,并等待它们全部完成后再打印出结果。请注意,由于进行了随机化处理,读取器和写入器的时间戳正如预期的那样交错出现。

# Kick off all threads in parallel
for thread in threads:
    thread.start()

# Ensure all threads complete before printing final results
for thread in threads:
    thread.join()

print(duckdb_con.execute("""
    SELECT *
    FROM my_inserts
    ORDER BY
        insert_time
""").df())
© 2025 DuckDB 基金会,阿姆斯特丹,荷兰
行为准则 商标使用指南