# 鸭湖 (DuckLake) 连接 Oracle

from datetime import datetime
from datetime import timedelta, date
import time as time_module

import duckdb as dd
import polars as pl

# Initialization: Oracle source URI and DuckLake target catalog.
# NOTE(review): "oracle://" is only a placeholder -- user, password, host,
# port and service name must be filled in (or loaded from config) before
# pl.read_database_uri below can connect. TODO.
uri = "oracle://"
# Attach the DuckLake catalog backed by the metadata file meta.ducklake as
# "lake"; IF NOT EXISTS lets this cell be re-run in the same session.
dd.sql("attach if not exists 'ducklake:meta.ducklake' as lake;")
# Create the schema BEFORE dropping the table: on a fresh catalog lake.ods
# does not exist yet, so the DROP would have no schema to resolve against.
dd.sql("CREATE SCHEMA IF NOT EXISTS lake.ods;")
# Start every run from an empty target: the load loop re-creates the table.
dd.sql("DROP TABLE IF EXISTS lake.ods.jzjl;")

# Write data: copy inoc_jzjl from Oracle into lake.ods.jzjl one day at a time
# over the closed date range [bdate, edate]. Each day uses a half-open
# [day, day + 1) filter on jz_sj, so no row is fetched twice. The first
# non-empty day creates the table (schema taken from that batch); later days
# append to it.
bdate = date(year=2026, month=1, day=1)
edate = date(year=2026, month=1, day=3)
first_batch = True  # stays True until lake.ods.jzjl has been created
failed_dates = []  # days whose extract/load raised; reported after the loop
for i in range((edate - bdate).days + 1):
    start_time = time_module.perf_counter()  # monotonic clock for elapsed time
    current_date = bdate + timedelta(days=i)
    next_date = current_date + timedelta(days=1)
    # str(date) renders as ISO "YYYY-MM-DD", matching the TO_DATE format mask.
    query = f"""
    SELECT * FROM inoc_jzjl
    WHERE jz_sj >= TO_DATE('{current_date}', 'YYYY-MM-DD')
    AND jz_sj < TO_DATE('{next_date}', 'YYYY-MM-DD')
    """

    try:
        df = pl.read_database_uri(query=query, uri=uri)

        if not df.is_empty():
            dd.register("temp_insert", df.to_arrow())
            try:
                if first_batch:
                    dd.sql("CREATE TABLE lake.ods.jzjl AS SELECT * FROM temp_insert;")
                    first_batch = False
                else:
                    dd.sql("INSERT INTO lake.ods.jzjl SELECT * FROM temp_insert;")
            finally:
                # Unregister even when CREATE/INSERT fails, so the view never
                # outlives this iteration.
                dd.unregister("temp_insert")

        elapsed = time_module.perf_counter() - start_time
        print(f"{current_date}: {elapsed:.4f}s - {len(df)} rows")

    except Exception as e:
        # Best effort: keep loading the remaining days, but remember the
        # failure so the missing days are visible (and can be re-run).
        failed_dates.append(current_date)
        print(f"{current_date}: 处理失败 - {e}")

if failed_dates:
    print(f"处理失败的日期: {', '.join(str(d) for d in failed_dates)}")
if first_batch:
    # No day produced rows, so lake.ods.jzjl was never created and the
    # queries that follow will fail with a catalog error.
    print("警告: 没有写入任何数据, lake.ods.jzjl 未创建")

## Read back: total number of rows now stored in the DuckLake table.
row_total = dd.sql("select count(*) from lake.ods.jzjl")
print(row_total)

## DuckDB analysis: number of rows per ym_mc.
# A bare expression's value is discarded when this file runs as a script
# (and a notebook cell only displays its last expression), so print it.
print(
    dd.sql("select ym_mc,count(1) from lake.ods.jzjl group by ym_mc")
)

## Polars analysis of the same table.
# Lazy Polars frame over lake.ods.jzjl with lower-cased column names
# (presumably because Oracle hands back upper-case identifiers -- confirm).
jz = (
    dd.sql("select * from lake.ods.jzjl").pl(lazy=True)
    .rename(lambda x: x.lower())
)

# Column names and dtypes; like every bare expression in a script, the value
# would be discarded without an explicit print.
print(jz.collect_schema())

# Rows per ym_mc. Expr.len() counts nulls as well, so this matches the
# count(1) in the DuckDB query above.
print(
    jz.group_by('ym_mc').agg(pl.col('inoc_jzjl_lsh').len()).collect(engine='streaming')
)

# ---- 博客页面尾部 (blog page footer scraped with the post; not part of the script) ----
# 评论
#
# 此博客中的热门博文
#
# RStudio 使用代理
#
# ShadowsocksR Plus 就回来了~