scala 从 Spark DataFrame 中删除嵌套列
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/32727279/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me):
StackOverFlow
Dropping a nested column from Spark DataFrame
提问by Nikhil J Joshi
I have a DataFrame with the schema
我有一个具有以下架构的 DataFrame:
root
|-- label: string (nullable = true)
|-- features: struct (nullable = true)
| |-- feat1: string (nullable = true)
| |-- feat2: string (nullable = true)
| |-- feat3: string (nullable = true)
While I am able to filter the DataFrame using
虽然我可以使用以下代码过滤数据框:
// Keep only the rows whose nested features.feat1 is not null-safely equal to "100".
// `where` is Spark's alias for `filter(condition: Column)`.
val data = rawData.where(!(rawData("features.feat1") <=> "100"))
I am unable to drop the columns using
但我无法使用以下代码删除列:
// NOTE(review): as the question reports, this does NOT remove the nested field.
// Dataset.drop(String) matches column names, and there is no top-level column literally
// named "features.feat1", so this is presumably a no-op (see the Dataset.drop docs).
val data = rawData
.drop("features.feat1")
Is it something that I am doing wrong here? I also tried (unsuccessfully) doing drop(rawData("features.feat1")), though it does not make much sense to do so.
这是我在这里做错了吗?我也试过(不成功)这样做drop(rawData("features.feat1")),虽然这样做没有多大意义。
Thanks in advance,
提前致谢,
Nikhil
尼基尔
回答by zero323
It is just a programming exercise but you can try something like this:
这只是一个编程练习,但你可以尝试这样的事情:
import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.{functions => f}
import scala.util.Try
/**
 * Wrapper around a DataFrame that can drop sub-fields from one of its top-level struct columns.
 *
 * @param df the DataFrame to operate on
 */
case class DFWithDropFrom(df: DataFrame) {

  /**
   * Looks up the top-level field named `source` in the DataFrame's schema.
   *
   * @param source exact name of the top-level field
   * @return the field, or a Failure(NoSuchElementException) when no top-level field has that name
   */
  def getSourceField(source: String): Try[StructField] =
    df.schema.fields.find(_.name == source) match {
      case Some(field) => scala.util.Success(field)
      case None        => scala.util.Failure(new NoSuchElementException(s"no top-level field named '$source'"))
    }

  /**
   * Returns the struct type of `sourceField`.
   *
   * @return the StructType, or a Failure(IllegalArgumentException) when the field is not a struct
   */
  def getType(sourceField: StructField): Try[StructType] =
    sourceField.dataType match {
      case structType: StructType => scala.util.Success(structType)
      case other =>
        scala.util.Failure(new IllegalArgumentException(
          s"field '${sourceField.name}' has type ${other.simpleString}, not a struct"))
    }

  /**
   * Builds a new struct column that holds only the sub-fields `names` of struct column `source`.
   * The sub-fields appear in the order given and keep their original names.
   */
  def genOutputCol(names: Array[String], source: String): Column =
    f.struct(names.map(name => f.col(source).getField(name).alias(name)): _*)

  /**
   * Replaces struct column `source` with a copy that lacks the sub-fields listed in `toDrop`.
   *
   * Returns `df` unchanged in three cases:
   *  - `source` is not a top-level field;
   *  - `source` is not a struct;
   *  - building the new column throws a non-fatal exception. `Try.map` captures it.
   *    NOTE(review): presumably this includes dropping every sub-field, which yields an
   *    empty `struct()` — confirm on your Spark version.
   * Names in `toDrop` that are not sub-fields of `source` are ignored.
   */
  def dropFrom(source: String, toDrop: Array[String]): DataFrame =
    getSourceField(source)
      .flatMap(getType)
      .map(_.fieldNames.diff(toDrop))
      .map(genOutputCol(_, source))
      .map(df.withColumn(source, _))
      .getOrElse(df)
}
Example usage:
用法示例:
scala> case class features(feat1: String, feat2: String, feat3: String)
defined class features
scala> case class record(label: String, features: features)
defined class record
scala> val df = sc.parallelize(Seq(record("a_label", features("f1", "f2", "f3")))).toDF
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>]
scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show
+-------+--------+
| label|features|
+-------+--------+
|a_label| [f2,f3]|
+-------+--------+
scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show
+-------+----------+
| label| features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+
scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show
+-------+----------+
| label| features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+
Add an implicit conversion and you're good to go.
添加隐式转换,您就可以开始了。
回答by Michael Spector
This version allows you to remove nested columns at any level:
此版本允许您删除任何级别的嵌套列:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, DataType}
/**
* Various Spark utilities and extensions of DataFrame
*/
/**
 * Spark helpers, including an enrichment of DataFrame for dropping nested struct fields.
 */
object DataFrameUtils {

  /**
   * Recursively rebuilds `col` without the nested field named by `dropColName`.
   *
   * @param col         column being rebuilt
   * @param colType     schema type of `col`
   * @param fullColName dot-separated path of `col` from the DataFrame root
   * @param dropColName dot-separated path of the field to remove
   * @return None if `col` is the field to remove. Otherwise, a new struct without that field
   *         when the field lives inside `col`, or `col` itself in every other case.
   */
  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] =
    if (fullColName.equals(dropColName)) None
    else {
      val prefix = s"${fullColName}."
      colType match {
        // Only a struct lying on the path to the dropped field needs rebuilding.
        case structType: StructType if dropColName.startsWith(prefix) =>
          val keptFields: Array[Column] = for {
            field <- structType.fields
            rebuilt <- dropSubColumn(col.getField(field.name), field.dataType, prefix + field.name, dropColName)
          } yield rebuilt.alias(field.name)
          Some(struct(keptFields: _*))
        case _ =>
          Some(col)
      }
    }

  /**
   * Drops the (possibly nested) column `colName` from `df`.
   *
   * Processing steps:
   *  1. Start from `df.drop(colName)`, which handles the case where `colName` is a top-level name.
   *  2. Rebuild every top-level column that prefixes `colName` via `dropSubColumn`.
   *  3. Write each rebuilt column back under its original name with `withColumn`.
   */
  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    val replacements: Array[(String, Column)] = df.schema.fields
      .filter(field => colName.startsWith(s"${field.name}."))
      .flatMap(field =>
        dropSubColumn(col(field.name), field.dataType, field.name, colName).map(rebuilt => (field.name, rebuilt)))
    replacements.foldLeft(df.drop(colName)) { (acc, replacement) =>
      val (name, rebuilt) = replacement
      acc.withColumn(name, rebuilt)
    }
  }

  /**
   * Enriches a DataFrame with operations on nested (struct) fields.
   */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Returns a copy of the DataFrame without the given nested field.
     *
     * @param colName dot-separated path of the field to drop, e.g. "a.b.c"
     */
    def dropNestedColumn(colName: String): DataFrame =
      DataFrameUtils.dropColumn(df, colName)
  }
}
Usage:
用法:
// The ExtendedDataFrame enrichment is all that is needed to call dropNestedColumn on a DataFrame.
import DataFrameUtils.ExtendedDataFrame
// Remove field `d` nested under a.b.c; the other fields along that path are kept.
df.dropNestedColumn("a.b.c.d")
回答by mmendez.semantic
Expanding on spektom's answer, with support for array types:
扩展 spektom 答案。支持数组类型:
/**
 * Spark helpers for dropping nested fields, including fields inside arrays of structs.
 */
object DataFrameUtils {

  /**
   * Rebuilds `col` without the nested field `dropColName`.
   *
   * How each column type is handled:
   *  - Structs are rebuilt field by field.
   *  - Arrays are rewritten element by element with `functions.transform`, so an
   *    `array<struct<...>>` stays an array of structs.
   *  - Leaf types, and arrays of leaf types, are returned unchanged.
   *
   * Requires Spark 3.0+ for the Scala `functions.transform(Column, Column => Column)`.
   * NOTE(review): arrays nested inside arrays produce nested `transform` calls — verify these
   * on your Spark version.
   *
   * @param col         column currently being rebuilt
   * @param colType     schema type of `col`
   * @param fullColName dot-separated path of `col` from the DataFrame root. Array elements share
   *                    their array's path, so "a.arr.x" addresses field x of every element of a.arr.
   * @param dropColName dot-separated path of the field to remove
   * @return None when `col` is itself the field to drop, otherwise the rebuilt or untouched column
   */
  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case structType: StructType =>
          Some(struct(structType.fields.flatMap { f =>
            dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName)
              .map(_.alias(f.name))
          }: _*))
        case ArrayType(elementType @ (_: StructType | _: ArrayType), _) =>
          // Rewrite every element in place. Applying getField to the array itself would instead
          // produce a struct of arrays. The recursive call cannot return None here, because
          // fullColName != dropColName in this branch; getOrElse is only a type-level fallback.
          Some(org.apache.spark.sql.functions.transform(col, element =>
            dropSubColumn(element, elementType, fullColName, dropColName).getOrElse(element)))
        case _ =>
          // Leaf types and arrays of non-struct elements have no sub-field to drop.
          Some(col)
      }
    } else {
      Some(col)
    }
  }

  /**
   * Drops the (possibly nested) column `colName` from `df`.
   *
   * Processing steps:
   *  1. Start from `df.drop(colName)`, which handles the case where `colName` is a top-level name.
   *  2. Rebuild every top-level column on the path to the dropped field.
   *  3. Write each rebuilt column back under its original name.
   */
  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap { f =>
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName).map(column => (f.name, column))
        } else {
          None
        }
      }
      .foldLeft(df.drop(colName)) {
        case (acc, (name, column)) => acc.withColumn(name, column)
      }
  }

  /**
   * Extended version of DataFrame that allows operating on nested fields.
   */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Drops a nested field from the DataFrame.
     *
     * @param colName dot-separated nested field name, e.g. "a.b.c"
     */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}
回答by Lior Chaga
Following spektom's Scala code snippet, I've created similar code in Java. Since Java 8 streams don't have foldLeft, I used forEachOrdered. This code is suitable for Spark 2.x (I'm using 2.1). I also noticed that dropping a column and then re-adding it with withColumn under the same name doesn't work, so I'm just replacing the column, and that seems to work.
按照 spektom 的 Scala 代码片段,我用 Java 编写了类似的代码。由于 Java 8 的 Stream 没有 foldLeft,我使用了 forEachOrdered。此代码适用于 Spark 2.x(我使用的是 2.1)。另外我注意到,先删除列再用 withColumn 以相同名称添加回来并不起作用,所以我直接替换该列,这样似乎可行。
Code is not fully tested, hope it works :-)
代码没有经过全面测试,希望它能工作:-)
public class DataFrameUtils {
public static Dataset<Row> dropNestedColumn(Dataset<Row> dataFrame, String columnName) {
final DataFrameFolder dataFrameFolder = new DataFrameFolder(dataFrame);
Arrays.stream(dataFrame.schema().fields())
.flatMap( f -> {
if (columnName.startsWith(f.name() + ".")) {
final Optional<Column> column = dropSubColumn(col(f.name()), f.dataType(), f.name(), columnName);
if (column.isPresent()) {
return Stream.of(new Tuple2<>(f.name(), column));
} else {
return Stream.empty();
}
} else {
return Stream.empty();
}
}).forEachOrdered(colTuple -> dataFrameFolder.accept(colTuple));
return dataFrameFolder.getDF();
}
private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) {
Optional<Column> column = Optional.empty();
if (!fullColumnName.equals(dropColumnName)) {
if (colType instanceof StructType) {
if (dropColumnName.startsWith(fullColumnName + ".")) {
column = Optional.of(struct(getColumns(col, (StructType)colType, fullColumnName, dropColumnName)));
}
} else {
column = Optional.of(col);
}
}
return column;
}
private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) {
return Arrays.stream(colType.fields())
.flatMap(f -> {
final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
fullColumnName + "." + f.name(), dropColumnName);
if (column.isPresent()) {
return Stream.of(column.get().alias(f.name()));
} else {
return Stream.empty();
}
}
).toArray(Column[]::new);
}
private static class DataFrameFolder implements Consumer<Tuple2<String, Optional<Column>>> {
private Dataset<Row> df;
public DataFrameFolder(Dataset<Row> df) {
this.df = df;
}
public Dataset<Row> getDF() {
return df;
}
@Override
public void accept(Tuple2<String, Optional<Column>> colTuple) {
if (!colTuple._2().isPresent()) {
df = df.drop(colTuple._1());
} else {
df = df.withColumn(colTuple._1(), colTuple._2().get());
}
}
}
Usage example:
用法示例:
private class Pojo {
    // Bean fields. Spark presumably derives the DataFrame schema from the getters below
    // (the printed schema lists number, pojo2, str, strList).
    private String str;
    private Integer number;
    private List<String> strList;
    private Pojo2 pojo2;

    public Pojo2 getPojo2() {
        return this.pojo2;
    }

    public List<String> getStrList() {
        return this.strList;
    }

    public Integer getNumber() {
        return this.number;
    }

    public String getStr() {
        return this.str;
    }
}
private class Pojo2 {
    // Nested bean; it becomes the `pojo2` struct column of Pojo.
    private String str;
    private Integer number;
    private List<String> strList;

    public List<String> getStrList() {
        return this.strList;
    }

    public Integer getNumber() {
        return this.number;
    }

    public String getStr() {
        return this.str;
    }
}
// Local Spark context for the example (master "local[1]", application name "test").
SparkContext sparkContext = new SparkContext("local[1]", "test");
SQLContext context = new SQLContext(sparkContext);
// Empty DataFrame whose schema is derived from the Pojo bean class.
Dataset<Row> df = context.createDataFrame(Collections.emptyList(), Pojo.class);
// Drop the nested field pojo2.str.
Dataset<Row> dfRes = DataFrameUtils.dropNestedColumn(df, "pojo2.str");
Original struct:
原始结构:
root
|-- number: integer (nullable = true)
|-- pojo2: struct (nullable = true)
| |-- number: integer (nullable = true)
| |-- str: string (nullable = true)
| |-- strList: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- str: string (nullable = true)
|-- strList: array (nullable = true)
| |-- element: string (containsNull = true)
After drop:
删除后:
root
|-- number: integer (nullable = true)
|-- pojo2: struct (nullable = false)
| |-- number: integer (nullable = true)
| |-- strList: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- str: string (nullable = true)
|-- strList: array (nullable = true)
| |-- element: string (containsNull = true)
回答by kiae
Another (PySpark) way would be to drop the features.feat1 column by creating features again:
另一种(PySpark)方法是通过重新创建 features 来删除 features.feat1 列:
from pyspark.sql.functions import col, struct

# `features` is a struct column, so rebuild it from the fields to keep.
# arrays_zip cannot be used here for two reasons:
#  - it only accepts array inputs;
#  - its array result could not be cast to the StructType `schema`.
# For an array<struct<...>> column you would instead use
#   arrays_zip("features.feat2", "features.feat3").cast(ArrayType(schema)).
# `schema` (the StructType shown below) must be defined before this runs. display() is
# presumably a notebook helper (e.g. Databricks); use .show() elsewhere.
display(df
  .withColumn("features", struct(col("features.feat2"), col("features.feat3")))
  .withColumn("features", col("features").cast(schema))
)
where schema is the new schema (excluding features.feat1):
其中 schema 是新的架构(不包括 features.feat1):
from pyspark.sql.types import StringType, StructField, StructType

# Target schema for `features` once feat1 is removed: feat2 and feat3, both nullable strings.
schema = StructType([StructField(name, StringType(), True) for name in ('feat2', 'feat3')])
回答by M.Vanderlee
PySpark implementation
PySpark 实现
from typing import List, Optional

import pyspark.sql.functions as sf
from pyspark.sql import Column, DataFrame
from pyspark.sql.types import StructType


def _src_col(field_names: List[str]) -> Column:
    """Reference a nested column by its path segments.

    Each segment is backtick-quoted so that names containing dots are taken literally.
    """
    return sf.col('.'.join(f'`{c}`' for c in field_names))


def _split_path(field_to_drop: str):
    """Split 'a.b.c' into ('a', 'b.c'); a path without a dot yields (path, None)."""
    if '.' in field_to_drop:
        root, subfield = field_to_drop.split('.', maxsplit=1)
        return root, subfield
    return field_to_drop, None


def _require_struct(field, parents: List[str]) -> StructType:
    """Return ``field.dataType``, raising ValueError when it is not a struct.

    A non-struct field has no sub-fields that could be dropped.
    """
    if not isinstance(field.dataType, StructType):
        path = '.'.join(parents + [field.name])
        raise ValueError(f"field {path!r} is not a struct, so it has no nested fields to drop")
    return field.dataType


def _drop_nested_field(
    schema: StructType,
    field_to_drop: str,
    parents: Optional[List[str]] = None,
) -> Column:
    """Build a copy of the struct at path `parents` with `field_to_drop` removed.

    :param schema: schema of the struct located at `parents`
    :param field_to_drop: dot-separated path of the field to remove, relative to that struct
    :param parents: path of that struct from the DataFrame root
    :return: a ``struct`` column holding the remaining fields, in their original order.
        A leaf name that does not exist is ignored.
    :raises ValueError: if an intermediate path segment is missing or is not a struct
    """
    parents = [] if parents is None else parents
    root, subfield = _split_path(field_to_drop)
    if subfield is not None and not any(f.name == root for f in schema.fields):
        raise ValueError(f"no field named {'.'.join(parents + [root])!r}")

    columns = []
    for field in schema.fields:
        if field.name != root:
            # Unaffected field: copy it as-is.
            columns.append(_src_col(parents + [field.name]))
        elif subfield is not None:
            # The field to drop is nested inside this one. Rebuild it at the same position so
            # the surrounding field order is preserved.
            columns.append(_drop_nested_field(
                schema=_require_struct(field, parents),
                field_to_drop=subfield,
                parents=parents + [root],
            ).alias(root))
        # Otherwise this is exactly the field being dropped, so it is omitted.
    return sf.struct(*columns)


def drop_nested_field(
    df: DataFrame,
    field_to_drop: str,
) -> DataFrame:
    """Return `df` without the (possibly nested) field `field_to_drop`, e.g. 'a.b.c'.

    A top-level name is handed to ``DataFrame.drop``. For a nested path, the top-level struct is
    rebuilt and written back in place with ``withColumn``.

    :raises ValueError: if the top-level column or an intermediate segment is missing or not a struct
    """
    root, subfield = _split_path(field_to_drop)
    if subfield is None:
        return df.drop(field_to_drop)
    root_field = next((f for f in df.schema.fields if f.name == root), None)
    if root_field is None:
        raise ValueError(f"no column named {root!r}")
    return df.withColumn(root, _drop_nested_field(
        schema=_require_struct(root_field, []),
        field_to_drop=subfield,
        parents=[root],
    ))


df = drop_nested_field(df, 'a.b.c.d')
回答by Suryakant Pandey
Adding the java version Solution for this.
为此添加java版本解决方案。
Utility class (pass your dataset and the nested column to be dropped to the dropNestedColumn function).
实用程序类(将您的数据集和需要删除的嵌套列传递给 dropNestedColumn 函数)。
(There were a few bugs in Lior Chaga's answer; I corrected them while trying to use it.)
(Lior Chaga 的回答中有一些错误,我在尝试使用他的回答时已更正了它们。)
public class NestedColumnActions {
/*
dataset : dataset in which we want to drop columns
columnName : nested column that needs to be deleted
*/
public static Dataset<?> dropNestedColumn(Dataset<?> dataset, String columnName) {
//Special case of top level column deletion
if(!columnName.contains("."))
return dataset.drop(columnName);
final DataSetModifier dataFrameFolder = new DataSetModifier(dataset);
Arrays.stream(dataset.schema().fields())
.flatMap(f -> {
//If the column name to be deleted starts with current top level column
if (columnName.startsWith(f.name() + DOT)) {
//Get new column structure under f , expected after deleting the required column
final Optional<Column> column = dropSubColumn(functions.col(f.name()), f.dataType(), f.name(), columnName);
if (column.isPresent()) {
return Stream.of(new Tuple2<>(f.name(), column));
} else {
return Stream.empty();
}
} else {
return Stream.empty();
}
})
//Call accept function with Tuples of (top level column name, new column structure under it)
.forEach(colTuple -> dataFrameFolder.accept(colTuple));
return dataFrameFolder.getDataset();
}
private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) {
Optional<Column> column = Optional.empty();
if (!fullColumnName.equals(dropColumnName)) {
if (colType instanceof StructType) {
if (dropColumnName.startsWith(fullColumnName + DOT)) {
column = Optional.of(functions.struct(getColumns(col, (StructType) colType, fullColumnName, dropColumnName)));
}
else {
column = Optional.of(col);
}
} else {
column = Optional.of(col);
}
}
return column;
}
private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) {
return Arrays.stream(colType.fields())
.flatMap(f -> {
final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
fullColumnName + "." + f.name(), dropColumnName);
if (column.isPresent()) {
return Stream.of(column.get().alias(f.name()));
} else {
return Stream.empty();
}
}
).toArray(Column[]::new);
}
private static class DataSetModifier implements Consumer<Tuple2<String, Optional<Column>>> {
private Dataset<?> df;
public DataSetModifier(Dataset<?> df) {
this.df = df;
}
public Dataset<?> getDataset() {
return df;
}
/*
colTuple[0]:top level column name
colTuple[1]:new column structure under it
*/
@Override
public void accept(Tuple2<String, Optional<Column>> colTuple) {
if (!colTuple._2().isPresent()) {
df = df.drop(colTuple._1());
} else {
df = df.withColumn(colTuple._1(), colTuple._2().get());
}
}
}
}
}
回答by fqaiser94
The Make Structs Easy* library makes it easy to perform operations like adding, dropping, and renaming fields inside nested data structures. The library is available in both Scala and Python.
Make Structs Easy* 库可以轻松地对嵌套数据结构中的字段执行添加、删除和重命名等操作。该库同时提供 Scala 和 Python 版本。
Assuming you have the following data:
假设您有以下数据:
import org.apache.spark.sql.functions._
// Element type shared by the struct column and the array-of-structs column.
case class Features(feat1: String, feat2: String, feat3: String)
case class Record(features: Features, arrayOfFeatures: Seq[Features])
// A single record: one struct plus an array of two structs.
val singleFeatures = Features("hello", "world", "!")
val featureList = Seq(Features("red", "orange", "yellow"), Features("green", "blue", "indigo"))
val df = Seq(Record(singleFeatures, featureList)).toDF
df.printSchema
// root
// |-- features: struct (nullable = true)
// | |-- feat1: string (nullable = true)
// | |-- feat2: string (nullable = true)
// | |-- feat3: string (nullable = true)
// |-- arrayOfFeatures: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- feat1: string (nullable = true)
// | | |-- feat2: string (nullable = true)
// | | |-- feat3: string (nullable = true)
df.show(false)
// +-----------------+----------------------------------------------+
// |features |arrayOfFeatures |
// +-----------------+----------------------------------------------+
// |[hello, world, !]|[[red, orange, yellow], [green, blue, indigo]]|
// +-----------------+----------------------------------------------+
Then dropping feat2 from features is as simple as:
然后从 features 中删除 feat2 非常简单:
import com.github.fqaiser94.mse.methods._
// drop feat2 from features, then write the slimmer struct back under the same column name
val featuresWithoutFeat2 = $"features".dropFields("feat2")
df.withColumn("features", featuresWithoutFeat2).show(false)
// +----------+----------------------------------------------+
// |features |arrayOfFeatures |
// +----------+----------------------------------------------+
// |[hello, !]|[[red, orange, yellow], [green, blue, indigo]]|
// +----------+----------------------------------------------+
I noticed there were a lot of follow-up comments on other solutions asking whether there's a way to drop a column nested inside a struct that is itself nested inside an array. This can be done by combining the functions provided by the Make Structs Easy library with the functions provided by the spark-hofs library, as follows:
我注意到有很多关于其他解决方案的后续评论,询问是否有办法删除嵌套在嵌套在数组内的结构内的列。这可以通过将Make Structs Easy库提供的功能与spark-hofs库提供的功能结合来完成,如下所示:
import za.co.absa.spark.hofs._
// drop feat2 in each element of arrayOfFeatures
// `transform` here is the spark-hofs higher-order function, which applies the lambda to every array
// element. `dropFields` on each element comes from com.github.fqaiser94.mse.methods._.
// NOTE(review): on Spark 3.0+, org.apache.spark.sql.functions._ also exports `transform`; with
// both wildcard imports in scope the name may be ambiguous — TODO confirm for your Spark version.
df.withColumn("arrayOfFeatures", transform($"arrayOfFeatures", features => features.dropFields("feat2"))).show(false)
// +-----------------+--------------------------------+
// |features |arrayOfFeatures |
// +-----------------+--------------------------------+
// |[hello, world, !]|[[red, yellow], [green, indigo]]|
// +-----------------+--------------------------------+
*Full disclosure: I am the author of the Make Structs Easy library that is referenced in this answer.
*完全披露:我是此答案中引用的Make Structs Easy库的作者。
回答by Jeff Evans
I will expand upon mmendez.semantic's answer here, accounting for the issues described in the comment thread.
我将在这里扩展 mmendez.semantic 的回答,并处理其评论区中讨论的问题。
/**
 * Rebuilds `col` without the nested field `dropColName`.
 *
 * How each column type is handled:
 *  - Structs are rebuilt field by field.
 *  - Arrays of structs: each kept element field is extracted as an array, the arrays are re-zipped
 *    with `arrays_zip`, and the result is cast back so the original field names are restored.
 *  - Leaf types and arrays of non-struct elements are returned unchanged.
 *
 * @param col         column currently being rebuilt
 * @param colType     schema type of `col`
 * @param fullColName dot-separated path of `col` from the DataFrame root
 * @param dropColName dot-separated path of the field to remove
 * @return None when `col` is itself the field to drop; otherwise the rebuilt or untouched column
 * @throws UnsupportedOperationException if the path continues below a struct- or array-typed field
 *         of an array's struct elements, which the arrays_zip workaround cannot express
 */
def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
  if (fullColName.equals(dropColName)) {
    None
  } else if (dropColName.startsWith(s"$fullColName.")) {
    colType match {
      case colType: StructType =>
        Some(struct(colType.fields.flatMap { f =>
          dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName)
            .map(_.alias(f.name))
        }: _*))
      case ArrayType(innerType: StructType, _) =>
        // We are potentially dropping a column from within a struct that is itself inside an array.
        // Spark has some very strange behavior in this case, which they insist is not a bug:
        // see https://issues.apache.org/jira/browse/SPARK-31779 and the associated comments,
        // and also the thread here: https://stackoverflow.com/a/39943812/375670
        // What follows is a workaround for that behavior.
        val innerFields = innerType.fields
        // col.getField on an array of structs yields one array per field. Recursing below a
        // struct/array field would therefore build a struct of arrays, which arrays_zip rejects.
        // Fail with a clear message instead.
        val dropsBelowElementField = innerFields.exists { f =>
          f.dataType match {
            case _: StructType | _: ArrayType => dropColName.startsWith(s"$fullColName.${f.name}.")
            case _ => false
          }
        }
        if (dropsBelowElementField) {
          throw new UnsupportedOperationException(
            s"Cannot drop '$dropColName': only fields directly inside the struct elements of '$fullColName' are supported")
        }
        // Element type for every struct field EXCEPT the one being dropped; used to restore names.
        val preserveNamesStruct = ArrayType(StructType(
          innerFields.filterNot(f => s"$fullColName.${f.name}".equals(dropColName))
        ))
        // Apply dropSubColumn to each element field, building up the new values.
        val filteredInnerFields = innerFields.flatMap { f =>
          dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName)
            .map(_.alias(f.name))
        }
        // arrays_zip turns the per-field arrays back into an array of structs (see SPARK-31779);
        // the cast to the StructType built above gets the original names back.
        Some(arrays_zip(filteredInnerFields: _*).cast(preserveNamesStruct))
      case _ =>
        // Leaf types and arrays of non-struct elements contain no sub-field to drop.
        Some(col)
    }
  } else {
    Some(col)
  }
}
/**
 * Drops the (possibly nested) column `colName` from `df`.
 *
 * Processing steps:
 *  1. Compute a replacement, via dropSubColumn, for every top-level column that prefixes `colName`.
 *  2. Apply the replacements with withColumn, on top of `df.drop(colName)`; that call handles the
 *     case where `colName` is a top-level name.
 */
def dropColumn(df: DataFrame, colName: String): DataFrame = {
  val rebuilt: Array[(String, Column)] = for {
    field <- df.schema.fields
    if colName.startsWith(s"${field.name}.")
    column <- dropSubColumn(col(field.name), field.dataType, field.name, colName)
  } yield (field.name, column)
  rebuilt.foldLeft(df.drop(colName)) { case (acc, (name, column)) => acc.withColumn(name, column) }
}
Usage in spark-shell:
在 spark-shell 中的用法:
// These imports are required when the functions above are defined in a spark-shell session.
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// Paste the function definitions after the imports.
// Sample input: a deeply nested and complex JSON structure.
val jsonData = """{
"foo": "bar",
"top": {
"child1": 5,
"child2": [
{
"child2First": "one",
"child2Second": 2,
"child2Third": -19.51
}
],
"child3": ["foo", "bar", "baz"],
"child4": [
{
"child2First": "two",
"child2Second": 3,
"child2Third": 16.78
}
]
}
}"""
// The document spans several lines, so the reader needs the multiline option.
val jsonReader = spark.read.option("multiline", "true")
val df = jsonReader.json(Seq(jsonData).toDS())
// Remove child2First from the structs inside top.child2 only; top.child4 keeps its own copy.
val modifiedDf = dropColumn(df, "top.child2.child2First")
modifiedDf.printSchema()
root
|-- foo: string (nullable = true)
|-- top: struct (nullable = false)
| |-- child1: long (nullable = true)
| |-- child2: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- child2Second: long (nullable = true)
| | | |-- child2Third: double (nullable = true)
| |-- child3: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- child4: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- child2First: string (nullable = true)
| | | |-- child2Second: long (nullable = true)
| | | |-- child2Third: double (nullable = true)
modifiedDf.show(false) // print every cell in full, without truncation
+---+------------------------------------------------------+
|foo|top |
+---+------------------------------------------------------+
|bar|[5, [[2, -19.51]], [foo, bar, baz], [[two, 3, 16.78]]]|
+---+------------------------------------------------------+

