Java Spark Dataframe: Select distinct rows

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/55010434/


Spark Dataframe: Select distinct rows

java, sql, dataframe, apache-spark, apache-spark-sql

Asked by Himanshu Yadav

I tried two ways to find distinct rows from parquet, but neither seems to work.
Attempt 1: Dataset<Row> df = sqlContext.read().parquet("location.parquet").distinct();
But it throws:


Cannot have map type columns in DataFrame which calls set operations
(intersect, except, etc.), 
but the type of column canvasHashes is map<string,string>;;

Attempt 2: Tried running SQL queries:


Dataset<Row> df = sqlContext.read().parquet("location.parquet");
df.createOrReplaceTempView("df");
Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");

The error I get:


== SQL ==
SELECT distinct on timestamp * from df
-----------------------------^^^

Is there a way to get distinct records while reading parquet files? Is there any read option I can use?


Accepted answer by user10938362

The problem you face is explicitly stated in the exception message: because MapType columns are neither hashable nor orderable, they cannot be used as part of a grouping or partitioning expression.

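As an illustration, the same limitation can be reproduced on any DataFrame that has a map-typed column (a minimal sketch, assuming a SparkSession named spark):

spark.sql("SELECT map('k', 'v') AS m").distinct()
// throws AnalysisException: Cannot have map type columns in DataFrame
// which calls set operations (intersect, except, etc.)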

Your take on the SQL solution is not logically equivalent to distinct on Dataset. If you want to deduplicate data based on a set of compatible columns, you should use dropDuplicates:


df.dropDuplicates("timestamp")

which would be equivalent to


SELECT timestamp, first(c1) AS c1, first(c2) AS c2,  ..., first(cn) AS cn,
       first(canvasHashes) AS canvasHashes
FROM df GROUP BY timestamp

Unfortunately, if your goal is an actual DISTINCT it won't be so easy. One possible solution is to leverage Scala* Map hashing. You could define a Scala udf like this:


spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)

and then use it in your Java code to derive a column that can be used with dropDuplicates:


df
  .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
  .dropDuplicates(
    // All columns excluding canvasHashes / hash_of_canvas_hashes
    "timestamp", "c1", "c2", ..., "cn",
    // Hash used as surrogate of canvasHashes
    "hash_of_canvas_hashes"
  )

with SQL equivalent


SELECT 
  timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
  first(canvasHashes) AS canvasHashes
FROM df GROUP BY
  timestamp, c1, c2, ..., cn    -- All columns excluding canvasHashes
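
Putting both steps together, here is a minimal end-to-end sketch in Scala; c1 stands in for the remaining non-map columns, which are not listed in the question:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Stable hash for the map column, as registered above
spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)

val df = spark.read.parquet("location.parquet")

val deduped = df
  .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
  // every column except canvasHashes, plus the hash surrogate
  .dropDuplicates("timestamp", "c1", "hash_of_canvas_hashes")
  .drop("hash_of_canvas_hashes")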


* Please note that java.util.Map with its hashCode won't work, as hashCode is not consistent.

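As a quick illustration of why Scala's ## works here: equal immutable Maps produce the same hash regardless of insertion order (Scala REPL sketch):

val a = Map("x" -> "1", "y" -> "2")
val b = Map("y" -> "2", "x" -> "1")
a == b         // true
a.## == b.##   // true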

Answered by Andronicus

Yes, the syntax is incorrect; it should be:


Dataset<Row> landingDF = sqlContext.sql("SELECT distinct * from df");
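
For context, a sketch of the corrected flow in Scala, mirroring the question's Java. Note that with a map-typed column such as canvasHashes, SELECT DISTINCT * still triggers the set-operation limitation described in the accepted answer, so the dropDuplicates approach above remains the practical fix:

val df = spark.read.parquet("location.parquet")
df.createOrReplaceTempView("df")
// fails if df still contains map<string,string> columns
val landingDF = spark.sql("SELECT DISTINCT * FROM df")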

Answered by vaquar khan

1) If you want distinct values based on specific columns, you can use select followed by distinct:


val df = sc.parallelize(Array((1, 2), (3, 4), (1, 6))).toDF("no", "age")


scala> df.show
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+

val distinctValuesDF = df.select(df("no")).distinct

scala> distinctValuesDF.show
+---+
| no|
+---+
|  1|
|  3|
+---+

2) If you want unique rows across all columns, use dropDuplicates:


scala> val df = sc.parallelize(Array((1, 2), (3, 4),(3, 4), (1, 6))).toDF("no", "age")



scala> df.show

+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  3|  4|
|  1|  6|
+---+---+


scala> df.dropDuplicates().show()
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+
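
For completeness, dropDuplicates also accepts a subset of columns, which is what the accepted answer relies on for the timestamp-based case; which of the duplicate rows survives is not guaranteed:

scala> // keeps one row per distinct "no"; the surviving age value is not deterministic
scala> df.dropDuplicates("no").show()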