PySpark DataFrames - way to enumerate without converting to Pandas?
Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license, credit the original URL and author information, and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/32760888/
Asked by Maria Koroliuk
I have a very big pyspark.sql.dataframe.DataFrame named df. I need some way of enumerating records, so that I can access a record with a certain index (or select a group of records within a range of indexes).
In pandas, I could simply do:
indexes=[2,3,6,7]
df[indexes]
Here I want something similar (and without converting the dataframe to pandas).
The closest I can get is:
- Enumerating all the objects in the original dataframe by:

indexes = np.arange(df.count())
df_indexed = df.withColumn('index', indexes)

- Searching for the values I need using the where() function.
QUESTIONS:
- Why doesn't it work, and how can I make it work? How do I add a row to a dataframe?
Would it work later to do something like:
indexes=[2,3,6,7] df1.where("index in indexes").collect()
Is there any faster and simpler way to deal with it?
Accepted answer by zero323
It doesn't work because:
- the second argument for withColumn should be a Column, not a collection. np.array won't work here
- when you pass "index in indexes" as a SQL expression to where, indexes is out of scope and it is not resolved as a valid identifier
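As a minimal sketch of both points (my illustration, not part of the original answer; it assumes the df and indexes from the question, and that df_indexed is a DataFrame that already has an index column, for example built with one of the approaches below):

from pyspark.sql.functions import lit

indexes = [2, 3, 6, 7]

# Works: the second argument is a Column expression
df_const = df.withColumn("constant", lit(1))

# Fails: a NumPy array (or plain Python list) is not a Column
# df.withColumn("index", np.arange(df.count()))   # raises an error

# Works: the values are rendered into the SQL string itself
df_indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))

# Fails: Spark cannot resolve the Python name "indexes" as an identifier
# df_indexed.where("index in indexes")            # AnalysisException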
PySpark >= 1.4.0
You can add row numbers using the respective window function and query using the Column.isin method or a properly formatted query string:
from pyspark.sql.functions import col, rowNumber
from pyspark.sql.window import Window
w = Window.orderBy()
indexed = df.withColumn("index", rowNumber().over(w))
# Using DSL
indexed.where(col("index").isin(set(indexes)))
# Using SQL expression
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
It looks like window functions called without a PARTITION BY clause move all the data to a single partition, so the above may not be the best solution after all.
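One way to see this on your own data (my addition, not part of the original answer) is to inspect the physical plan of the indexed DataFrame; with an empty window specification it typically contains a single-partition exchange:

# Look for something like "Exchange SinglePartition" in the printed plan
indexed.explain()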
Is there any faster and simpler way to deal with it?
Not really. Spark DataFrames don't support random row access.
A PairedRDD can be accessed using the lookup method, which is relatively fast if the data is partitioned using a HashPartitioner. There is also the indexed-rdd project, which supports efficient lookups.
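A minimal sketch of the lookup idea in PySpark (my illustration, not part of the original answer; the number of partitions is arbitrary):

# Key each row by its position, hash-partition by that key, then look rows up.
pairs = (df.rdd
    .zipWithIndex()                   # (row, index)
    .map(lambda ri: (ri[1], ri[0]))   # (index, row)
    .partitionBy(8))                  # hash partitioning on the index key

pairs.cache()
pairs.lookup(3)   # list of row(s) stored under key 3; fast because the partitioner is known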
Edit:
Independent of the PySpark version, you can try something like this:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
row = Row("char")
row_with_index = Row("char", "index")
df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
df.show(5)
## +----+
## |char|
## +----+
## | a|
## | b|
## | c|
## | d|
## | e|
## +----+
## only showing top 5 rows
# This part is not tested but should work and save some work later
schema = StructType(
    df.schema.fields[:] + [StructField("index", LongType(), False)])

indexed = (df.rdd                                                   # Extract rdd
           .zipWithIndex()                                          # Add index
           .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]]))  # Map to rows
           .toDF(schema))                                           # It will work without schema but will be more expensive
# inSet in Spark < 1.3
indexed.where(col("index").isin(indexes))
Answer by Joe Harris
If you want a number range that's guaranteed not to collide but does not require a .over(partitionBy()), then you can use monotonicallyIncreasingId().
from pyspark.sql.functions import monotonicallyIncreasingId
df.select(monotonicallyIncreasingId().alias("rowId"),"*")
Note though that the values are not particularly "neat". Each partition is given a value range and the output will not be contiguous, e.g. 0, 1, 2, 8589934592, 8589934593, 8589934594.
This was added to Spark on Apr 28, 2015 here: https://github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2
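If you do need contiguous numbers, a common follow-up pattern (my sketch, not part of this answer; it assumes a Spark version where the functions are exposed as monotonically_increasing_id and row_number) is to order a window by the monotonic ID. Note this brings back the single-partition window discussed above:

from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

df_with_id = df.withColumn("mono_id", monotonically_increasing_id())

# row_number over a window ordered by the monotonic ID yields 1, 2, 3, ...
w = Window.orderBy("mono_id")
df_contiguous = df_with_id.withColumn("rowId", row_number().over(w)).drop("mono_id")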
Answer by Mahdi Ghelichi
You certainly can add an array for indexing, indeed an array of your choice. In Scala, first we need to create an indexing array:
val index_array=(1 to df.count.toInt).toArray
index_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
You can now append this column to your DF. For that, you need to collect the DF as an array, zip it with your index_array, and then convert the new array back into an RDD. The final step is to get it as a DF:
val final_df = sc.parallelize(
    (df.collect.map(x => (x(0), x(1))) zip index_array).map(
      x => (x._1._1.toString, x._1._2.toString, x._2)))
  .toDF("column_1", "column_2", "index") // one name per output column; assumes df has two columns
The indexing will be clearer after that.
Answer by desaiankitb
monotonicallyIncreasingId() - this will assign row numbers in increasing order, but not in sequence.
sample output with 2 columns:
|---------------------|------------------|
| RowNo | Heading 2 |
|---------------------|------------------|
| 1 | xy |
|---------------------|------------------|
| 12 | xz |
|---------------------|------------------|
If you want to assign row numbers, use the following trick.
Tested in spark-2.0.1 and greater versions.
df.createOrReplaceTempView("df")
dfRowId = spark.sql("select *, row_number() over (partition by 0) as rowNo from df")
sample output with 2 columns:
|---------------------|------------------|
| RowNo | Heading 2 |
|---------------------|------------------|
| 1 | xy |
|---------------------|------------------|
| 2 | xz |
|---------------------|------------------|
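The same trick can be written with the DataFrame API instead of SQL (my sketch, assuming Spark >= 2.0; partitioning and ordering by a literal mirrors the "partition by 0" idea and carries the same single-partition cost):

from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

# A constant partition/order key gives sequential numbers 1, 2, 3, ...
w = Window.partitionBy(lit(0)).orderBy(lit(0))
dfRowId = df.withColumn("rowNo", row_number().over(w))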
Hope this helps.
Answer by TheWinterSoldierVn
To select a single row n of a Pyspark DataFrame, try:
df.where(df.id == n).show()
Given a Pyspark DataFrame:
df = spark.createDataFrame([(1, 143.5, 5.6, 28, 'M', 100000),
                            (2, 167.2, 5.4, 45, 'M', None),
                            (3, None, 5.2, None, None, None)],
                           ['id', 'weight', 'height', 'age', 'gender', 'income'])
To select the 3rd row, try:
df.where('id == 3').show()
Or:
df.where(df.id == 3).show()
To select multiple rows by their row ids (the 2nd and the 3rd rows in this case), try:
ids = {2, 3}
df.where(df.id.isin(ids)).show()
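For the "range of indexes" part of the original question, Column.between is a natural fit (my addition, reusing the same example DataFrame):

# Select rows whose id falls in an inclusive range
df.where(df.id.between(2, 3)).show()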
Answer by Devesh Sharma
from pyspark.sql.functions import monotonically_increasing_id
df.withColumn("Atr4", monotonically_increasing_id())
If you only need incremental values (like an ID) and there is no constraint that the numbers need to be consecutive, you could use monotonically_increasing_id(). The only guarantee when using this function is that the values will be increasing for each row; however, the values themselves can differ between executions.
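A small illustration of that behaviour (my sketch; spark is assumed to be an active SparkSession): with more than one partition the IDs still increase, but they jump between per-partition blocks and can change from run to run.

from pyspark.sql.functions import monotonically_increasing_id

demo = spark.range(6).repartition(3)
demo.withColumn("Atr4", monotonically_increasing_id()).show()
# Typical values: 0, 1, 8589934592, 8589934593, ... - one block per partition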