scala Spark Dataframes - Reducing By Key
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same CC BY-SA license, link to the original, and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/41236804/
Spark Dataframes- Reducing By Key
Asked by d80tb7
Let's say I have a data structure like this where ts is some timestamp
case class Record(ts: Long, id: Int, value: Int)
Given a large number of these records, I want to end up with the record with the highest timestamp for each id. Using the RDD API, I think the following code gets the job done:
def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
  records.keyBy(_.id).reduceByKey{
    (x, y) => if(x.ts > y.ts) x else y
  }.values
}
Likewise this is my attempt with datasets:
def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
  records.groupByKey(_.id).mapGroups{
    case(id, records) => {
      records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
    }
  }
}
I've been trying to work out how to achieve something similar with DataFrames, but to no avail. I realise I can do the grouping with:
records.groupBy($"id")
But that gives me a RelationalGroupedDataset, and it's not clear to me what aggregation function I need to write to achieve what I want. All the example aggregations I've seen appear to return just the single column being aggregated rather than the whole row.
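For concreteness, this is the kind of single-column aggregation those examples show. It is only a sketch, reusing the Record class and the implicit SparkSession pattern from above, and it illustrates how the value column gets dropped:

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions._

// Sketch of the limitation described above: aggregating a single column
// yields only (id, max_ts) and silently drops the matching value column.
def latestTsOnly(records: Dataset[Record])(implicit spark: SparkSession) = {
  import spark.implicits._
  records.groupBy($"id").agg(max($"ts") as "max_ts")
}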
Is it possible to achieve this using dataframes?
Answered by Assaf Mendelson
You can use the argmax logic (see the Databricks example).
For example, let's say your DataFrame is called df and it has the columns id, val, and ts; you would do something like this:
import org.apache.spark.sql.functions._
val newDF = df.groupBy('id).agg(max(struct('ts, 'val)) as 'tmp).select($"id", $"tmp.*")
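The key idea is that max over a struct compares its fields left to right, so placing ts first makes the maximum pick out the row with the latest timestamp while carrying the other columns along. Below is a minimal, self-contained sketch of the same approach, adapted to the Record(ts, id, value) class from the question rather than the id/val/ts columns named above; the local-mode session, app name, and sample rows are illustrative assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object LatestPerId {
  // The question's record type, reused here for illustration.
  final case class Record(ts: Long, id: Int, value: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("latest-per-id").getOrCreate()
    import spark.implicits._

    val df = Seq(
      Record(ts = 1L, id = 1, value = 10),
      Record(ts = 5L, id = 1, value = 20), // latest row for id 1
      Record(ts = 3L, id = 2, value = 30)  // only (and therefore latest) row for id 2
    ).toDF()

    // max over a struct compares fields left to right, so ts must come first;
    // the remaining columns ride along inside the struct and are unpacked by "tmp.*".
    val latest = df
      .groupBy($"id")
      .agg(max(struct($"ts", $"value")) as "tmp")
      .select($"id", $"tmp.*")

    latest.show() // expected per id: 1 -> (ts = 5, value = 20), 2 -> (ts = 3, value = 30)

    spark.stop()
  }
}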
Answered by user238607
For Datasets I did this, tested on Spark 2.1.1:
final case class AggregateResultModel(id: String,
                                      mtype: String,
                                      healthScore: Int,
                                      mortality: Float,
                                      reimbursement: Float)
.....
.....
// assume that the rawScores are loaded beforehand from JSON/CSV files
val groupedResultSet = rawScores.as[AggregateResultModel].groupByKey( item => (item.id,item.mtype ))
      .reduceGroups( (x,y) => getMinHealthScore(x,y)).map(_._2)
// the binary function used in the reduceGroups
def getMinHealthScore(x: AggregateResultModel, y: AggregateResultModel): AggregateResultModel = {
  // complex logic for deciding which row to keep
  if (x.healthScore > y.healthScore) y
  else if (x.healthScore < y.healthScore) x
  else if (x.mortality < y.mortality) y
  else if (x.mortality > y.mortality) x
  else if (x.reimbursement < y.reimbursement) x
  else y
}
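For reference, here is an untested sketch of the same reduceGroups pattern applied back to the Record class from the original question, keeping the record with the highest ts per id (it assumes an implicit SparkSession, as in the question's own code):

import org.apache.spark.sql.{Dataset, SparkSession}

// Sketch only: the reduceGroups approach from this answer, applied to the
// question's Record(ts, id, value) class.
def findLatest(records: Dataset[Record])(implicit spark: SparkSession): Dataset[Record] = {
  import spark.implicits._
  records
    .groupByKey(_.id)                                   // key each record by id
    .reduceGroups((x, y) => if (x.ts > y.ts) x else y)  // keep the record with the larger ts
    .map(_._2)                                          // drop the key, keep the reduced record
}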
