# 鸭湖（DuckLake）连接 Oracle
import os
import time as time_module
from datetime import datetime
from datetime import timedelta, date

import duckdb as dd
import polars as pl
# Initialization: Oracle connection URI and the DuckLake catalog.
# The URI (which carries credentials) can be supplied through the ORACLE_URI
# environment variable instead of being hard-coded; without it the original
# placeholder value is used unchanged.
uri = os.environ.get("ORACLE_URI", "oracle://")
# IF NOT EXISTS turns a re-run in the same DuckDB session (e.g. re-executing
# this cell in a notebook) into a no-op instead of an "already attached" error.
dd.sql("ATTACH IF NOT EXISTS 'ducklake:meta.ducklake' AS lake;")
# Prepare the target: make sure schema lake.ods exists first, so the DROP
# below never references a missing schema on a brand-new lake; then drop any
# jzjl table left from a previous run (the load loop re-creates it from the
# first non-empty batch).
dd.sql("CREATE SCHEMA IF NOT EXISTS lake.ods;")
dd.sql("DROP TABLE IF EXISTS lake.ods.jzjl;")
# Load data: pull inoc_jzjl from Oracle one calendar day at a time, using the
# half-open window [current_date, next_date) on jz_sj, and write each
# non-empty batch into lake.ods.jzjl. The first non-empty batch creates the
# table (its columns define the schema); later batches are appended.
bdate = date(year=2026, month=1, day=1)
edate = date(year=2026, month=1, day=3)  # inclusive: the loop covers edate too
first_batch = True  # stays True until lake.ods.jzjl has been created
failed_dates = []  # days whose extract/load raised; reported after the loop
for i in range((edate - bdate).days + 1):
    # perf_counter is a monotonic clock meant for measuring durations.
    start_time = time_module.perf_counter()
    current_date = bdate + timedelta(days=i)
    next_date = current_date + timedelta(days=1)
    query = f"""
    SELECT * FROM inoc_jzjl
    WHERE jz_sj >= TO_DATE('{current_date}', 'YYYY-MM-DD')
    AND jz_sj < TO_DATE('{next_date}', 'YYYY-MM-DD')
    """
    try:
        df = pl.read_database_uri(query=query, uri=uri)
        if not df.is_empty():
            # Expose the batch to DuckDB under a temporary name.
            dd.register("temp_insert", df.to_arrow())
            try:
                if first_batch:
                    dd.sql("CREATE TABLE lake.ods.jzjl AS SELECT * FROM temp_insert;")
                    first_batch = False
                else:
                    dd.sql("INSERT INTO lake.ods.jzjl SELECT * FROM temp_insert;")
            finally:
                # Unregister even when CREATE/INSERT fails, so a failed day
                # does not leave the temporary view registered.
                dd.unregister("temp_insert")
        elapsed = time_module.perf_counter() - start_time
        print(f"{current_date}: {elapsed:.4f}s - {len(df)} rows")
    except Exception as e:
        # Best effort: report the failed day, remember it, and continue.
        print(f"{current_date}: 处理失败 - {e}")
        failed_dates.append(current_date)

# Surface missing days instead of leaving them buried in the per-day output.
if failed_dates:
    print(f"失败日期: {', '.join(str(d) for d in failed_dates)}")
if first_batch:
    # No batch was written, so lake.ods.jzjl does not exist and the
    # read-back below will fail.
    print("lake.ods.jzjl 未创建: 没有写入任何数据")
# Read back: total number of rows loaded into lake.ods.jzjl.
print(dd.sql("select count(*) from lake.ods.jzjl"))

# Analysis with DuckDB: row count per ym_mc.
# Results are printed explicitly: a bare expression statement is only
# displayed in an interactive/notebook session and is discarded when this
# file runs as a script.
print(dd.sql("select ym_mc,count(1) from lake.ods.jzjl group by ym_mc"))

# Analysis with Polars: scan the table as a LazyFrame and lower-case all
# column names. Presumably the columns arrive upper-case from Oracle; Polars
# column lookup is case-sensitive, so the lower-case names below need this.
jz = (
    dd.sql("select * from lake.ods.jzjl").pl(lazy=True)
    .rename(lambda x: x.lower())
)
print(jz.collect_schema())
# Row count per ym_mc (same question as the DuckDB query above), executed
# with Polars' streaming engine.
print(
    jz.group_by("ym_mc")
    .agg(pl.col("inoc_jzjl_lsh").len())
    .collect(engine="streaming")
)
# 评论
# 发表评论