博文

多线程鸭湖

  from datetime import datetime from datetime import timedelta , date import polars as pl import duckdb as dd import time as time_module from concurrent.futures import ThreadPoolExecutor , as_completed import threading # 初始化 uri = "oracle:// " dd . sql ( "attach 'ducklake:meta.ducklake' as lake;" ) # 如果表存在,先删除 dd . sql ( "DROP TABLE IF EXISTS lake.ods.jzjl;" ) # 创建 schema dd . sql ( "CREATE SCHEMA IF NOT EXISTS lake.ods;" ) # 线程安全的标志 first_batch_lock = threading . Lock () first_batch = True def worker ( current_date ):     """处理单个日期的数据"""     global first_batch         start_time = time_module . time ()     result = {         'date' : current_date ,         'status' : 'success' ,         'rows' : 0 ,         'elapsed' : 0 ,         'error' : None     }         try :   ...

鸭湖连接 Oracle

  from datetime import datetime from datetime import timedelta , date import polars as pl import duckdb as dd import time as time_module #初始化 uri = "oracle:// " dd . sql ( "attach 'ducklake:meta.ducklake' as lake;" ) # 如果表存在,先删除 dd . sql ( "DROP TABLE IF EXISTS lake.ods.jzjl;" ) # 创建 schema dd . sql ( "CREATE SCHEMA IF NOT EXISTS lake.ods;" ) # 写数据 bdate = date ( year = 2026 , month = 1 , day = 1 ) edate = date ( year = 2026 , month = 1 , day = 3 ) first_batch = True for i in range (( edate - bdate ). days + 1 ):     start_time = time_module . time ()     current_date = bdate + timedelta ( days = i )         try :         next_date = current_date + timedelta ( days = 1 )         query = f """         SELECT * FROM inoc_jzjl         WHERE jz_sj >= TO_DATE(' { current_date } ', 'YYYY-MM-DD')         AN...

SQLite 日期字符串转 INTEGER

  update lg -- 更新数据 SET age = CAST (( date ( -- 格式化 jz_sj (取前10位日期部分进行处理) printf ( '%04d-%02d-%02d' , substr ( replace ( jz_sj , '/' , '-' ), 1 , 4 ), CAST ( substr ( replace ( jz_sj , '/' , '-' ), 6 , instr ( substr ( replace ( jz_sj , '/' , '-' ), 6 ), '-' ) - 1 ) AS INTEGER ), CAST ( substr ( replace ( jz_sj , '/' , '-' ), 6 + instr ( substr ( replace ( jz_sj , '/' , '-' ), 6 ), '-' )) AS INTEGER ) ) ) - julianday ( -- 格式化 csrq printf ( '%04d-%02d-%02d' , substr ( replace ( csrq , '/' , '-' ), 1 , 4 ), CAST ( substr ( replace ( csrq , '/' , '-' ), 6 , instr ( substr ( replace ( csrq , '/' , '-' ), 6 ), '-' ) - 1 ) AS INTEGER ), CAST ( substr ( replace ( csrq , '/' , '-' ), ...

polars duckdb

  import polars as pl import polars.selectors as cs import duckdb con = duckdb . connect ( "/mnt/c/Users/Administrator/Downloads/流感肺炎/fy.db" ) # 获取所有表名 tables = con . execute ( "SHOW TABLES" ). pl () print ( tables ) # 查看 test 表的列信息 schema = con . execute ( "DESCRIBE test" ). pl () print ( schema ) # 读数据库表 test = con . execute ( """     SELECT * FROM test """ ). pl ( lazy = True ) # 比较慢 # test2 = pl.read_database("SELECT * FROM test", connection=con).lazy() test . collect_schema () # 关闭连接 con . close () ####################### import polars as pl import polars.selectors as cs import duckdb con = duckdb . connect ( "/mnt/c/Users/Administrator/Downloads/流感肺炎/fy.db" ) temp = (     pl . scan_csv (         "/mnt/c/Users/Administrator/Downloads/流感肺炎/接种.csv" , infer_schema = None     )     . rename ( lambda col : col . lower ())     . with_columns (         pl . col ( "csr...

0剂次免疫史批量获得

  import polars as pl import polars.selectors as cs from typing import Optional , List jz = (     pl . read_excel (         "/mnt/c/Users/Administrator/Downloads/不达标乡镇管理儿童接种记录/jzjl(3).xlsx"     )     . rename ( lambda x : x . lower ())     . with_columns (         pl . col ( "jz_sj" ). str . to_datetime (). alias ( "jz_sj" ),         pl . col ( "csrq" ). str . to_date (). alias ( "csrq" ),     )     . sort ( "jz_sj" )     . with_columns ( pl . col ( "ym_bm" ). str . slice ( 0 , 2 ). alias ( "ym_dl" ))     . with_columns (         pl . col ( "grda_code" ). cum_count (). alias ( "jc" ). over ([ "grda_code" , "ym_dl" ])     )     . with_columns ( age = (( pl . date ( 2025 , 12 , 31 ) - pl . col ( "csrq" )). dt . total_days () / 30 )) ) def get_zero_dose_children (     jz_data : pl . ...

polars_st 绘制地图

  import polars as pl import polars_st as st import altair as alt import json alt . data_transformers . enable ( "vegafusion" ) luis = st . read_file ( '/mnt/c/Users/xuefliang/Downloads/tl_2023_22_prisecroads/tl_2023_22_prisecroads.shp' ) features = [     { "type" : "Feature" , "geometry" : json . loads (r. pop ( "geometry" )), "properties" : r}     for r in luis . st . to_geojson (). to_dicts () ] chart = alt . Chart ( alt . Data ( values = features )). mark_geoshape (     filled = False ,     stroke = 'steelblue' ,     strokeWidth = 1 ). properties (     width = 800 ,     height = 600 ,     title = "Louisiana Primary Roads Map" ). project (     type = 'mercator' ) chart # 使用命名空间名称 'map' @ pl . api . register_dataframe_namespace ( "map" ) class GeoPlotter :     def __init__ ( self , df : pl . DataFrame ):         self . _df = df    ...

将docx中的表格读入pandas

  from docx import Document import pandas as pd import polars as pl def tables_to_pd ( word_file ):     doc = Document ( word_file )     all_data = []   # 存储所有表格的数据行     headers = None   # 存储表头     for i , table in enumerate ( doc . tables ):         for j , row in enumerate ( table . rows ):             row_data = [ cell . text for cell in row . cells ]             if j == 0 :   # 第一行作为表头                 if headers is None :                     headers = row_data   # 只保存第一个表格的表头             else :   # 数据行                 all_data . append ( row_data )     # 创建一个DataFrame     if headers and all_data :       ...