# 0剂次免疫史批量获得 (batch retrieval of zero-dose immunization histories)
import polars as pl
import polars.selectors as cs
from typing import Optional, List
# Load the vaccination-record workbook and derive the helper columns
# (ym_dl, jc, age) that the rest of the script relies on.
jz = (
    pl.read_excel(
        "/mnt/c/Users/Administrator/Downloads/不达标乡镇管理儿童接种记录/jzjl(3).xlsx"
    )
    # Normalise every column name to lower case.
    .rename(str.lower)
    # Both columns arrive as strings; parse them in place.
    # NOTE(review): jz_sj is presumably the vaccination timestamp and csrq the
    # birth date -- confirm against the workbook.
    .with_columns(
        jz_sj=pl.col("jz_sj").str.to_datetime(),
        csrq=pl.col("csrq").str.to_date(),
    )
    # Sort by jz_sj first so the running count below follows that order.
    .sort("jz_sj")
    # ym_dl = first two characters of ym_bm.
    .with_columns(ym_dl=pl.col("ym_bm").str.slice(0, 2))
    # jc = cumulative count of records within each (grda_code, ym_dl) group.
    .with_columns(jc=pl.col("grda_code").cum_count().over(["grda_code", "ym_dl"]))
    # age = days between csrq and 2025-12-31, divided by 30.
    .with_columns(
        ((pl.date(2025, 12, 31) - pl.col("csrq")).dt.total_days() / 30).alias("age")
    )
)
def _default_vaccine_config() -> pl.DataFrame:
    """Return the built-in vaccine configuration used when the caller gives none."""
    return pl.DataFrame(
        {
            "大类编码": ["01", "02", "03", "04", "12", "16", "17", "18", "19", "55"],
            "大类名称": [
                "卡介苗",
                "乙肝疫苗",
                "脊灰疫苗",
                "百白破疫苗",
                "麻腮风疫苗",
                "A群流脑疫苗",
                "A群C群流脑疫苗",
                "乙脑疫苗",
                "甲肝疫苗",
                "HPV疫苗",
            ],
            "剂次": [1, 3, 4, 5, 2, 2, 2, 2, 1, 2],
            "可替代编码": ["", "", "50", "49,50", "09,13,14", "53", "", "", "", ""],
        }
    )


def get_zero_dose_children(
    jz_data: pl.DataFrame,
    vaccine_config: Optional[pl.DataFrame] = None,
    extra_columns: Optional[List[str]] = None,
) -> pl.DataFrame:
    """Build a zero-dose row for every (child, vaccine) pair with no record.

    For each vaccine in ``vaccine_config`` the children in ``jz_data`` that
    have no row for that vaccine category (or any of its alternative
    categories) are emitted with ``jc`` set to 0.

    Args:
        jz_data: Vaccination records. Must contain:
            - ``grda_code``: unique child code
            - ``ym_dl``: vaccine category code
            plus every column named in ``extra_columns``.
        vaccine_config: Vaccine configuration with the columns:
            - ``大类编码``: vaccine category code
            - ``大类名称``: vaccine name
            - ``剂次``: scheduled number of doses (not read by this function)
            - ``可替代编码``: comma-separated alternative category codes;
              empty, null or absent means "no alternatives"
            When None, the built-in default configuration is used.
        extra_columns: Names of additional ``jz_data`` columns to carry over
            per child, e.g. ``["age", "csrq"]``. Duplicates and names that
            collide with the four output columns are ignored.

    Returns:
        A DataFrame with columns ``grda_code``, ``ym_dl``, ``ym_mc``, ``jc``
        followed by the extra columns. ``ym_dl``, ``ym_mc`` and ``jc`` are
        cast to the dtypes of the same-named ``jz_data`` columns (falling
        back to Utf8 / Utf8 / UInt32 when absent) so the result can be
        concatenated with ``jz_data``. When ``vaccine_config`` has no rows an
        empty frame with that schema is returned.
    """
    if vaccine_config is None:
        vaccine_config = _default_vaccine_config()

    base_cols = ["grda_code", "ym_dl", "ym_mc", "jc"]
    # Keep caller order, but drop repeats and names that would clash with the
    # generated columns; either would make the selects below fail with a
    # duplicate-column error.
    extras: List[str] = []
    for col in extra_columns or []:
        if col not in base_cols and col not in extras:
            extras.append(col)
    final_cols = base_cols + extras

    # Mirror jz_data's dtypes so the output stacks onto it without casting.
    jz_schema = jz_data.schema
    ym_dl_dtype = jz_schema.get("ym_dl", pl.Utf8)
    ym_mc_dtype = jz_schema.get("ym_mc", pl.Utf8)
    jc_dtype = jz_schema.get("jc", pl.UInt32)

    # One row per child. keep="first" + maintain_order makes both the carried
    # extra values and the row order deterministic.
    all_children = jz_data.select(["grda_code", *extras]).unique(
        subset=["grda_code"], keep="first", maintain_order=True
    )

    # Loop-invariant: the distinct (child, category) pairs that exist at all.
    received = jz_data.select(["grda_code", "ym_dl"]).unique()

    zero_dose_frames = []
    for row in vaccine_config.iter_rows(named=True):
        category = row["大类编码"]

        # A record in the category itself or in any alternative category
        # counts as "vaccinated". Trim whitespace and skip empty fragments so
        # values such as "49, 50" or "49," behave as expected.
        accepted_codes = [category]
        alternatives = row.get("可替代编码")
        if alternatives:
            accepted_codes.extend(
                part.strip() for part in alternatives.split(",") if part.strip()
            )

        vaccinated = received.filter(pl.col("ym_dl").is_in(accepted_codes)).select(
            "grda_code"
        )

        # Anti-join keeps the children with no matching record (zero doses).
        zero_dose_frames.append(
            all_children.join(vaccinated, on="grda_code", how="anti")
            .with_columns(
                ym_dl=pl.lit(category).cast(ym_dl_dtype),
                ym_mc=pl.lit(row["大类名称"]).cast(ym_mc_dtype),
                jc=pl.lit(0).cast(jc_dtype),
            )
            .select(final_cols)
        )

    if not zero_dose_frames:
        # pl.concat rejects an empty list; return an empty, correctly typed
        # frame instead of raising.
        children_schema = all_children.schema
        empty_schema = {
            "grda_code": children_schema["grda_code"],
            "ym_dl": ym_dl_dtype,
            "ym_mc": ym_mc_dtype,
            "jc": jc_dtype,
        }
        for col in extras:
            empty_schema[col] = children_schema[col]
        return pl.DataFrame(schema=empty_schema)

    return pl.concat(zero_dose_frames)
# ==================== Usage example ====================
# Example 1: basic usage -- zero-dose rows for every child, carrying "age" along.
zero_dose = get_zero_dose_children(jz, extra_columns=["age"])
# Stack the recorded rows and the generated zero-dose rows into one frame,
# then order it by child, vaccine category and jc.
test = pl.concat(
    [
        jz.select(["grda_code", "ym_dl", "ym_mc", "jc", "age"]),
        zero_dose,
    ]
).sort(["grda_code", "ym_dl", "jc"])
# (web-page residue, not part of the script) 评论 / 发表评论