Scala: how to get the last row from a DataFrame?

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/45406762/

How to get the last row from DataFrame?

scala, apache-spark, apache-spark-sql, spark-dataframe

Asked by mentongwu

I have a DataFrame with two columns, 'value' and 'timestamp', where 'timestamp' is ordered. I want to get the last row of the DataFrame. What should I do?

this is my input:

+-----+---------+
|value|timestamp|
+-----+---------+
|    1|        1|
|    4|        2|
|    3|        3|
|    2|        4|
|    5|        5|
|    7|        6|
|    3|        7|
|    5|        8|
|    4|        9|
|   18|       10|
+-----+---------+

this is my code:

    val arr = Array((1,1),(4,2),(3,3),(2,4),(5,5),(7,6),(3,7),(5,8),(4,9),(18,10))
    var df=m_sparkCtx.parallelize(arr).toDF("value","timestamp")

this is my expected result:

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+

Accepted answer by Alper t. Turker

I'd simply reduce:

df.reduce { (x, y) => 
  if (x.getAs[Int]("timestamp") > y.getAs[Int]("timestamp")) x else y 
}
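
This returns a Row rather than a DataFrame. A minimal sketch of pulling the fields out of that Row (assuming the df from the question) could look like:

// reduce keeps whichever Row has the larger timestamp; getAs extracts the fields
val last = df.reduce { (x, y) =>
  if (x.getAs[Int]("timestamp") > y.getAs[Int]("timestamp")) x else y
}
val (value, timestamp) = (last.getAs[Int]("value"), last.getAs[Int]("timestamp"))
// For the sample data: value = 18, timestamp = 10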

Answer by Mimii Cheng

Try this, it works for me.

df.orderBy($"value".desc).show(1)
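
Note that this orders by value rather than timestamp; since the question asks for the last row by timestamp, a variant of the same idea would presumably be:

df.orderBy($"timestamp".desc).show(1)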

Answer by Danylo Zherebetskyy

I would simply use a query that orders your table in descending order and takes the first value from that order:

df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY value DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec)
latest_rec.show()
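
For reference, a rough Scala sketch of the same SQL approach (assuming a SparkSession named spark is in scope) might look like:

df.createOrReplaceTempView("table_df")
// ORDER BY timestamp DESC instead if "last" means the latest timestamp rather than the largest value
val latestRec = spark.sql("SELECT * FROM table_df ORDER BY value DESC LIMIT 1")
latestRec.show()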

Answer by Raphael Roth

The most efficient way is to reduce your DataFrame. This gives you a single row which you can convert back to a DataFrame, but as it contains only 1 record, this does not make much sense.

import org.apache.spark.sql.Row

sparkContext.parallelize(
  Seq(
    df.reduce {
      (a, b) => if (a.getAs[Int]("timestamp") > b.getAs[Int]("timestamp")) a else b
    } match { case Row(value: Int, timestamp: Int) => (value, timestamp) }
  )
)
.toDF("value", "timestamp")
.show


+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+

Shorter, although less efficient (as it needs shuffling), is this solution:

df
.where($"timestamp" === df.groupBy().agg(max($"timestamp")).map(_.getInt(0)).collect.head)
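
This snippet assumes the usual Spark SQL imports are in scope, roughly:

import org.apache.spark.sql.functions.max
import spark.implicits._  // assuming a SparkSession named spark; provides the $-syntax and the Int encoder used by .map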

Answer by ktheitroadalo

If your timestamp column is unique and in increasing order, then the following are ways to get the last row:

println(df.sort($"timestamp".desc).first())

// Output [18,10]

df.sort($"timestamp".desc).take(1).foreach(println)

// Output [18,10]

df.where($"timestamp" === df.count()).show

Output:

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+

If not, create a new column with the index and select the last index, as below:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val df1 = spark.sqlContext.createDataFrame(
  df.rdd.zipWithIndex.map {
    case (row, index) => Row.fromSeq(row.toSeq :+ index)
  },
  StructType(df.schema.fields :+ StructField("index", LongType, false)))

df1.where($"index" === df.count() - 1).drop("index").show

Output:

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+

Answer by Suneel

Java:

Dataset<Row> sortDF = inputDF.orderBy(org.apache.spark.sql.functions.col(config.getIncrementingColumn()).desc());
Row row = sortDF.first();

Answer by Saurav Sahu

You can also use the desc function: Column desc(String columnName)

df.orderBy(desc("value")).show(1)

which gives the same result as

df.orderBy($"value".desc).show(1)
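
Note that desc here is org.apache.spark.sql.functions.desc, so an import along these lines is assumed:

import org.apache.spark.sql.functions.desc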