scala - Replace missing values with mean - Spark Dataframe
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must likewise follow the CC BY-SA license, link to the original post, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/40057563/
Replace missing values with mean - Spark Dataframe
Asked by Dataminer
I have a Spark Dataframe with some missing values. I would like to perform a simple imputation by replacing the missing values with the mean for that column. I am very new to Spark, so I have been struggling to implement this logic. This is what I have managed to do so far:
a) To do this for a single column (let's say Col A), this line of code seems to work:
import org.apache.spark.sql.functions.{mean, when}

df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA"))
  .first()(0).asInstanceOf[Double])
  .otherwise($"ColA"))
b) However, I have not been able to figure out how to do this for all the columns in my dataframe. I was trying out the Map function, but I believe it loops through each row of a dataframe.
c) There is a similar question on SO - here. And while I liked the solution (using aggregated tables and coalesce), I was very keen to know if there is a way to do this by looping through each column (I come from R, so looping through each column using a higher-order function like lapply seems more natural to me).
Thanks!
Answered by zero323
Spark >= 2.2
You can use org.apache.spark.ml.feature.Imputer (which supports both mean and median strategies).
Scala:
import org.apache.spark.ml.feature.Imputer
val imputer = new Imputer()
.setInputCols(df.columns)
.setOutputCols(df.columns.map(c => s"${c}_imputed"))
.setStrategy("mean")
imputer.fit(df).transform(df)
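Note that Imputer works on numeric input columns, so passing df.columns directly assumes every column is numeric. A minimal sketch of restricting the imputation to numeric columns (not part of the original answer; the NumericType filter and the names numericCols/numericImputer are my own, and older releases may additionally require casting integer columns to double):
import org.apache.spark.ml.feature.Imputer
import org.apache.spark.sql.types.NumericType

// keep only the numeric columns of df
val numericCols = df.schema.fields
  .collect { case f if f.dataType.isInstanceOf[NumericType] => f.name }

val numericImputer = new Imputer()
  .setInputCols(numericCols)
  .setOutputCols(numericCols.map(c => s"${c}_imputed"))
  .setStrategy("mean")

numericImputer.fit(df).transform(df)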
Python:
from pyspark.ml.feature import Imputer
imputer = Imputer(
inputCols=df.columns,
outputCols=["{}_imputed".format(c) for c in df.columns]
)
imputer.fit(df).transform(df)
Spark < 2.2
Here you are:
import org.apache.spark.sql.functions.mean
df.na.fill(df.columns.zip(
df.select(df.columns.map(mean(_)): _*).first.toSeq
).toMap)
where
df.columns.map(mean(_)): Array[Column]
computes an average for each column,
df.select(_: _*).first.toSeq: Seq[Any]
collects the aggregated values and converts the row to a Seq[Any] (I know it is suboptimal but this is the API we have to work with),
df.columns.zip(_).toMap: Map[String, Any]
creates a Map[String, Any] which maps from the column name to its average, and finally:
df.na.fill(_): DataFrame
fills the missing values using:
fill: Map[String, Any] => DataFrame
from DataFrameNaFunctions.
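To make the mechanics concrete, here is a small end-to-end sketch (not from the original answer; the toy dataframe and the column names a/b are made up, and a SparkSession named spark is assumed to be in scope, e.g. in spark-shell):
import org.apache.spark.sql.functions.mean
import spark.implicits._  // assumes a SparkSession named `spark` is in scope

val df = Seq(
  (Some(1.0), Some(4.0)),
  (Some(3.0), None),
  (None, Some(6.0))
).toDF("a", "b")

df.na.fill(df.columns.zip(
  df.select(df.columns.map(mean(_)): _*).first.toSeq
).toMap).show()
// the null in "a" is filled with 2.0 and the null in "b" with 5.0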
To ignore NaN entries you can replace:
df.select(df.columns.map(mean(_)): _*).first.toSeq
with:
import org.apache.spark.sql.functions.{col, isnan, when}
df.select(df.columns.map(
c => mean(when(!isnan(col(c)), col(c)))
): _*).first.toSeq
Answered by Michael P
For PySpark, this is the code I used:
# aggregate spec: compute the mean of every column
mean_dict = { col: 'mean' for col in df.columns }
col_avgs = df.agg( mean_dict ).collect()[0].asDict()
# keys come back as "avg(col)"; strip "avg(" and the trailing ")" to recover the column names
# (Python 2 syntax; on Python 3 use .items() instead of .iteritems())
col_avgs = { k[4:-1]: v for k,v in col_avgs.iteritems() }
df.fillna( col_avgs ).show()
The four steps are:
- Create the dictionary mean_dict mapping column names to the aggregate operation (mean)
- Calculate the mean for each column, and save it as the dictionary col_avgs
- The column names in col_avgs start with avg( and end with ), e.g. avg(col1). Strip the parentheses out.
- Fill the columns of the dataframe with the averages using col_avgs
Answered by noleto
For imputing the median (instead of the mean) in PySpark < 2.2
## filter numeric cols
num_cols = [col_type[0] for col_type in filter(lambda dtype: dtype[1] in {"bigint", "double", "int"}, df.dtypes)]
### Compute a dict with <col_name, median_value>
median_dict = dict()
for c in num_cols:
median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0]
Then, apply na.fill
df_imputed = df.na.fill(median_dict)
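For completeness, a Scala sketch of the same median idea on Spark < 2.2 (not part of the original answer; the NumericType filter and the names numCols/medianMap are assumptions):
import org.apache.spark.sql.types.NumericType

// restrict to numeric columns, since approxQuantile only works on those
val numCols = df.schema.fields
  .collect { case f if f.dataType.isInstanceOf[NumericType] => f.name }

// approximate median (0.5 quantile) per column
val medianMap = numCols.map { c =>
  c -> df.stat.approxQuantile(c, Array(0.5), 0.001).head
}.toMap

val dfImputed = df.na.fill(medianMap)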

