多线程鸭湖

 from datetime import datetime

from datetime import timedelta, date
import polars as pl
import duckdb as dd
import time as time_module
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# 初始化
uri = "oracle://"
dd.sql("attach 'ducklake:meta.ducklake' as lake;")
# 如果表存在,先删除
dd.sql("DROP TABLE IF EXISTS lake.ods.jzjl;")
# 创建 schema
dd.sql("CREATE SCHEMA IF NOT EXISTS lake.ods;")

# 线程安全的标志
first_batch_lock = threading.Lock()
first_batch = True

def worker(current_date):
    """处理单个日期的数据"""
    global first_batch
   
    start_time = time_module.time()
    result = {
        'date': current_date,
        'status': 'success',
        'rows': 0,
        'elapsed': 0,
        'error': None
    }
   
    try:
        next_date = current_date + timedelta(days=1)
        query = f"""
        SELECT * FROM inoc_jzjl
        WHERE jz_sj >= TO_DATE('{current_date}', 'YYYY-MM-DD')
        AND jz_sj < TO_DATE('{next_date}', 'YYYY-MM-DD')
        """
       
        # 查询数据
        df = pl.read_database_uri(query=query, uri=uri)
        result['rows'] = len(df)
       
        if len(df) > 0:
            arrow_table = df.to_arrow()
           
            # 使用锁确保线程安全,最小化锁的持有时间
            with first_batch_lock:
                # 检查是否需要创建表
                if first_batch:
                    dd.register("temp_insert", arrow_table)
                    dd.sql("CREATE TABLE lake.ods.jzjl AS SELECT * FROM temp_insert;")
                    dd.unregister("temp_insert")
                    first_batch = False
                else:
                    dd.register("temp_insert", arrow_table)
                    dd.sql("INSERT INTO lake.ods.jzjl SELECT * FROM temp_insert;")
                    dd.unregister("temp_insert")
       
        result['elapsed'] = time_module.time() - start_time
       
    except Exception as e:
        result['status'] = 'failed'
        result['error'] = str(e)
        result['elapsed'] = time_module.time() - start_time
   
    return result

# 主程序
if __name__ == "__main__":
    bdate = date(year=2026, month=1, day=1)
    edate = date(year=2026, month=1, day=31)
   
    # 创建任务列表
    tasks = []
    for i in range((edate - bdate).days + 1):
        current_date = bdate + timedelta(days=i)
        tasks.append(current_date)
   
    total_tasks = len(tasks)
   
    print(f"开始处理 {total_tasks} 个任务...")
    print(f"日期范围: {bdate}{edate}")
    print("-" * 60)
   
    # 记录总体开始时间
    total_start_time = time_module.time()
   
    # 使用线程池执行任务,最多10个线程同时执行
    with ThreadPoolExecutor(max_workers=10, thread_name_prefix="Worker") as executor:
        # 提交所有任务
        futures = {executor.submit(worker, task): task for task in tasks}
       
        # 按完成顺序处理结果
        completed_count = 0
        for future in as_completed(futures):
            try:
                result = future.result()
                completed_count += 1
               
                # 实时打印结果
                if result['status'] == 'success':
                    print(f"{result['date']}: {result['elapsed']:.4f}s - {result['rows']} rows")
                else:
                    print(f"{result['date']}: 处理失败 - {result['error']}")
               
                # 显示进度
                print(f"进度: {completed_count}/{total_tasks}")
               
            except Exception as e:
                print(f"任务执行异常: {e}")
   
    # 计算总耗时
    total_elapsed = time_module.time() - total_start_time
   
    print("-" * 60)
    print(f"所有任务处理完成!")
    print(f"总耗时: {total_elapsed:.2f}s")
    print(f"完成任务数: {total_tasks}/{total_tasks}")

评论

此博客中的热门博文

Rstudio 使用代理

ShadowsocksR Plus 就回来了~