Python 在 Spark 中进行 DF 连接后删除重复列

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/46944493/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-19 17:56:39  来源:igfitidea点击:

Removing duplicate columns after a DF join in Spark

pythonpyspark

提问by thecheech

When you join two DFs with similar column names:

当您加入两个具有相似列名的 DF 时:

df = df1.join(df2, df1['id'] == df2['id'])

Join works fine but you can't call the idcolumn because it is ambiguous and you would get the following exception:

Join 工作正常,但您不能调用该id列,因为它不明确,并且您会收到以下异常:

pyspark.sql.utils.AnalysisException: "Reference 'id' is ambiguous, could be: id#5691, id#5918.;"

This makes idnot usable anymore...

这使得id不再可用......

The following function solves the problem:

下面的函数解决了这个问题:

def join(df1, df2, cond, how='left'):
    df = df1.join(df2, cond, how=how)
    repeated_columns = [c for c in df1.columns if c in df2.columns]
    for col in repeated_columns:
        df = df.drop(df2[col])
    return df

What I don't like about it is that I have to iterate over the column names and delete them why by one. This looks really clunky...

我不喜欢它的是我必须遍历列名并将它们删除。这看起来真的很笨拙......

Do you know of any other solution that will either join and remove duplicates more elegantly or delete multiple columns without iterating over each of them?

您是否知道任何其他解决方案可以更优雅地连接和删除重复项,或者删除多个列而不遍历每个列?

回答by Psidom

If the join columns at both data frames have the same names and you only need equi join, you can specify the join columns as a list, in which case the result will only keep one of the join columns:

如果两个数据框中的连接列具有相同的名称并且您只需要等连接,则可以将连接列指定为列表,在这种情况下,结果将仅保留连接列之一:

df1.show()
+---+----+
| id|val1|
+---+----+
|  1|   2|
|  2|   3|
|  4|   4|
|  5|   5|
+---+----+

df2.show()
+---+----+
| id|val2|
+---+----+
|  1|   2|
|  1|   3|
|  2|   4|
|  3|   5|
+---+----+

df1.join(df2, ['id']).show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+


Otherwise you need to give the join data frames aliasand refer to the duplicated columns by the aliaslater:

否则,您需要为连接数据框提供别名,并稍后通过别名引用重复的列:

df1.alias("a").join(
    df2.alias("b"), df1['id'] == df2['id']
).select("a.id", "a.val1", "b.val2").show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

回答by jerrytim

df.join(other, on, how)when onis a column name string, or a list of column names strings, the returned dataframe will prevent duplicate columns. when onis a join expression, it will result in duplicate columns. We can use .drop(df.a)to drop duplicate columns. Example:

df.join(other, on, how)on是列名字符串或列名字符串列表时,返回的数据框将防止重复列。whenon是连接表达式,它将导致重复的列。我们可以使用.drop(df.a)删除重复的列。例子:

cond = [df.a == other.a, df.b == other.bb, df.c == other.ccc]
# result will have duplicate column a
result = df.join(other, cond, 'inner').drop(df.a)

回答by Heapify

Assuming 'a' is a dataframe with column 'id' and 'b' is another dataframe with column 'id'

假设“a”是一个带有“id”列的数据框,“b”是另一个带有“id”列的数据框

I use the following two methods to remove duplicates:

我使用以下两种方法来删除重复项:

Method 1: Using String Join Expression as opposed to boolean expression. This automatically remove a duplicate column for you

方法 1:使用字符串连接表达式而不是布尔表达式。这会自动为您删除重复的列

a.join(b, 'id')

Method 2: Renaming the column before the join and dropping it after

方法 2:在加入之前重命名列并在加入之后删除它

b.withColumnRenamed('id', 'b_id')
joinexpr = a['id'] == b['b_id']
a.join(b, joinexpr).drop('b_id)

回答by hussam

The code below works with Spark 1.6.0 and above.

下面的代码适用于 Spark 1.6.0 及更高版本。

salespeople_df.show()
+---+------+-----+
|Num|  Name|Store|
+---+------+-----+
|  1| Henry|  100|
|  2| Karen|  100|
|  3|  Paul|  101|
|  4| Jimmy|  102|
|  5|Janice|  103|
+---+------+-----+

storeaddress_df.show()
+-----+--------------------+
|Store|             Address|
+-----+--------------------+
|  100|    64 E Illinos Ave|
|  101|         74 Grand Pl|
|  102|          2298 Hwy 7|
|  103|No address available|
+-----+--------------------+

Assuming -in this example- that the name of the shared column is the same:

假设 - 在此示例中 - 共享列的名称相同:

joined=salespeople_df.join(storeaddress_df, ['Store'])
joined.orderBy('Num', ascending=True).show()

+-----+---+------+--------------------+
|Store|Num|  Name|             Address|
+-----+---+------+--------------------+
|  100|  1| Henry|    64 E Illinos Ave|
|  100|  2| Karen|    64 E Illinos Ave|
|  101|  3|  Paul|         74 Grand Pl|
|  102|  4| Jimmy|          2298 Hwy 7|
|  103|  5|Janice|No address available|
+-----+---+------+--------------------+

.joinwill prevent the duplication of the shared column.

.join将防止共享列的重复。

Let's assume that you want to remove the column Numin this example, you can just use .drop('colname')

假设您要删除Num此示例中的列,您可以使用.drop('colname')

joined=joined.drop('Num')
joined.show()

+-----+------+--------------------+
|Store|  Name|             Address|
+-----+------+--------------------+
|  103|Janice|No address available|
|  100| Henry|    64 E Illinos Ave|
|  100| Karen|    64 E Illinos Ave|
|  101|  Paul|         74 Grand Pl|
|  102| Jimmy|          2298 Hwy 7|
+-----+------+--------------------+

回答by QA Collective

After I've joined multiple tables together, I run them through a simple function to drop columns in the DF if it encounters duplicates while walking from left to right. Alternatively, you could rename these columns too.

在我将多个表连接在一起后,我通过一个简单的函数运行它们,如果在从左到右行走时遇到重复项,则删除 DF 中的列。或者,您也可以重命名这些列

Where Namesis a table with columns ['Id', 'Name', 'DateId', 'Description']and Datesis a table with columns ['Id', 'Date', 'Description'], the columns Idand Descriptionwill be duplicated after being joined.

哪里Names是有列的表['Id', 'Name', 'DateId', 'Description']Dates是有列的表['Id', 'Date', 'Description'],列IdDescription加入后会被复制。

Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = dropDupeDfCols(NamesAndDates)
NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")

Where dropDupeDfColsis defined as:

其中dropDupeDfCols定义为:

def dropDupeDfCols(df):
    newcols = []
    dupcols = []

    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)

    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))

    return df.toDF(*newcols)

The resulting data frame will contain columns ['Id', 'Name', 'DateId', 'Description', 'Date'].

生成的数据框将包含 columns ['Id', 'Name', 'DateId', 'Description', 'Date']

回答by Santosh Kumar

In my case I had a dataframe with multiple duplicate columns after joins and I was trying to same that dataframe in csv format, but due to duplicate column I was getting error. I followed below steps to drop duplicate columns. Code is in scala

在我的情况下,我在连接后有一个包含多个重复列的数据框,我试图以 csv 格式相同该数据框,但由于重复列我收到错误。我按照以下步骤删除重复的列。代码在 Scala 中

1) Rename all the duplicate columns and make new dataframe2) make separate list for all the renamed columns3) Make new dataframe with all columns (including renamed - step 1)4) drop all the renamed column

1) Rename all the duplicate columns and make new dataframe2) make separate list for all the renamed columns3) Make new dataframe with all columns (including renamed - step 1)4) drop all the renamed column

private def removeDuplicateColumns(dataFrame:DataFrame): DataFrame = {
var allColumns:  mutable.MutableList[String] = mutable.MutableList()
val dup_Columns: mutable.MutableList[String] = mutable.MutableList()
dataFrame.columns.foreach((i: String) =>{
if(allColumns.contains(i))

if(allColumns.contains(i))
{allColumns += "dup_" + i
dup_Columns += "dup_" +i
}else{
allColumns += i
}println(i)
})
val columnSeq = allColumns.toSeq
val df = dataFrame.toDF(columnSeq:_*)
val unDF = df.drop(dup_Columns:_*)
unDF
}

to call the above function use below code and pass your dataframe which contains duplicate columns

to call the above function use below code and pass your dataframe which contains duplicate columns

val uniColDF = removeDuplicateColumns(df)