# 多线程 DuckLake（鸭湖）数据导入
from datetime import datetime
from datetime import timedelta, date
import polars as pl
import duckdb as dd
import time as time_module
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# Initialization (executed at import time).
# Oracle connection URI consumed by pl.read_database_uri in worker();
# placeholder value — fill in credentials/host before running.
uri = "oracle://"
dd.sql("attach 'ducklake:meta.ducklake' as lake;")
# Create the schema BEFORE touching lake.ods.jzjl: the DROP below names the
# ods schema, so creating it first keeps the statement valid on a fresh lake
# instead of relying on DROP ... IF EXISTS tolerating a missing schema.
dd.sql("CREATE SCHEMA IF NOT EXISTS lake.ods;")
# Start from a clean target table; worker() recreates it from the first
# non-empty batch.
dd.sql("DROP TABLE IF EXISTS lake.ods.jzjl;")
# Serializes worker threads' access to the first_batch flag and to the shared
# module-level DuckDB connection used via dd.register / dd.sql.
first_batch_lock = threading.Lock()
# True until some worker has successfully created lake.ods.jzjl.
first_batch = True
def worker(current_date):
    """Load one day of ``inoc_jzjl`` rows from Oracle into ``lake.ods.jzjl``.

    The first worker that fetches a non-empty batch creates the target table
    with ``CREATE TABLE ... AS``; every later non-empty batch is appended with
    ``INSERT``. All DuckDB work happens while holding ``first_batch_lock``, so
    the shared module-level connection is never used by two threads at once.
    The Oracle read itself runs outside the lock, so days are fetched
    concurrently.

    Args:
        current_date: The day to load. Rows with ``jz_sj`` in
            ``[current_date, current_date + 1 day)`` are selected.

    Returns:
        A dict with keys ``date``, ``status`` (``'success'`` or ``'failed'``),
        ``rows`` (rows fetched), ``elapsed`` (seconds) and ``error`` (``None``
        or the exception message). Exceptions are captured in the dict rather
        than propagated.
    """
    global first_batch
    start_time = time_module.time()
    result = {
        'date': current_date,
        'status': 'success',
        'rows': 0,
        'elapsed': 0,
        'error': None,
    }
    try:
        next_date = current_date + timedelta(days=1)
        # current_date / next_date are date objects, so they render as
        # YYYY-MM-DD, matching the TO_DATE format mask.
        query = f"""
        SELECT * FROM inoc_jzjl
        WHERE jz_sj >= TO_DATE('{current_date}', 'YYYY-MM-DD')
        AND jz_sj < TO_DATE('{next_date}', 'YYYY-MM-DD')
        """
        df = pl.read_database_uri(query=query, uri=uri)
        n_rows = len(df)
        result['rows'] = n_rows
        if n_rows > 0:
            # Convert before taking the lock to keep the critical section short.
            arrow_table = df.to_arrow()
            # NOTE(review): each day's Arrow schema is inferred independently;
            # if a later day infers different column types than the day that
            # created the table (e.g. an all-NULL column), the INSERT may fail
            # or cast — confirm against the source table definition.
            with first_batch_lock:
                if first_batch:
                    sql = "CREATE TABLE lake.ods.jzjl AS SELECT * FROM temp_insert;"
                else:
                    sql = "INSERT INTO lake.ods.jzjl SELECT * FROM temp_insert;"
                dd.register("temp_insert", arrow_table)
                try:
                    dd.sql(sql)
                finally:
                    # Always release the registration, even if the statement
                    # failed, so a stale view never leaks into the next batch.
                    dd.unregister("temp_insert")
                # Reached only if the statement succeeded: the table now
                # exists. If CREATE failed, the next non-empty batch retries it.
                first_batch = False
    except Exception as e:
        result['status'] = 'failed'
        result['error'] = str(e)
    finally:
        result['elapsed'] = time_module.time() - start_time
    return result
# Main program
if __name__ == "__main__":
    bdate = date(year=2026, month=1, day=1)
    edate = date(year=2026, month=1, day=31)
    # One task per calendar day, both endpoints inclusive.
    tasks = [bdate + timedelta(days=i) for i in range((edate - bdate).days + 1)]
    total_tasks = len(tasks)
    print(f"开始处理 {total_tasks} 个任务...")
    print(f"日期范围: {bdate} 至 {edate}")
    print("-" * 60)
    # Wall-clock start for the whole run.
    total_start_time = time_module.time()
    completed_count = 0
    success_count = 0
    # At most 10 days in flight at once; DuckDB writes are serialized inside
    # worker() by first_batch_lock.
    with ThreadPoolExecutor(max_workers=10, thread_name_prefix="Worker") as executor:
        # Map each future back to its date so an exception can be attributed.
        futures = {executor.submit(worker, task): task for task in tasks}
        # Handle results in completion order, printing as they arrive.
        for future in as_completed(futures):
            # Count every finished future (including ones that raised) so the
            # progress counter always reaches total_tasks.
            completed_count += 1
            try:
                result = future.result()
            except Exception as e:
                print(f"任务执行异常: {futures[future]}: {e}")
            else:
                if result['status'] == 'success':
                    success_count += 1
                    print(f"{result['date']}: {result['elapsed']:.4f}s - {result['rows']} rows")
                else:
                    print(f"{result['date']}: 处理失败 - {result['error']}")
            print(f"进度: {completed_count}/{total_tasks}")
    total_elapsed = time_module.time() - total_start_time
    print("-" * 60)
    print("所有任务处理完成!")
    print(f"总耗时: {total_elapsed:.2f}s")
    # Report real outcomes instead of the former hard-coded total/total.
    print(f"完成任务数: {success_count}/{total_tasks}")
    print(f"失败任务数: {completed_count - success_count}")
# 评论
# 发表评论