scala - How to calculate the percentile of a column in a DataFrame in Spark?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/37668343/


How to calculate Percentile of column in a DataFrame in spark?

scala apache-spark apache-spark-sql spark-dataframe

Asked by dheee

I am trying to calculate the percentile of a column in a DataFrame. I can't find any percentile_approx function among Spark's aggregation functions.

For example, in Hive we have percentile_approx and we can use it in the following way:

hiveContext.sql("select percentile_approx(Open_Rate, 0.10) from myTable")

But I want to do it using the Spark DataFrame API for performance reasons.

Sample data set

|User ID|Open_Rate|
|-------|---------|
|A1     |10.3     |
|B1     |4.04     |
|C1     |21.7     |
|D1     |18.6     |

I want to find out how many users fall into the 10th percentile, the 20th percentile, and so on. I want to do something like this:

df.select($"id",Percentile($"Open_Rate",0.1)).show

Answered by Yulin GUO

Since Spark 2.0, this has become easier: simply use the approxQuantile function from DataFrameStatFunctions, like:

df.stat.approxQuantile("Open_Rate",Array(0.25,0.50,0.75),0.0)

There are also some other useful statistical functions for DataFrame in DataFrameStatFunctions.

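For reference, a minimal self-contained sketch of the call above, assuming a local SparkSession named spark and the sample data from the question:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("percentile-example").getOrCreate()
import spark.implicits._

val df = Seq(("A1", 10.3), ("B1", 4.04), ("C1", 21.7), ("D1", 18.6))
  .toDF("User ID", "Open_Rate")

// approxQuantile(column, probabilities, relativeError):
// relativeError = 0.0 requests exact quantiles; a small positive value
// (e.g. 0.01) trades accuracy for speed on large data sets.
val quartiles = df.stat.approxQuantile("Open_Rate", Array(0.25, 0.50, 0.75), 0.0)
println(quartiles.mkString(", "))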

Answered by Sim

SparkSQL and the Scala dataframe/dataset APIs are executed by the same engine. Equivalent operations will generate equivalent execution plans. You can see the execution plans with explain.

sql(...).explain
df.explain

When it comes to your specific question, it is a common pattern to intermix SparkSQL and Scala DSL syntax because, as you have discovered, their capabilities are not yet equivalent. (Another example is the difference between SQL's explode() and the DSL's explode(), the latter being more powerful but also less efficient due to marshalling.)

The simple way to do it is as follows:

df.registerTempTable("tmp_tbl")
val newDF = sql(/* do something with tmp_tbl */)
// Continue using newDF with Scala DSL
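
Applied to this question, the placeholder could be the percentile_approx call from the original post (a sketch; it assumes the same tmp_tbl registration and a context such as a HiveContext or spark-shell where a bare sql() resolves):

// percentile_approx is reachable through SQL even though the pre-2.0
// DataFrame DSL has no direct equivalent.
val p10 = sql("select percentile_approx(Open_Rate, 0.10) as p10 from tmp_tbl")
p10.show()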

What you need to keep in mind if you go with the simple way is that temporary table names are cluster-global (up to 1.6.x). Therefore, you should use randomized table names if the code may run simultaneously more than once on the same cluster.

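A simple way to do that (a sketch; the naming scheme is only an example):

import java.util.UUID

// A unique name per run prevents collisions in the cluster-global
// temp table namespace when several jobs run concurrently.
val tmpName = s"tmp_tbl_${UUID.randomUUID().toString.replace("-", "_")}"
df.registerTempTable(tmpName)   // reference tmpName in the SQL text instead of a fixed name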

On my team the pattern is common enough that we have added a .sql() implicit to DataFrame which automatically registers and then unregisters a temp table for the scope of the SQL statement.

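The answer does not show that implementation, but a minimal sketch of such an implicit (the names, the query-builder signature, and the cleanup strategy are assumptions) might look like:

import java.util.UUID
import org.apache.spark.sql.DataFrame

// Hypothetical helper: register the DataFrame under a random temp name,
// run the SQL built from that name, then drop the temp table again so it
// does not linger in the cluster-global namespace.
implicit class DataFrameSqlOps(df: DataFrame) {
  def sql(build: String => String): DataFrame = {
    val name = s"tmp_${UUID.randomUUID().toString.replace("-", "_")}"
    df.registerTempTable(name)
    try df.sqlContext.sql(build(name))
    finally df.sqlContext.dropTempTable(name)
  }
}

// Hypothetical usage:
// val p10 = df.sql(t => s"select percentile_approx(Open_Rate, 0.10) from $t")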