首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >同时运行两台dask-ml计算机,而不是顺序运行

同时运行两台dask-ml计算机,而不是顺序运行
EN

Stack Overflow用户
提问于 2020-12-22 23:30:18
回答 2查看 262关注 0票数 6

我可以使用dask-ml计算平均值和最频繁的值,如下所示:

代码语言:javascript
复制
mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')
data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns = ['Weight', 'Age', 'Height']) 
df.iloc[:, [0,1]] = mean_imputer.fit_transform(df.iloc[:,[0,1]])
df.iloc[:, [2]] = most_frequent_imputer.fit_transform(df.iloc[:,[2]])
print(df)


    Weight  Age   Height
0   100.0   2.0   5.0
1   85.0    4.5   5.0
2   70.0    7.0   5.0

但是,如果我有一亿行数据,dask似乎会做两个循环,而它只能做一个循环,是否有可能同时和/或并行运行两台计算机,而不是顺序运行?实现这一点的示例代码是什么?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-12-29 02:16:54

如果实体彼此独立,您可以按照文档和Dask Toutorial中的建议使用dask.delayed来并行化计算。

您的代码将如下所示:

代码语言:javascript
复制
from dask.distributed import Client

client = Client(n_workers=4)

from dask import delayed
import numpy as np
import pandas as pd
from dask_ml import impute

mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')

def fit_transform_mi(d):
    return mean_imputer.fit_transform(d)
def fit_transform_mfi(d):
    return most_frequent_imputer.fit_transform(d)
def setdf(a,b,df):
    df.iloc[:, [0,1]]=a
    df.iloc[:, [2]]=b
    return df

data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns = ['Weight', 'Age', 'Height']) 
a = delayed(fit_transform_mi)(df.iloc[:,[0,1]])
b = delayed(fit_transform_mfi)(df.iloc[:,[2]])
c = delayed(setdf)(a,b,df)
df= c.compute()
print(df)
client.close()

C对象是一个延迟的对象。这个对象包含计算最终结果所需的所有内容,包括对所有必需函数的引用以及它们的输入和相互之间的关系。

票数 2
EN

Stack Overflow用户

发布于 2020-12-30 15:46:20

当数据不适合内存时,Dask对于通过并行处理加速计算很有用。在下面的示例中,使用Dask估算了10个文件中包含的300M行数据。进程图显示: 1.平均和最频繁的输入者是并行运行的;2.所有10个文件也是并行处理的。

设置

为了准备大量数据,您的问题中的三行数据被复制,以形成一个30M行的数据框架。数据框保存在十个不同的文件中,总共产生300M行,具有与您问题中相同的统计数据。

代码语言:javascript
复制
import numpy as np
import pandas as pd

N = 10000000
weight = np.array([100, np.nan, 70]*N)
age = np.array([2, np.nan, 7]*N)
height = np.array([5, np.nan, 5]*N)
df = pd.DataFrame({'Weight': weight, 'Age': age, 'Height': height})

# Save ten large data frames to disk
for i in range(10):
    df.to_parquet(f'./df_to_impute_{i}.parquet', compression='gzip',
                  index=False)

Dask归罪

代码语言:javascript
复制
import graphviz
import dask
import dask.dataframe as dd
from dask_ml.impute import SimpleImputer

# Read all files for imputation in a dask data frame from a specific directory
df = dd.read_parquet('./df_to_impute_*.parquet')

# Set up the imputers and columns
mean_imputer = SimpleImputer(strategy='mean')
mostfreq_imputer = SimpleImputer(strategy='most_frequent')
imputers = [mean_imputer, mostfreq_imputer]

mean_cols = ['Weight', 'Age']
freq_cols = ['Height']
columns = [mean_cols, freq_cols]

# Create a new data frame with imputed values, then visualize the computation.
df_list = []
for imputer, col in zip(imputers, columns):
    df_list.append(imputer.fit_transform(df.loc[:, col]))
imputed_df = dd.concat(df_list, axis=1)
imputed_df.visualize(filename='imputed.svg', rankdir='LR')

# Save the new data frame to disk
imputed_df.to_parquet('imputed_df.parquet', compression='gzip')

输出

代码语言:javascript
复制
imputed_df.head()

    Weight  Age     Height
0   100.0   2.0     5.0
1   85.0    4.5     5.0
2   70.0    7.0     5.0
3   100.0   2.0     5.0
4   85.0    4.5     5.0


# Check the summary statistics make sense - 300M rows and stats as expected
imputed_df.describe().compute()

    Weight  Age     Height
count   3.000000e+08    3.000000e+08    300000000.0
mean    8.500000e+01    4.500000e+00    5.0
std     1.224745e+01    2.041241e+00    0.0
min     7.000000e+01    2.000000e+00    5.0
25%     7.000000e+01    2.000000e+00    5.0
50%     8.500000e+01    4.500000e+00    5.0
75%     1.000000e+02    7.000000e+00    5.0
max     1.000000e+02    7.000000e+00    5.0
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65411425

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档