我可以使用dask-ml计算平均值和最频繁的值,如下所示:
mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')
data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns = ['Weight', 'Age', 'Height'])
df.iloc[:, [0,1]] = mean_imputer.fit_transform(df.iloc[:,[0,1]])
df.iloc[:, [2]] = most_frequent_imputer.fit_transform(df.iloc[:,[2]])
print(df)
Weight Age Height
0 100.0 2.0 5.0
1 85.0 4.5 5.0
2 70.0 7.0 5.0但是,如果我有一亿行数据,dask似乎会做两个循环,而它只能做一个循环,是否有可能同时和/或并行运行两台计算机,而不是顺序运行?实现这一点的示例代码是什么?
发布于 2020-12-29 02:16:54
如果实体彼此独立,您可以按照文档和Dask Toutorial中的建议使用dask.delayed来并行化计算。
您的代码将如下所示:
from dask.distributed import Client
client = Client(n_workers=4)
from dask import delayed
import numpy as np
import pandas as pd
from dask_ml import impute
mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')
def fit_transform_mi(d):
return mean_imputer.fit_transform(d)
def fit_transform_mfi(d):
return most_frequent_imputer.fit_transform(d)
def setdf(a,b,df):
df.iloc[:, [0,1]]=a
df.iloc[:, [2]]=b
return df
data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns = ['Weight', 'Age', 'Height'])
a = delayed(fit_transform_mi)(df.iloc[:,[0,1]])
b = delayed(fit_transform_mfi)(df.iloc[:,[2]])
c = delayed(setdf)(a,b,df)
df= c.compute()
print(df)
client.close()C对象是一个延迟的对象。这个对象包含计算最终结果所需的所有内容,包括对所有必需函数的引用以及它们的输入和相互之间的关系。
发布于 2020-12-30 15:46:20
当数据不适合内存时,Dask对于通过并行处理加速计算很有用。在下面的示例中,使用Dask估算了10个文件中包含的300M行数据。进程图显示: 1.平均和最频繁的输入者是并行运行的;2.所有10个文件也是并行处理的。
设置
为了准备大量数据,您的问题中的三行数据被复制,以形成一个30M行的数据框架。数据框保存在十个不同的文件中,总共产生300M行,具有与您问题中相同的统计数据。
import numpy as np
import pandas as pd
N = 10000000
weight = np.array([100, np.nan, 70]*N)
age = np.array([2, np.nan, 7]*N)
height = np.array([5, np.nan, 5]*N)
df = pd.DataFrame({'Weight': weight, 'Age': age, 'Height': height})
# Save ten large data frames to disk
for i in range(10):
df.to_parquet(f'./df_to_impute_{i}.parquet', compression='gzip',
index=False)Dask归罪
import graphviz
import dask
import dask.dataframe as dd
from dask_ml.impute import SimpleImputer
# Read all files for imputation in a dask data frame from a specific directory
df = dd.read_parquet('./df_to_impute_*.parquet')
# Set up the imputers and columns
mean_imputer = SimpleImputer(strategy='mean')
mostfreq_imputer = SimpleImputer(strategy='most_frequent')
imputers = [mean_imputer, mostfreq_imputer]
mean_cols = ['Weight', 'Age']
freq_cols = ['Height']
columns = [mean_cols, freq_cols]
# Create a new data frame with imputed values, then visualize the computation.
df_list = []
for imputer, col in zip(imputers, columns):
df_list.append(imputer.fit_transform(df.loc[:, col]))
imputed_df = dd.concat(df_list, axis=1)
imputed_df.visualize(filename='imputed.svg', rankdir='LR')
# Save the new data frame to disk
imputed_df.to_parquet('imputed_df.parquet', compression='gzip')输出

imputed_df.head()
Weight Age Height
0 100.0 2.0 5.0
1 85.0 4.5 5.0
2 70.0 7.0 5.0
3 100.0 2.0 5.0
4 85.0 4.5 5.0
# Check the summary statistics make sense - 300M rows and stats as expected
imputed_df.describe().compute()
Weight Age Height
count 3.000000e+08 3.000000e+08 300000000.0
mean 8.500000e+01 4.500000e+00 5.0
std 1.224745e+01 2.041241e+00 0.0
min 7.000000e+01 2.000000e+00 5.0
25% 7.000000e+01 2.000000e+00 5.0
50% 8.500000e+01 4.500000e+00 5.0
75% 1.000000e+02 7.000000e+00 5.0
max 1.000000e+02 7.000000e+00 5.0https://stackoverflow.com/questions/65411425
复制相似问题