Python: How to select the last row, and how to access a PySpark dataframe by index?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse it, you must do so under the same CC BY-SA license, cite the original URL, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/39544796/
How to select last row and also how to access PySpark dataframe by index?
Asked by Satya
From a PySpark SQL dataframe like
name age city
abc 20 A
def 30 B
How do I get the last row? (With df.limit(1) I can get the first row of the dataframe into a new dataframe.)
And how can I access the dataframe rows by index, like row no. 12 or 200?
In pandas I can do
df.tail(1) # for last row
df.ix[rowno or index] # by index
df.loc[] or df.iloc[] # by label or by integer position
I am just curious how to access a PySpark dataframe in such ways, or in alternative ways.
Thanks
Accepted answer by zero323
How to get the last row.
A long and ugly way which assumes that all columns are orderable:
from pyspark.sql.functions import (
    col, max as max_, struct, monotonically_increasing_id
)

last_row = (df
    .withColumn("_id", monotonically_increasing_id())       # tag each row with an increasing id
    .select(max_(struct("_id", *df.columns)).alias("tmp"))   # keep the struct with the largest _id
    .select(col("tmp.*"))
    .drop("_id"))
If not all columns can be ordered you can try:
with_id = df.withColumn("_id", monotonically_increasing_id())
i = with_id.select(max_("_id")).first()[0]        # largest generated id
with_id.where(col("_id") == i).drop("_id")        # the corresponding (last) row
Note: there is a last function in pyspark.sql.functions / o.a.s.sql.functions, but considering the description of the corresponding expressions it is not a good choice here.
how can I access the dataframe rows by index
You cannot. A Spark DataFrame is not indexed and is not accessible by index. You can add indices using zipWithIndex and filter later, as sketched below. Just keep in mind that this is an O(N) operation.
Answered by Danylo Zherebetskyy
How to get the last row.
If you have a column that you can use to order the dataframe, for example "index", then one easy way to get the last record is to use SQL: 1) order your table in descending order and 2) take the first value from that order:
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC LIMIT 1"""
latest_rec = self.sqlContext.sql(query_latest_rec)   # self.sqlContext: whatever SQLContext/SparkSession the surrounding code exposes
latest_rec.show()
And how can I access the dataframe rows by index, like row no. 12 or 200?
In a similar way you can get the record at any line:
row_number = 12
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM (SELECT * FROM table_df ORDER BY index ASC LIMIT {0}) ord_lim ORDER BY index DESC LIMIT 1"""
latest_rec = self.sqlContext.sql(query_latest_rec.format(row_number))   # keep the first row_number rows, then take the last of them
latest_rec.show()
If you do not have an "index" column, you can create one using
from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn("index", monotonically_increasing_id())
Answered by Henrique Florêncio
from pyspark.sql import functions as F

expr = [F.last(col).alias(col) for col in df.columns]   # a "last" aggregate for every column
df.agg(*expr)
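A small usage sketch (not part of the original answer, assuming the example dataframe above). As the note in the accepted answer hints, last depends on row order, which is not guaranteed after a shuffle, so this is not a reliable way to pick the physically last row:

last_vals = df.agg(*expr)   # one-row dataframe with the "last" observed value of each column
last_vals.show()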
Just a tip: it looks like you still have the mindset of someone working with pandas or R. Spark is a different paradigm in the way we work with data. You don't access data inside individual cells anymore; you work with whole chunks of it. If you keep collecting stuff and performing actions, as you just did, you lose the whole concept of parallelism that Spark provides. Take a look at the concept of transformations vs. actions in Spark.
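For illustration, a minimal sketch using the columns of the example dataframe at the top: transformations only build the execution plan, while actions trigger the actual distributed computation.

from pyspark.sql import functions as F

adults = df.where(F.col("age") > 21)     # transformation: lazy, nothing is executed yet
names = adults.select("name", "city")    # still lazy, only extends the plan

names.show()                             # action: runs the job
total = names.count()                    # another action, runs a second job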
Answered by Clay
Use the following to get an index column that contains monotonically increasing, unique, and consecutive integers, which is not how monotonically_increasing_id() works. The indexes will be ascending in the same order as the colName of your DataFrame.
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
df = (df
      .withColumn('int', F.lit(1))
      .withColumn('index', F.sum('int').over(window))   # running count = consecutive index
      .drop('int'))
Use the following code to look at the tail, or the last rownums rows, of the DataFrame.
rownums = 10
df.where(F.col('index')>df.count()-rownums).show()
Use the following code to look at the rows from start_row to end_row of the DataFrame.
start_row = 20
end_row = start_row + 10
df.where((F.col('index')>start_row) & (F.col('index')<end_row)).show()
zipWithIndex() is an RDD method that does return monotonically increasing, unique, and consecutive integers, but it appears to be much slower to implement in a way that lets you get back to your original DataFrame amended with an id column.