scala - Fetching distinct values on a column using Spark DataFrame

Declaration: This page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA terms and attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/38946337/

Date: 2020-10-22 08:33:11    Source: igfitidea

Fetching distinct values on a column using Spark DataFrame

scala apache-spark dataframe apache-spark-sql spark-dataframe

Asked by Kazhiyur

Using Spark version 1.6.1, I need to fetch distinct values on a column and then perform some specific transformation on top of it. The column contains more than 50 million records and can grow larger.
I understand that doing a distinct.collect() will bring the call back to the driver program. Currently I am performing this task as below; is there a better approach?


 import sqlContext.implicits._
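 // Cache the pre-processed data, since it is re-filtered once per applicationId in the loop below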
 preProcessedData.persist(StorageLevel.MEMORY_AND_DISK_2)

 preProcessedData.select(ApplicationId).distinct.collect().foreach(x => {
   val applicationId = x.getAs[String](ApplicationId)
   val selectedApplicationData = preProcessedData.filter($"$ApplicationId" === applicationId)
   // DO SOME TASK PER applicationId
 })

 preProcessedData.unpersist()  

Answered by Alberto Bonsanto

Well, to obtain all different values in a DataFrame you can use distinct. As you can see in the documentation, that method returns another DataFrame. After that you can create a UDF in order to transform each record.


For example:


import org.apache.spark.sql.functions.{col, udf}

val df = sc.parallelize(Array((1, 2), (3, 4), (1, 6))).toDF("age", "salary")

// I obtain all different values. If you show it, you should see only {1, 3}
val distinctValuesDF = df.select(df("age")).distinct

// Define your udf. In this case I defined a simple function, but they can get complicated.
val myTransformationUDF = udf((value: Int) => value / 10)

// Run that transformation "over" your DataFrame
val afterTransformationDF = distinctValuesDF.select(myTransformationUDF(col("age")))
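
To check the output, a show() on the transformed DataFrame is enough (a quick usage sketch; the exact column name Spark gives an unnamed UDF output may vary by version):

afterTransformationDF.show()
// With the integer division in the UDF above, both distinct ages (1 and 3) map to 0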