How to select last row and also how to access PySpark dataframe by index?

Note: this page is based on a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/39544796/



python apache-spark pyspark apache-spark-sql pyspark-sql

Asked by Satya

From a PySpark SQL dataframe like


name age city
abc   20  A
def   30  B

How to get the last row? (With df.limit(1) I can get the first row of the dataframe into a new dataframe.)

And how can I access the dataframe rows by index, like row no. 12 or 200?

In pandas I can do


df.tail(1)             # for last row
df.ix[rowno or index]  # by index (deprecated; use df.loc / df.iloc)
df.loc[...] or df.iloc[...]

I am just curious how to access a PySpark dataframe in such ways, or in alternative ways.

Thanks


Accepted answer by zero323

How to get the last row.


A long and ugly way, which assumes that all columns are orderable:

from pyspark.sql.functions import (
    col, max as max_, struct, monotonically_increasing_id
)

last_row = (df
    .withColumn("_id", monotonically_increasing_id())        # increasing (not necessarily consecutive) id
    .select(max_(struct("_id", *df.columns)).alias("tmp"))   # struct comparison picks the row with the largest _id
    .select(col("tmp.*"))
    .drop("_id"))

If not all columns can be ordered, you can try:

with_id = df.withColumn("_id", monotonically_increasing_id())
i = with_id.select(max_("_id")).first()[0]        # largest generated _id

with_id.where(col("_id") == i).drop("_id")        # the corresponding (last) row

Note: there is a last function in pyspark.sql.functions / o.a.s.sql.functions, but considering the description of the corresponding expressions it is not a good choice here.

how can I access the dataframe rows by index

You cannot: a Spark DataFrame is not accessible by index. You can add indices using zipWithIndex and filter later. Just keep in mind that this is an O(N) operation.
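For illustration, a minimal sketch of that zipWithIndex route (row number 12 is just the example from the question):

# Hypothetical sketch: pair each Row with its position, then filter on it (an O(N) scan).
row_12 = (df.rdd
          .zipWithIndex()                        # -> (Row, index) pairs
          .filter(lambda pair: pair[1] == 12)    # keep only the requested index
          .map(lambda pair: pair[0])             # drop the index again
          .collect())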

Answer by Danylo Zherebetskyy

How to get the last row.


If you have a column that you can use to order the dataframe, for example "index", then one easy way to get the last record is to use SQL: 1) order your table in descending order and 2) take the first value from that ordering:

df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC LIMIT 1"""
latest_rec = spark.sql(query_latest_rec)   # assuming an active SparkSession named spark
latest_rec.show()

And how can I access the dataframe rows by index, like row no. 12 or 200?

In a similar way you can get the record at any given line:

row_number = 12
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM (SELECT * FROM table_df ORDER BY index ASC LIMIT {0}) ord_lim ORDER BY index DESC LIMIT 1"""
latest_rec = spark.sql(query_latest_rec.format(row_number))   # assuming an active SparkSession named spark
latest_rec.show()

If you do not have an "index" column, you can create one using:

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("index", monotonically_increasing_id())

Answer by Henrique Florêncio

from pyspark.sql import functions as F

# Aggregate the last value of every column into a single row.
expr = [F.last(col).alias(col) for col in df.columns]

df.agg(*expr)
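A minimal usage sketch (keep in mind the accepted answer's caveat: last without an explicit ordering is not guaranteed to return the row you expect):

last_row = df.agg(*expr).collect()[0]   # a single Row holding the last value of each column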

Just a tip: it looks like you still have the mindset of someone who is working with pandas or R. Spark is a different paradigm in the way we work with data. You don't access data inside individual cells anymore; now you work with whole chunks of it. If you keep collecting stuff and doing actions, like you just did, you lose the whole concept of parallelism that Spark provides. Take a look at the concept of transformations vs. actions in Spark.

Answer by Clay

Use the following to get an index column that contains monotonically increasing, unique, and consecutive integers, which is not how monotonically_increasing_id() works. The indexes will be ascending in the same order as colName of your DataFrame.

import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

# A cumulative sum over a window ordered by colName yields consecutive, 1-based indexes.
window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)

df = (df
      .withColumn('int', F.lit(1))
      .withColumn('index', F.sum('int').over(window))
      .drop('int'))

Use the following code to look at the tail, i.e. the last rownums rows, of the DataFrame.

rownums = 10
df.where(F.col('index')>df.count()-rownums).show()

Use the following code to look at the rows from start_row to end_row of the DataFrame.

start_row = 20
end_row = start_row + 10
df.where((F.col('index')>start_row) & (F.col('index')<end_row)).show()

zipWithIndex() is an RDD method that does return monotonically increasing, unique, and consecutive integers, but it appears to be much slower to implement in a way that gets you back to your original DataFrame amended with an id column.
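For completeness, a hedged sketch of that zipWithIndex() route (assuming an active SparkSession named spark; 'id' is an illustrative column name):

# Hypothetical sketch: pair each Row with its index, then rebuild a DataFrame with an extra column.
indexed_rdd = df.rdd.zipWithIndex().map(lambda pair: pair[0] + (pair[1],))
df_with_id = spark.createDataFrame(indexed_rdd, df.columns + ['id'])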