Java 如何解决 AnalysisException:Spark 中已解析的属性
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/45713290/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverFlow
How to resolve the AnalysisException: resolved attribute(s) in Spark
提问 by Rajita
val rdd = sc.parallelize(Seq(("vskp", Array(2.0, 1.0, 2.1, 5.4)),("hyd",Array(1.5, 0.5, 0.9, 3.7)),("hyd", Array(1.5, 0.5, 0.9, 3.2)),("tvm", Array(8.0, 2.9, 9.1, 2.5))))
val df1= rdd.toDF("id", "vals")
val rdd1 = sc.parallelize(Seq(("vskp","ap"),("hyd","tel"),("bglr","kkt")))
val df2 = rdd1.toDF("id", "state")
val df3 = df1.join(df2,df1("id")===df2("id"),"left")
The join operation works fine, but when I reuse df2 I get an unresolved attributes error:
连接操作工作正常,但当我重用 df2 时,就会遇到未解析属性的错误:
val rdd2 = sc.parallelize(Seq(("vskp", "Y"),("hyd", "N"),("hyd", "N"),("tvm", "Y")))
val df4 = rdd2.toDF("id","existance")
val df5 = df4.join(df2,df4("id")===df2("id"),"left")
ERROR: org.apache.spark.sql.AnalysisException: resolved attribute(s) id#426
错误:org.apache.spark.sql.AnalysisException:已解析的属性 id#426
采纳答案 by Erik Schmiegelow
As mentioned in my comment, it is related to https://issues.apache.org/jira/browse/SPARK-10925 and, more specifically, https://issues.apache.org/jira/browse/SPARK-14948. Reuse of the reference will create ambiguity in naming, so you will have to clone the df - see the last comment in https://issues.apache.org/jira/browse/SPARK-14948 for an example.
正如我在评论中提到的,它与 https://issues.apache.org/jira/browse/SPARK-10925 相关,更具体地说是 https://issues.apache.org/jira/browse/SPARK-14948。重用引用会在命名上产生歧义,因此您必须克隆 df - 示例请参阅 https://issues.apache.org/jira/browse/SPARK-14948 中的最后一条评论。
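For illustration only (this sketch is not part of the accepted answer), one way to apply the "clone the df" advice to the question's code, assuming a SparkSession named spark is in scope: rebuilding df2 from its own RDD and schema gives its columns fresh attribute IDs, so the second join no longer trips over the reused reference.

// Hypothetical sketch: clone df2 before reusing it in another join
val df2Cloned = spark.createDataFrame(df2.rdd, df2.schema)
val df3 = df1.join(df2, df1("id") === df2("id"), "left")
val df5 = df4.join(df2Cloned, df4("id") === df2Cloned("id"), "left")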
回答 by Abdennacer Lachiheb
For Java developers, try calling this method:
对于 Java 开发者,可以尝试调用这个方法:
private static Dataset<Row> cloneDataset(Dataset<Row> ds) {
List<Column> filterColumns = new ArrayList<>();
List<String> filterColumnsNames = new ArrayList<>();
scala.collection.Iterator<StructField> it = ds.exprEnc().schema().toIterator();
while (it.hasNext()) {
String columnName = it.next().name();
filterColumns.add(ds.col(columnName));
filterColumnsNames.add(columnName);
}
ds = ds.select(JavaConversions.asScalaBuffer(filterColumns).seq()).toDF(scala.collection.JavaConverters.asScalaIteratorConverter(filterColumnsNames.iterator()).asScala().toSeq());
return ds;
}
Call it on both datasets just before the join; it clones each dataset into a new one:
在连接之前对两个数据集都调用它,它会把每个数据集克隆成一个新的数据集:
df1 = cloneDataset(df1);
df2 = cloneDataset(df2);
Dataset<Row> join = df1.join(df2, col("column_name"));
// if it didn't work try this
final Dataset<Row> join = cloneDataset(df1.join(df2, columns_seq));
回答 by Iraj Hedayati
I got the same issue when trying to use one DataFrame in two consecutive joins.
尝试在两个连续连接中使用一个 DataFrame 时遇到了同样的问题。
Here is the problem: DataFrame A has 2 columns (let's call them x and y) and DataFrame B has 2 columns as well (let's call them w and z). I need to join A with B on x=z and then join the result with B again on y=z.
问题是:DataFrame A 有 2 列(我们称它们为 x 和 y),DataFrame B 也有 2 列(我们称它们为 w 和 z)。我需要在 x=z 上将 A 与 B 连接,然后再在 y=z 上将结果与 B 连接。
(A join B on A.x=B.z) as C join B on C.y=B.z
I was getting exactly this error: in the second join it complained "resolved attribute(s) B.z#1234 ...".
我得到的正是这个错误:在第二次连接时它抱怨“已解析的属性 B.z#1234 ...”。
Following the links @Erik provided and some other blogs and questions, I gathered I need a clone of B.
按照@Erik 提供的链接以及其他一些博客和问题,我发现我需要一个 B 的克隆。
Here is what I did:
这是我所做的:
val aDF = ...
val bDF = ...
val bCloned = spark.createDataFrame(bDF.rdd, bDF.schema)
aDF.join(bDF, aDF("x") === bDF("z")).join(bCloned, aDF("y") === bCloned("z"))
回答 by Tomer Ben David
If you have df1, and df2 derived from df1, try renaming all the columns in df2 so that no two columns have identical names after the join. So before the join:
如果您有 df1 和从 df1 派生的 df2,请尝试重命名 df2 中的所有列,使连接后没有两列同名。所以在连接之前:
so instead of df1.join(df2...
所以而不是 df1.join(df2...
do
做
# Step 1 rename shared column names in df2.
df2_renamed = df2.withColumnRenamed('columna', 'column_a_renamed').withColumnRenamed('columnb', 'column_b_renamed')
# Step 2 do the join on the renamed df2 such that no two columns have same name.
df1.join(df2_renamed)
回答 by dharani sugumar
It will work if you do the below.
如果您执行以下操作,它将起作用。
Suppose you have a DataFrame df1 and you want to cross join the same DataFrame; you can use the below:
假设您有一个数据框 df1,如果你想对同一个数据帧做交叉连接,可以使用下面的方法:
df1.toDF("ColA","ColB").as("f_df").join(df1.toDF("ColA","ColB").as("t_df"),
$"f_df.pcmdty_id" ===
$"t_df.assctd_pcmdty_id").select($"f_df.pcmdty_id",$"f_df.assctd_pcmdty_id")
回答 by Zhenyi Lin
From my experience, there are two solutions: 1) clone the DF, or 2) rename the columns that are ambiguous before joining the tables (don't forget to drop the duplicated join key).
根据我的经验,有两种解决方案:1) 克隆 DF;2) 在连接表之前重命名有歧义的列(不要忘记删除重复的连接键)。
Personally I prefer the second method, because cloning a DF in the first method takes time, especially if the data size is big.
我个人更喜欢第二种方法,因为第一种方法中克隆 DF 需要时间,尤其是在数据量很大的情况下。
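For illustration (not from the original answer), a rough Scala sketch of the second approach, using hypothetical DataFrames dfA and dfB that share an id join key plus a value column:

// Hypothetical example: rename the ambiguous columns on one side before the join,
// then drop the duplicated join key afterwards.
val dfBRenamed = dfB
  .withColumnRenamed("id", "id_b")
  .withColumnRenamed("value", "value_b")
val joined = dfA
  .join(dfBRenamed, dfA("id") === dfBRenamed("id_b"), "left")
  .drop("id_b")  // the duplicated join key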
回答 by Sanni Heruwala
In my case this error appeared during a self join of the same table. I was facing the issue below with Spark SQL, not the DataFrame API:
在我的情况下,这个错误出现在同一个表的自连接期间。我遇到的是 Spark SQL 的问题,而不是 DataFrame API 的问题:
org.apache.spark.sql.AnalysisException: Resolved attribute(s) originator#3084,program_duration#3086,originator_locale#3085 missing from program_duration#1525,guid#400,originator_locale#1524,EFFECTIVE_DATETIME_UTC#3157L,device_timezone#2366,content_rpd_id#734L,originator_sublocale#2355,program_air_datetime_utc#3155L,originator#1523,master_campaign#735,device_provider_id#2352 in operator !Deduplicate [guid#400, program_duration#3086, device_timezone#2366, originator_locale#3085, originator_sublocale#2355, master_campaign#735, EFFECTIVE_DATETIME_UTC#3157L, device_provider_id#2352, originator#3084, program_air_datetime_utc#3155L, content_rpd_id#734L]. Attribute(s) with the same name appear in the operation: originator,program_duration,originator_locale. Please check if the right attribute(s) are used.;;
Earlier I was using the query below:
早些时候我使用的是下面的查询:
SELECT * FROM DataTable as aext
INNER JOIN AnotherDataTable LAO
ON aext.device_provider_id = LAO.device_provider_id
Selecting only the required columns before joining solved the issue for me:
在连接之前只选择必需的列就为我解决了这个问题:
SELECT * FROM (
select distinct EFFECTIVE_DATE,system,mso_Name,EFFECTIVE_DATETIME_UTC,content_rpd_id,device_provider_id
from DataTable
) as aext
INNER JOIN AnotherDataTable LAO ON aext.device_provider_id = LAO.device_provider_id
回答 by Jeevan
[TLDR]
Break the AttributeReference shared between columns in the parent DataFrame and the derived DataFrame by writing the intermediate DataFrame to the file system and reading it again.
通过将中间 DataFrame 写入文件系统并再次读取,打破父 DataFrame 和派生 DataFrame 中列之间共享的AttributeReference。
Ex:
例:
val df1 = spark.read.parquet("file1")
df1.createOrReplaceTempView("df1")
val df2 = spark.read.parquet("file2")
df2.createOrReplaceTempView("df2")
val df12 = spark.sql("""SELECT * FROM df1 as d1 JOIN df2 as d2 ON d1.a = d2.b""")
df12.createOrReplaceTempView("df12")
val df12_ = spark.sql(""" -- some transformation -- """)
df12_.createOrReplaceTempView("df12_")
val df3 = spark.read.parquet("file3")
df3.createOrReplaceTempView("df3")
val df123 = spark.sql("""SELECT * FROM df12_ as d12_ JOIN df3 as d3 ON d12_.a = d3.c""")
df123.createOrReplaceTempView("df123")
Now joining with the top-level DataFrame will lead to the "unresolved attribute" error:
现在与顶层 DataFrame 进行连接将导致“未解析的属性”错误:
val df1231 = spark.sql("""SELECT * FROM df123 as d123 JOIN df1 as d1 ON d123.a = d1.a""")
Solution: d123.a and d1.a share the same AttributeReference. Break it by writing the intermediate table df123 to the file system and reading it again; now df123write.a and d1.a no longer share an AttributeReference.
解决方案:d123.a 和 d1.a 共享相同的 AttributeReference。通过把中间表 df123 写入文件系统并重新读取来打破这种共享;现在 df123write.a 和 d1.a 就不再共享 AttributeReference 了。
val df123 = spark.sql("""SELECT * FROM df12 as d12 JOIN df3 as d3 ON d12.a = d3.c""")
df123.createOrReplaceTempView("df123")
df123.write.parquet("df123.par")
val df123write = spark.read.parquet("df123.par")
spark.catalog.dropTempView("df123")
df123write.createOrReplaceTempView("df123")
val df1231 = spark.sql("""SELECT * FROM df123 as d123 JOIN df1 as d1 ON d123.a = d1.a""")
Long story:
详细说明:
We had complex ETLs with transformations and self joins of DataFrames, performed at multiple levels. We faced the "unresolved attribute" error frequently, and we solved it by selecting the required attributes and joining on that projection instead of joining directly with the top-level table. This fixed the issue temporarily, but when we applied some more transformations on these DataFrames and joined them with any top-level DataFrame, the "unresolved attribute" error raised its ugly head again.
我们有复杂的 ETL,其中包含多层级的 DataFrame 转换和自连接。我们经常遇到“未解析的属性”错误,我们通过先选择所需的属性、再在该投影上进行连接(而不是直接与顶层表连接)来解决它。这暂时解决了问题,但当我们对这些 DataFrame 再做一些转换并与任何顶层 DataFrame 连接时,“未解析的属性”错误又一次抬起了它丑陋的头。
This was happening because the DataFrames at the bottom level were sharing the same AttributeReference with the top-level DataFrames from which they were derived [more details].
发生这种情况是因为底层的 DataFrame 与派生它们的顶层 DataFrame 共享相同的 AttributeReference [更多详细信息]
So we broke this reference sharing by writing just one intermediate transformed DataFrame out, reading it back, and continuing with our ETL. This broke the AttributeReference sharing between the bottom DataFrames and the top DataFrames, and we never faced the "unresolved attribute" error again.
因此,我们通过只把一个中间转换后的 DataFrame 写出再读回来、然后继续我们的 ETL,打破了这种引用共享。这破坏了底层 DataFrame 和顶层 DataFrame 之间的 AttributeReference 共享,我们再也没有遇到“未解析的属性”错误。
This worked for us because, as we moved from the top-level DataFrames down through transformations and joins, our data shrank compared to the initial DataFrames we started with. It also improved our performance, since the data size was smaller and Spark didn't have to traverse the DAG all the way back to the last persisted DataFrame.
这对我们有效,因为当我们从顶层 DataFrame 向下执行转换和连接时,数据量相比最初的 DataFrame 缩小了。这也提升了我们的性能,因为数据量更小,而且 Spark 不必把 DAG 一路回溯到最后一个持久化的 DataFrame。
回答 by mhmdburton
Just rename your columns, keeping the same names. In PySpark: for i in df.columns: df = df.withColumnRenamed(i, i)
只需重命名您的列并保持相同的名称。在 PySpark 中:for i in df.columns: df = df.withColumnRenamed(i, i)
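For reference only, a rough Scala equivalent of the same trick might look like the sketch below; whether re-aliasing each column to its own name actually breaks the shared attribute references can depend on the Spark version, so treat this as an assumption to verify.

// Hypothetical sketch: re-project every column under its existing name
val dfRefreshed = df.columns.foldLeft(df)((acc, c) => acc.withColumnRenamed(c, c))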
回答 by Jason CHAN
This issue really killed a lot of my time and I finally found an easy solution for it: in PySpark, for the problematic column, say colA, we can simply use df = df.select(F.col("colA").alias("colA")) before using df in the join. Here F is import pyspark.sql.functions as F. I think it should work for Scala/Java Spark too.
这个问题真的浪费了我很多时间,我终于找到了一个简单的解决方案:在 PySpark 中,对于有问题的列,比如说 colA,我们可以在把 df 用于 join 之前简单地使用 df = df.select(F.col("colA").alias("colA"))。这里 F 是 import pyspark.sql.functions as F。我认为它也应该适用于 Scala/Java Spark。