0剂次免疫史批量获得

 import polars as pl

import polars.selectors as cs
from typing import Optional, List

jz = (
    pl.read_excel(
        "/mnt/c/Users/Administrator/Downloads/不达标乡镇管理儿童接种记录/jzjl(3).xlsx"
    )
    .rename(lambda x: x.lower())
    .with_columns(
        pl.col("jz_sj").str.to_datetime().alias("jz_sj"),
        pl.col("csrq").str.to_date().alias("csrq"),
    )
    .sort("jz_sj")
    .with_columns(pl.col("ym_bm").str.slice(0, 2).alias("ym_dl"))
    .with_columns(
        pl.col("grda_code").cum_count().alias("jc").over(["grda_code", "ym_dl"])
    )
    .with_columns(age=((pl.date(2025, 12, 31) - pl.col("csrq")).dt.total_days() / 30))
)


def get_zero_dose_children(
    jz_data: pl.DataFrame,
    vaccine_config: Optional[pl.DataFrame] = None,
    extra_columns: Optional[List[str]] = None,
) -> pl.DataFrame:
    """
    获取所有疫苗的0剂次儿童数据

    Args:
        jz_data: 接种记录DataFrame,需包含以下列:
                - grda_code: 儿童唯一编码
                - ym_dl: 疫苗大类编码
        vaccine_config: 疫苗配置DataFrame,包含以下列:
                       - 大类编码: 疫苗大类编码
                       - 大类名称: 疫苗名称
                       - 剂次: 规定剂次数
                       - 可替代编码: 可替代的疫苗编码(逗号分隔)
                       如果为None,则使用默认配置
        extra_columns: 需要从jz_data中附加的额外列名列表,如["age", "csrq"]

    Returns:
        包含所有疫苗0剂次儿童的DataFrame,列类型与jz_data保持一致
    """
    # 如果未提供疫苗配置,使用默认配置
    if vaccine_config is None:
        vaccine_config = pl.DataFrame(
            {
                "大类编码": [
                    "01",
                    "02",
                    "03",
                    "04",
                    "12",
                    "16",
                    "17",
                    "18",
                    "19",
                    "55",
                ],
                "大类名称": [
                    "卡介苗",
                    "乙肝疫苗",
                    "脊灰疫苗",
                    "百白破疫苗",
                    "麻腮风疫苗",
                    "A群流脑疫苗",
                    "A群C群流脑疫苗",
                    "乙脑疫苗",
                    "甲肝疫苗",
                    "HPV疫苗",
                ],
                "剂次": [1, 3, 4, 5, 2, 2, 2, 2, 1, 2],
                "可替代编码": ["", "", "50", "49,50", "09,13,14", "53", "", "", "", ""],
            }
        )

    # 获取jz_data中各列的数据类型
    jz_schema = jz_data.schema
    ym_dl_dtype = jz_schema.get("ym_dl", pl.Utf8)
    ym_mc_dtype = jz_schema.get("ym_mc", pl.Utf8)
    jc_dtype = jz_schema.get("jc", pl.UInt32)  # 默认UInt32

    # 获取所有儿童的唯一编码(包含额外列)
    if extra_columns:
        select_cols = ["grda_code"] + extra_columns
        all_children = jz_data.select(select_cols).unique(subset=["grda_code"])
    else:
        all_children = jz_data.select("grda_code").unique()

    # 存储所有0剂次儿童的列表
    zero_dose_list = []

    # 遍历每种疫苗
    for row in vaccine_config.iter_rows(named=True):
        ym_dl_code = row["大类编码"]
        ym_name = row["大类名称"]

        # 查找接种过该疫苗的儿童(包括可替代编码)
        ym_codes = [ym_dl_code]

        # 处理可替代编码
        if row["可替代编码"]:
            ym_codes.extend(row["可替代编码"].split(","))

        # 找到接种过该疫苗(或替代疫苗)的儿童
        vaccinated_children = (
            jz_data.filter(pl.col("ym_dl").is_in(ym_codes)).select("grda_code").unique()
        )

        # 找到未接种该疫苗的儿童(0剂次)
        # 使用cast确保数据类型匹配
        zero_dose_children = all_children.join(
            vaccinated_children, how="anti", on="grda_code"
        ).with_columns(
            ym_dl=pl.lit(ym_dl_code).cast(ym_dl_dtype),
            ym_mc=pl.lit(ym_name).cast(ym_mc_dtype),
            jc=pl.lit(0).cast(jc_dtype),  # 确保类型匹配
        )

        zero_dose_list.append(zero_dose_children)

    # 合并所有0剂次结果
    result = pl.concat(zero_dose_list)

    # 调整列顺序以匹配jz的列顺序
    base_cols = ["grda_code", "ym_dl", "ym_mc", "jc"]
    if extra_columns:
        final_cols = base_cols + extra_columns
    else:
        final_cols = base_cols

    return result.select(final_cols)


# ==================== 使用示例 ====================
# 示例1:基本使用
zero_dose = get_zero_dose_children(jz, extra_columns=["age"])
# 合并数据
test = pl.concat(
    [jz.select("grda_code", "ym_dl", "ym_mc", "jc", "age"), zero_dose]
).sort("grda_code", "ym_dl", "jc")

评论

此博客中的热门博文

Rstudio 使用代理

ShadowsocksR Plus 就回来了~