搜索快捷键 cmd + k | ctrl + k
- 安装
- 文档
- 入门
- 连接
- 数据导入与导出
- 湖仓格式
- 客户端 API
- 概览
- 第三方客户端
- ADBC
- C
- C++
- CLI
- Dart
- Go
- Java (JDBC)
- Julia
- Node.js (已弃用)
- Node.js (Neo)
- ODBC
- PHP
- Python
- R
- Rust
- Swift
- Wasm
- SQL
- 介绍
- 语句
- 概览
- ANALYZE
- ALTER TABLE
- ALTER VIEW
- ATTACH 和 DETACH
- CALL
- CHECKPOINT
- COMMENT ON
- COPY
- CREATE INDEX
- CREATE MACRO
- CREATE SCHEMA
- CREATE SECRET
- CREATE SEQUENCE
- CREATE TABLE
- CREATE VIEW
- CREATE TYPE
- DELETE
- DESCRIBE
- DROP
- EXPORT 和 IMPORT DATABASE
- INSERT
- LOAD / INSTALL
- MERGE INTO
- PIVOT
- 性能分析
- SELECT
- SET / RESET
- SET VARIABLE
- SHOW 与 SHOW DATABASES
- SUMMARIZE
- 事务管理
- UNPIVOT
- UPDATE
- USE
- VACUUM
- 查询语法
- SELECT
- FROM 和 JOIN
- WHERE
- GROUP BY
- GROUPING SETS
- HAVING
- ORDER BY
- LIMIT 和 OFFSET
- SAMPLE
- 展开嵌套
- WITH
- WINDOW
- QUALIFY
- VALUES
- FILTER
- 集合操作
- 预处理语句
- 数据类型
- 表达式
- 函数
- 概览
- 聚合函数
- 数组函数
- 位字符串函数
- Blob 函数
- 日期格式化函数
- 日期函数
- 日期部分函数
- 枚举函数
- 间隔函数
- Lambda 函数
- 列表函数
- 映射函数
- 嵌套函数
- 数值函数
- 模式匹配
- 正则表达式
- 结构体函数
- 文本函数
- 时间函数
- 时间戳函数
- 带时区时间戳函数
- 联合函数
- 实用函数
- 窗口函数
- 约束
- 索引
- 元查询
- DuckDB 的 SQL 方言
- 示例
- 配置
- 扩展
- 核心扩展
- 概览
- 自动补全
- Avro
- AWS
- Azure
- Delta
- DuckLake
- 编码
- Excel
- 全文搜索
- httpfs (HTTP 和 S3)
- Iceberg
- ICU
- inet
- jemalloc
- Lance
- MySQL
- PostgreSQL
- 空间
- SQLite
- TPC-DS
- TPC-H
- UI
- Unity Catalog
- Vortex
- VSS
- 指南
- 概览
- 数据查看器
- 数据库集成
- 文件格式
- 概览
- CSV 导入
- CSV 导出
- 直接读取文件
- Excel 导入
- Excel 导出
- JSON 导入
- JSON 导出
- Parquet 导入
- Parquet 导出
- 查询 Parquet 文件
- 使用 file: 协议访问文件
- 网络和云存储
- 概览
- HTTP Parquet 导入
- S3 Parquet 导入
- S3 Parquet 导出
- S3 Iceberg 导入
- S3 Express One
- GCS 导入
- Cloudflare R2 导入
- 通过 HTTPS / S3 使用 DuckDB
- Fastly 对象存储导入
- 元查询
- ODBC
- 性能
- Python
- 安装
- 执行 SQL
- Jupyter Notebooks
- marimo Notebooks
- Pandas 上的 SQL
- 从 Pandas 导入
- 导出到 Pandas
- 从 Numpy 导入
- 导出到 Numpy
- Arrow 上的 SQL
- 从 Arrow 导入
- 导出到 Arrow
- Pandas 上的关系型 API
- 多个 Python 线程
- 与 Ibis 集成
- 与 Polars 集成
- 使用 fsspec 文件系统
- SQL 编辑器
- SQL 功能
- 代码片段
- 故障排除
- 术语表
- 离线浏览
- 操作手册
- 概览
- DuckDB 的占用空间
- 安装 DuckDB
- 日志
- 保护 DuckDB 安全
- 非确定性行为
- 限制
- DuckDB Docker 容器
- 开发
- 内部结构
- 站点地图
- 在线演示
文档 / 指南 / Python
多个 Python 线程
本页面演示了如何在多个 Python 线程中同时向 DuckDB 数据库插入数据并进行读取。这在数据持续流入且需要定期重新运行分析的场景中非常有用。请注意,这完全在单个 Python 进程内完成(有关 DuckDB 并发性的详细信息,请参阅 常见问题解答)。欢迎参考此 Google Colab 笔记本 进行练习。
设置
首先,导入 DuckDB 以及 Python 标准库中的几个模块。注意:如果使用 Pandas,请同时在脚本顶部添加 import pandas(因为它必须在多线程之前导入)。然后连接到一个基于文件的 DuckDB 数据库,并创建一个示例表来存储插入的数据。该表将记录完成插入操作的线程名称,并使用 DEFAULT 表达式自动插入数据发生时的时刻。
import duckdb
from threading import Thread, current_thread
import random
duckdb_con = duckdb.connect('my_peristent_db.duckdb')
# Use connect without parameters for an in-memory database
# duckdb_con = duckdb.connect()
duckdb_con.execute("""
CREATE OR REPLACE TABLE my_inserts (
thread_name VARCHAR,
insert_time TIMESTAMP DEFAULT current_timestamp
)
""")
读写函数
接下来,定义将由写入线程和读取线程执行的函数。每个线程都必须使用 .cursor() 方法,基于原始连接创建一个指向同一个 DuckDB 文件的线程本地连接。这种方法同样适用于内存中的 DuckDB 数据库。
def write_from_thread(duckdb_con):
# Create a DuckDB connection specifically for this thread
local_con = duckdb_con.cursor()
# Insert a row with the name of the thread. insert_time is auto-generated.
thread_name = str(current_thread().name)
result = local_con.execute("""
INSERT INTO my_inserts (thread_name)
VALUES (?)
""", (thread_name,)).fetchall()
def read_from_thread(duckdb_con):
# Create a DuckDB connection specifically for this thread
local_con = duckdb_con.cursor()
# Query the current row count
thread_name = str(current_thread().name)
results = local_con.execute("""
SELECT
? AS thread_name,
count(*) AS row_counter,
current_timestamp
FROM my_inserts
""", (thread_name,)).fetchall()
print(results)
创建线程
我们定义要使用的写入线程和读取线程的数量,并定义一个列表来跟踪将要创建的所有线程。然后,先创建写入线程,再创建读取线程。接着,将它们打乱顺序,以便它们以随机顺序启动,从而模拟并发的写入和读取。请注意,此时线程尚未执行,仅完成了定义。
write_thread_count = 50
read_thread_count = 5
threads = []
# Create multiple writer and reader threads (in the same process)
# Pass in the same connection as an argument
for i in range(write_thread_count):
threads.append(Thread(target = write_from_thread,
args = (duckdb_con,),
name = 'write_thread_' + str(i)))
for j in range(read_thread_count):
threads.append(Thread(target = read_from_thread,
args = (duckdb_con,),
name = 'read_thread_' + str(j)))
# Shuffle the threads to simulate a mix of readers and writers
random.seed(6) # Set the seed to ensure consistent results when testing
random.shuffle(threads)
运行线程并展示结果
现在,启动所有线程并行运行,并等待它们全部完成后再打印出结果。请注意,由于进行了随机化处理,读取器和写入器的时间戳正如预期的那样交错出现。
# Kick off all threads in parallel
for thread in threads:
thread.start()
# Ensure all threads complete before printing final results
for thread in threads:
thread.join()
print(duckdb_con.execute("""
SELECT *
FROM my_inserts
ORDER BY
insert_time
""").df())