Polars 检查身份证号码是否符合规则

 import polars as pl


def check_id(id: str) -> bool:
    if len(id) != 18:
        return False
   
    # 有效的地区码前两位(省份代码)
    valid_provinces = {
        '11', '12', '13', '14', '15',  # 北京、天津、河北、山西、内蒙古
        '21', '22', '23',              # 辽宁、吉林、黑龙江
        '31', '32', '33', '34', '35', '36', '37',  # 上海、江苏、浙江、安徽、福建、江西、山东
        '41', '42', '43', '44', '45', '46',        # 河南、湖北、湖南、广东、广西、海南
        '50', '51', '52', '53', '54',              # 重庆、四川、贵州、云南、西藏
        '61', '62', '63', '64', '65',              # 陕西、甘肃、青海、宁夏、新疆
        '71',                                      # 台湾
        '81', '82'                                 # 香港、澳门
    }
   
    # 验证前两位地区码
    province_code = id[:2]
    if province_code not in valid_provinces:
        return False
   
    # 原有的校验码验证
    weights = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]
    check_codes = ['1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2']
   
    try:
        # 验证前17位是否为数字
        for i in range(17):
            int(id[i])
       
        # 计算校验码
        factors = [int(id[i]) * weights[i] for i in range(17)]
        checksum = sum(factors) % 11
       
        # 验证最后一位校验码
        return id[17] == check_codes[checksum]
       
    except ValueError:
        # 如果转换为int失败,说明包含非数字字符
        return False

(
    pl.read_excel('/mnt/c/Users/Administrator/Downloads/主动监测数据/tmp_gmxzd_20250929.xlsx',schema_overrides={'病例诊断类别(1=主要,2=次要,9=缺失)':pl.Utf8})
    .filter(
        pl.col('病例身份证号').is_not_null() &  # 过滤空值
        (pl.col('病例身份证号').str.len_chars() > 0)  # 过滤空字符串
    )
    .with_columns(
        pl.col("病例身份证号")
        .map_elements(check_id, return_dtype=pl.Boolean)
        .alias('valid')
    )
    .filter(pl.col.valid==True)
    .with_columns(
        pl.when(pl.col('病例身份证号').str.slice(16,1).cast(pl.UInt32) % 2 ==1)
        .then(pl.lit(1))
        .otherwise(pl.lit(2))
        .alias("病例性别(1=男,2=女,9=缺失,如缺失,先用身份证号填充)")
    )
    .with_columns(
        pl.when(pl.col("病例就诊来源(1=门诊,2=急诊,3=住院)")=="门诊")
        .then(pl.lit(1))
        .when(pl.col("病例就诊来源(1=门诊,2=急诊,3=住院)")=="住院")
        .then(pl.lit(3))
        .otherwise(pl.lit(2))
        .alias("病例就诊来源(1=门诊,2=急诊,3=住院)")
    )
    .with_columns(
        pl.when(pl.col("病例诊断类别(1=主要,2=次要,9=缺失)").str.len_chars()==0)
        .then(pl.lit(9))
        .otherwise(pl.col("病例诊断类别(1=主要,2=次要,9=缺失)"))
        .alias("病例诊断类别(1=主要,2=次要,9=缺失)")
    )
    .with_columns(
        pl.when(pl.col("病例死亡(1=是,2=否)")=="是")
        .then(pl.lit(1))
        .otherwise(pl.lit(2))
        .alias("病例死亡(1=是,2=否)")
    )
    .drop("valid")
    .rename({'病例性别(1=男,2=女,9=缺失,如缺失,先用身份证号填充)':'病例性别','病例就诊来源(1=门诊,2=急诊,3=住院)':'病例就诊来源','病例诊断类别(1=主要,2=次要,9=缺失)':'病例诊断类别','病例死亡(1=是,2=否)':'病例死亡','病例出生日期(如缺失,用身份证号填充)':'病例出生日期'})
    .to_pandas()
    .to_excel('/mnt/c/Users/Administrator/Downloads/主动监测数据clean/tmp_gmxzd_20250929.xlsx',index=False)
)
   

评论

此博客中的热门博文

V2ray websocket(ws)+tls+nginx分流

Rstudio 使用代理