Note: this content is taken from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must attribute the original authors (not this site). Original question: http://stackoverflow.com/questions/35258506/

How to avoid duplicate columns after join?

Tags: scala, apache-spark, apache-spark-sql

Asked by Neel

I have two dataframes with the following columns:

df1.columns
//  Array(ts, id, X1, X2)

and

df2.columns
//  Array(ts, id, Y1, Y2)

After I do

val df_combined = df1.join(df2, Seq("ts", "id"))

I end up with the following columns: Array(ts, id, X1, X2, ts, id, Y1, Y2). I would have expected the common columns to be dropped. Is there something additional that needs to be done?

Answered by stackoverflowuser2010

The simple answer (from the Databricks FAQ on this matter) is to perform the join where the joined columns are expressed as an array of strings (or one string) instead of a predicate.

Below is an example adapted from the Databricks FAQ but with two join columns in order to answer the original poster's question.

Here is the left dataframe:

val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10))

val left = llist.toDF("firstname","lastname","date","duration")

left.show()

/*
+---------+--------+----------+--------+
|firstname|lastname|      date|duration|
+---------+--------+----------+--------+
|      bob|       b|2015-01-13|       4|
|    alice|       a|2015-04-23|      10|
+---------+--------+----------+--------+
*/

Here is the right dataframe:

val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload")

right.show()

/*
+---------+--------+------+
|firstname|lastname|upload|
+---------+--------+------+
|    alice|       a|   100|
|      bob|       b|    23|
+---------+--------+------+
*/

Here is an incorrect solution, where the join columns are defined as the predicate left("firstname") === right("firstname") && left("lastname") === right("lastname").

The incorrect result is that the firstname and lastname columns are duplicated in the joined data frame:

left.join(right, left("firstname")===right("firstname") &&
                 left("lastname")===right("lastname")).show

/*
+---------+--------+----------+--------+---------+--------+------+
|firstname|lastname|      date|duration|firstname|lastname|upload|
+---------+--------+----------+--------+---------+--------+------+
|      bob|       b|2015-01-13|       4|      bob|       b|    23|
|    alice|       a|2015-04-23|      10|    alice|       a|   100|
+---------+--------+----------+--------+---------+--------+------+
*/

The correct solution is to define the join columns as an array of strings, Seq("firstname", "lastname"). The output data frame does not have duplicated columns:

left.join(right, Seq("firstname", "lastname")).show

/*
+---------+--------+----------+--------+------+
|firstname|lastname|      date|duration|upload|
+---------+--------+----------+--------+------+
|      bob|       b|2015-01-13|       4|    23|
|    alice|       a|2015-04-23|      10|   100|
+---------+--------+----------+--------+------+
*/

Answered by zero323

This is expected behavior. The DataFrame.join method is equivalent to a SQL join like this:

SELECT * FROM a JOIN b ON joinExprs

If you want to ignore duplicate columns, just drop them or select the columns of interest afterwards. If you want to disambiguate, you can access them using the parent DataFrames:

val a: DataFrame = ???
val b: DataFrame = ???
val joinExprs: Column = ???

a.join(b, joinExprs).select(a("id"), b("foo"))
// drop equivalent 
a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo"))

or use aliases:

// As for now aliases don't work with drop
a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo")

For equi-joins there exists a special shortcut syntax which takes either a sequence of strings:

val usingColumns: Seq[String] = ???

a.join(b, usingColumns)

or a single string:

val usingColumn: String = ???

a.join(b, usingColumn)

which keeps only one copy of the columns used in the join condition.
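
As an illustration of the select-with-aliases approach above, here is a minimal sketch with two made-up DataFrames (the names a, b, id, foo and bar are placeholders, not part of the original answer; a SparkSession with implicits imported is assumed):

// assumes: import spark.implicits._
val a = Seq((1, "x"), (2, "y")).toDF("id", "foo")
val b = Seq((1, 10), (2, 20)).toDF("id", "bar")

a.alias("a")
  .join(b.alias("b"), $"a.id" === $"b.id")
  .select($"a.id", $"a.foo", $"b.bar")
  .show()

/*
+---+---+---+
| id|foo|bar|
+---+---+---+
|  1|  x| 10|
|  2|  y| 20|
+---+---+---+
(row order may vary)
*/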

Answered by tintin

I was stuck with this for a while, and only recently came up with a solution that is quite easy.

Say a is:

scala> val a  = Seq(("a", 1), ("b", 2)).toDF("key", "vala")
a: org.apache.spark.sql.DataFrame = [key: string, vala: int]

scala> a.show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+

and

scala> val b  = Seq(("a", 1)).toDF("key", "valb")
b: org.apache.spark.sql.DataFrame = [key: string, valb: int]

scala> b.show
+---+----+
|key|valb|
+---+----+
|  a|   1|
+---+----+

and I can do this to select only the values from dataframe a:

scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+
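
If a few columns from b are also needed, the same trick can be extended by appending them to the projection. A minimal sketch, assuming the a and b DataFrames defined above (row order may differ):

scala> val cols = a.columns.map(a(_)) :+ b("valb")
scala> a.join(b, a("key") === b("key"), "left").select(cols: _*).show
+---+----+----+
|key|vala|valb|
+---+----+----+
|  a|   1|   1|
|  b|   2|null|
+---+----+----+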

Answered by Abu Shoeb

You can simply use this:

df1.join(df2, Seq("ts","id"),"TYPE-OF-JOIN")

Here TYPE-OF-JOIN can be:

  • left
  • right
  • inner
  • fullouter

For example, I have two dataframes like this:

// df1
word   count1
w1     10   
w2     15  
w3     20

// df2
word   count2
w1     100   
w2     150  
w5     200

If you do a fullouter join, the result looks like this:

df1.join(df2, Seq("word"),"fullouter").show()

word   count1  count2
w1     10      100
w2     15      150
w3     20      null
w5     null    200
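
The other join types from the list work the same way; for example, with the same two dataframes, a left join would be expected to produce:

df1.join(df2, Seq("word"),"left").show()

word   count1  count2
w1     10      100
w2     15      150
w3     20      null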

Answered by Ray

Try this:

val df_combined = df1.join(df2, df1("ts") === df2("ts") && df1("id") === df2("id")).drop(df2("ts")).drop(df2("id"))
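
As a quick check, assuming the column layout from the question, the result should then keep only df1's copies of the join keys:

df_combined.columns
//  Array(ts, id, X1, X2, Y1, Y2)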

Answered by Thomas Decaux

This is normal behavior from SQL. What I do for this:

  • Drop or rename the source columns
  • Do the join
  • Drop the renamed column, if any

Here I am replacing the "fullname" column:

Some code in Java:

this
    .sqlContext
    .read()
    .parquet(String.format("hdfs:///user/blablacar/data/year=%d/month=%d/day=%d", year, month, day))
    .drop("fullname")
    .registerTempTable("data_original");

this
    .sqlContext
    .read()
    .parquet(String.format("hdfs:///user/blablacar/data_v2/year=%d/month=%d/day=%d", year, month, day))
    .registerTempTable("data_v2");

 this
    .sqlContext
    .sql(etlQuery)
    .repartition(1)
    .write()
    .mode(SaveMode.Overwrite)
    .parquet(outputPath);

Where the query is:

SELECT
    d.*,
   concat_ws('_', product_name, product_module, name) AS fullname
FROM
    {table_source} d
LEFT OUTER JOIN
    {table_updates} u ON u.id = d.id

This is something you can only do with Spark, I believe (dropping a column from the list); very, very helpful!

Answered by jigyasu nayyar

Best practice is to make the column names different in both DataFrames before joining them, and then drop the renamed column afterwards.

df1.columns = [id, age, income]
df2.columns = [id, age_group]

df1.join(df2, on=df1.id == df2.id, how='inner').write.saveAsTable('table_name')

will return an error while writing, because of the duplicate columns.

Try this instead:

df2_renamed = df2.withColumnRenamed('id', 'id_2')
df1.join(df2_renamed, on=df1.id == df2_renamed.id_2, how='inner').drop('id_2')
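
Since the question is tagged Scala, a rough equivalent of the snippet above might look like this (column names taken from the example above; this is a sketch, not the answerer's original code):

val df2Renamed = df2.withColumnRenamed("id", "id_2")
df1.join(df2Renamed, df1("id") === df2Renamed("id_2"), "inner").drop("id_2")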

Answered by QA Collective

After I've joined multiple tables together, I run them through a simple function that renames columns in the DataFrame if it encounters duplicates. Alternatively, you could drop these duplicate columns too.

Where Names is a table with columns ['Id', 'Name', 'DateId', 'Description'] and Dates is a table with columns ['Id', 'Date', 'Description'], the columns Id and Description will be duplicated after being joined.

Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = deDupeDfCols(NamesAndDates, '_')
NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")

Where deDupeDfCols is defined as:

def deDupeDfCols(df, separator=''):
    newcols = []

    for col in df.columns:
        if col not in newcols:
            newcols.append(col)
        else:
            for i in range(2, 1000):
                if (col + separator + str(i)) not in newcols:
                    newcols.append(col + separator + str(i))
                    break

    return df.toDF(*newcols)

The resulting data frame will contain columns ['Id', 'Name', 'DateId', 'Description', 'Id2', 'Date', 'Description2'].

Apologies that this answer is in Python - I'm not familiar with Scala, but this was the question that came up when I Googled this problem, and I'm sure the Scala code isn't too different.
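
For completeness, here is a rough Scala translation of deDupeDfCols; it is a sketch of the same idea rather than the answerer's own code:

import org.apache.spark.sql.DataFrame

def deDupeDfCols(df: DataFrame, separator: String = ""): DataFrame = {
  // Keep the first occurrence of each name; give later duplicates a numeric suffix (2, 3, ...).
  val newCols = df.columns.foldLeft(Vector.empty[String]) { (acc, col) =>
    if (!acc.contains(col)) acc :+ col
    else acc :+ Iterator.from(2).map(i => col + separator + i).find(c => !acc.contains(c)).get
  }
  df.toDF(newCols: _*)
}

// Usage, mirroring the Python example:
// val deduped = deDupeDfCols(namesAndDates, "_")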

Answered by Manoj Kumar Dhakad

Inner join is the default join in Spark. Below is the simple syntax for it:

leftDF.join(rightDF,"Common Col Nam")

For other joins you can follow the syntax below:

leftDF.join(rightDF,Seq("Common Columns comma seperated","join type")

If the column names are not common, then:

leftDF.join(rightDF, leftDF.col("x") === rightDF.col("y"), "joinType")
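
Applied back to the original question, the shortcut form would presumably be:

val df_combined = df1.join(df2, Seq("ts", "id"), "inner")
//  expected columns: Array(ts, id, X1, X2, Y1, Y2)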