Dask示例

import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.diagnostics import ProgressBar


def windows_to_wsl_path(windows_path):
    r"""Convert a Windows drive path into its WSL mount path under ``/mnt/``.

    Colons are removed and backslashes become forward slashes. Only the drive
    designator (the text before the first colon) is lower-cased; the rest of
    the path keeps its original case so that file and directory names still
    match on case-sensitive directories.

    Args:
        windows_path: A Windows path string such as ``r"D:\data\PV.csv"``.

    Returns:
        The converted path as a string, e.g. ``"/mnt/d/data/PV.csv"``.
    """
    wsl_prefix = '/mnt/'

    drive, colon, remainder = windows_path.partition(':')
    if colon:
        # Lower-case the drive part only, never the file or directory names.
        path = drive.lower() + remainder
    else:
        # No drive designator present: there is nothing to lower-case.
        path = windows_path

    return wsl_prefix + path.replace(':', '').replace('\\', '/')


# Explicit dtypes for the CSV columns named below; passed straight to read_csv.
csv_dtypes = {
    'GRDA_CODE': str,
    'GRDA_XM': str,
    'YM_MC': str,
    'YM_BM': str,
    'JZ_ZC': int,
    'GLDW_MC': str,
    'GLDW_BM': str,
}
csv_path = windows_to_wsl_path(r"D:\bookbmstdr-main\PV.csv")
pv = dd.read_csv(csv_path, encoding='GB18030', dtype=csv_dtypes)


def _lowercase_columns(frame):
    """Return ``frame`` with every column name lower-cased."""
    return frame.rename(columns={name: name.lower() for name in frame.columns})


def _adult_or_child(age):
    """Label an age as '成人' (adult) when it is >= 18, otherwise '儿童' (child)."""
    if age >= 18:
        return '成人'
    return '儿童'


def _age_band(frame):
    """Bin the 'age' column into the four labelled bands using ``pd.cut``."""
    edges = [1, 7, 18, 60, np.inf]
    names = ['1-7', '7-18', '18-60', '>60']
    return pd.cut(frame['age'], bins=edges, labels=names)


def _age_seven_split(frame):
    """Label each row '>=7' or '<7' by age; rows matching neither get '不详'."""
    age = frame['age']
    return np.select([age >= 7, age < 7], ['>=7', '<7'], default='不详')


pv = pv.map_partitions(_lowercase_columns)

# errors='coerce' turns values that cannot be parsed into NaT instead of raising.
pv['csrq'] = pv['csrq'].map_partitions(pd.to_datetime, errors='coerce')

# Keep only the text before the first space, then parse it as a Y-m-d date.
jz_sj_date_text = pv['jz_sj'].str.split(' ').str[0]
pv['jz_sj'] = jz_sj_date_text.map_partitions(
    pd.to_datetime, format='%Y-%m-%d', errors='coerce'
)

# Age is the day difference between jz_sj and csrq divided by 365.
days_between = (pv['jz_sj'] - pv['csrq']).dt.days
pv['age'] = days_between / 365

pv['grp'] = pv['age'].apply(_adult_or_child, meta=('grp', object))
pv['group'] = pv.map_partitions(_age_band)
pv['grp2'] = pv.map_partitions(_age_seven_split)

# Bare expression: only shows output when run interactively.
pv.dtypes

# Keep the rows whose ym_bm matches the regular expression "^03".
ym_bm_matches_03 = pv['ym_bm'].str.contains("^03")
pv = pv[ym_bm_matches_03]


with ProgressBar():
    # A single dask.compute call evaluates the three aggregations together so
    # they can share the CSV read and column derivations instead of repeating
    # that work once per .compute() call.
    # Each result has its own name so that the grp2 counts are not overwritten
    # by the group counts; child_tj still holds the counts per 'group'.
    child_tj_grp2, child_tj, pvtj = dask.compute(
        pv.groupby(['grp2']).agg({'grda_code': 'count'}),
        pv.groupby(['group']).agg({'grda_code': 'count'}),
        pv.groupby(['grda_code', 'ym_bm']).agg({'ym_bm': 'count'}),
    )

    

    

# Turn the per-(grda_code, ym_bm) counts into a flat table: the counted column
# is renamed to 'count' before reset_index() brings the group keys back as
# columns, and the rows are then ordered by grda_code.
pvtj_counts = pvtj.rename(columns={'ym_bm': 'count'})
tj = pvtj_counts.reset_index().sort_values('grda_code')


# Non-null counts of every column per 'grp' label. The result is not bound to
# a name, so it is only shown when the script is run interactively.
grouped_by_grp = pv.groupby(['grp'])
grouped_by_grp.count().compute()

评论

此博客中的热门博文

V2ray websocket(ws)+tls+nginx分流

Rstudio 使用代理