Python: Best way to get the max value in a Spark dataframe column
Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/33224740/
Best way to get the max value in a Spark dataframe column
Asked by xenocyon
I'm trying to figure out the best way to get the largest value in a Spark dataframe column.
Consider the following example:
df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
Which creates:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
My goal is to find the largest value in column A (by inspection, this is 3.0). Using PySpark, here are four approaches I can think of:
# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").first().asDict()['A'])
# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").first().asDict()['maxval']
# Method 3: Use groupby()
df.groupby().max('A').first().asDict()['max(A)']
# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
Each of the above gives the right answer, but in the absence of a Spark profiling tool I can't tell which is best.
Any ideas from either intuition or empiricism on which of the above methods is most efficient in terms of Spark runtime or resource usage, or whether there is a more direct method than the ones above?
Accepted answer by Burt
>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor| timestamp| uid| x| y|
+-----+--------------------+--------+----------+-----------+
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|
>row1 = df1.agg({"x": "max"}).collect()[0]
>print row1
Row(max(x)=110.33613)
>print row1["max(x)"]
110.33613
The answer is almost the same as Method 3, but it seems the asDict() in Method 3 can be removed.
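For reference, a minimal sketch of Method 3 with the asDict() call dropped, indexing the returned Row directly (assuming the df from the question):
# same as Method 3, but reading the field off the Row instead of converting to a dict
max_a = df.groupby().max('A').first()['max(A)']  # 3.0 for the example dataframe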
Answered by Boern
In case someone wonders how to do it using Scala (using Spark 2.0+), here you go:
scala> df.createOrReplaceTempView("TEMP_DF")
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF").
collect()(0).getInt(0)
scala> print(myMax)
117
Answered by Rudra Prasad Samal
The max value for a particular column of a dataframe can be obtained by using:
your_max_value = df.agg({"your-column": "max"}).collect()[0][0]
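For example, applied to the dataframe from the question (the column name "A" is just an assumption here), this would give:
max_a = df.agg({"A": "max"}).collect()[0][0]  # 3.0 for the example dataframe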
Answered by Danylo Zherebetskyy
Remark: Spark is intended to work on Big Data (distributed computing). The example DataFrame is very small, so the ordering observed on real-life (much larger) data may differ from the ordering observed on this tiny example.
Slowest: Method_1, because .describe("A") calculates min, max, mean, stddev, and count (5 calculations over the whole column)
Medium: Method_4, because the .rdd conversion (DataFrame to RDD transformation) slows down the process.
Fastest: Method_3 ~ Method_2 ~ Method_5, because the logic is very similar, so Spark's Catalyst optimizer follows almost the same plan with a minimal number of operations (get the max of a particular column, collect a single-value dataframe); .asDict() adds a little extra time when comparing Methods 3 and 2 to Method 5.
import pandas as pd
import time
time_dict = {}
dfff = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#-- For a bigger/realistic dataframe, just uncomment the following 3 lines (requires: import numpy as np)
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = sqlContext.createDataFrame(pdf)
tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1']= tac1 - tic1
print (max_val)
tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2']= tac2 - tic2
print (max_val)
tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3']= tac3 - tic3
print (max_val)
tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4']= tac4 - tic4
print (max_val)
tic5 = int(round(time.time() * 1000))
# Method 5: Use agg()
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5']= tac5 - tic5
print (max_val)
print(time_dict)
Result on an edge-node of a cluster in milliseconds (ms):
small DF (ms) : {'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}
bigger DF (ms): {'m1': 10260, 'm2': 452, 'm3': 465, 'm4': 916, 'm5': 373}
Answered by Vyom Shrivastava
I believe the best solution is to use head().
Considering your example:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
Using PySpark's agg and max functions, we can get the value as follows:
from pyspark.sql.functions import max
df.agg(max(df.A)).head()[0]
This will return:
3.0
Make sure you have the correct import:
from pyspark.sql.functions import max
The max function we use here is the PySpark SQL library function, not Python's built-in max function.
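A common way to avoid shadowing Python's built-in max is to import the functions module under an alias instead; a minimal sketch, assuming the df from the question:
from pyspark.sql import functions as F  # leaves the built-in max untouched
max_a = df.agg(F.max("A")).head()[0]  # 3.0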
Answered by luminousmen
Another way of doing it:
from pyspark.sql import functions as f
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
On my data, I got these benchmarks:
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
CPU times: user 2.31 ms, sys: 3.31 ms, total: 5.62 ms
Wall time: 3.7 s
df.select("A").rdd.max()[0]
CPU times: user 23.2 ms, sys: 13.9 ms, total: 37.1 ms
Wall time: 10.3 s
df.agg({"A": "max"}).collect()[0][0]
CPU times: user 0 ns, sys: 4.77 ms, total: 4.77 ms
Wall time: 3.75 s
All of them give the same answer.
Answered by user 923227
Here is a lazy way of doing this, by just computing statistics:
df.write.mode("overwrite").saveAsTable("sampleStats")
Query = "ANALYZE TABLE sampleStats COMPUTE STATISTICS FOR COLUMNS " + ','.join(df.columns)
spark.sql(Query)
df.describe('ColName')
or
spark.sql("Select * from sampleStats").describe('ColName')
or you can open a Hive shell and run
describe formatted sampleStats;
You will see the statistics in the properties - min, max, distinct, nulls, etc.
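Once the statistics have been computed, they can also be read back through Spark SQL itself (available in Spark 2.3+; ColName is a placeholder for your column name):
spark.sql("DESCRIBE EXTENDED sampleStats ColName").show()
# lists the column-level statistics gathered by ANALYZE TABLE: min, max, num_nulls, distinct_count, etc.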
Answered by hello-world
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val testDataFrame = Seq(
(1.0, 4.0), (2.0, 5.0), (3.0, 6.0)
).toDF("A", "B")
val (maxA, maxB) = testDataFrame.select(max("A"), max("B"))
.as[(Double, Double)]
.first()
println(maxA, maxB)
And the result is (3.0,6.0), which is the same as testDataFrame.agg(max($"A"), max($"B")).collect()(0). However, testDataFrame.agg(max($"A"), max($"B")).collect()(0) returns a Row, [3.0,6.0].
Answered by Grant Shannon
In PySpark you can do this:
max(df.select('ColumnName').rdd.flatMap(lambda x: x).collect())
Answered by Nandeesh
The below example shows how to get the max value in a Spark dataframe column.
from pyspark.sql.functions import max
df = sql_context.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
result = df.select([max("A")])
result.show()
+------+
|max(A)|
+------+
| 3.0|
+------+
print(result.collect()[0]['max(A)'])
3.0
Similarly, min, mean, etc. can be calculated as shown below:
from pyspark.sql.functions import mean, min, max
result = df.select([mean("A"), min("A"), max("A")])
result.show()
+------+------+------+
|avg(A)|min(A)|max(A)|
+------+------+------+
| 2.0| 1.0| 3.0|
+------+------+------+
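If you need these as plain Python values rather than a displayed dataframe, one way (assuming the same result dataframe as above) is to read them off the first Row:
row = result.first()
print(row['min(A)'], row['avg(A)'], row['max(A)'])  # 1.0 2.0 3.0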