Using monotonically_increasing_id() for assigning row number to pyspark dataframe

Note: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/48209667/
Asked by muni
I am using monotonically_increasing_id() to assign a row number to a pyspark dataframe, using the syntax below:
from pyspark.sql.functions import monotonically_increasing_id
df1 = df1.withColumn("idx", monotonically_increasing_id())
Now df1 has 26,572,528 records, so I was expecting idx values from 0 to 26,572,527.
But when I select max(idx), its value is strangely huge: 335,008,054,165.
What's going on with this function? Is it reliable to use it for merging with another dataset that has a similar number of records?
I have some 300 dataframes which I want to combine into a single dataframe. One dataframe contains the IDs and the others contain different records corresponding to them row-wise.
Answered by mkaran
From the documentation:
A column that generates monotonically increasing 64-bit integers.
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.
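To see where the huge max(idx) in the question comes from, here is a minimal sketch; it simply decodes the value under the 31/33-bit split quoted above:

# decode the observed max(idx) under the documented bit layout:
# the upper bits hold the partition ID, the lower 33 bits hold the record number
max_idx = 335008054165
partition_id = max_idx >> 33                        # -> 39
record_in_partition = max_idx & ((1 << 33) - 1)     # -> 605077
print(partition_id, record_in_partition)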
Thus, it is not like an auto-increment id in RDBs, and it is not reliable for merging.
If you need auto-increment behavior like in RDBs and your data is sortable, then you can use row_number:
df.createOrReplaceTempView('df')
spark.sql('select row_number() over (order by some_column) as num, * from df').show()
+---+-----------+
|num|some_column|
+---+-----------+
|  1|    .......|
|  2|    .......|
|  3|    .......|
+---+-----------+
If your data is not sortable and you don't mind using rdds to create the indexes and then falling back to dataframes, you can use rdd.zipWithIndex().
An example can be found here
In short:
# since you have a dataframe, use the rdd interface to create indexes with zipWithIndex()
df = df.rdd.zipWithIndex()
# return back to dataframe
df = df.toDF()
df.show()
# your data | indexes
+---------------------+---+
| _1 | _2|
+---------------------+---+
|[data col1,data col2]| 0|
|[data col1,data col2]| 1|
|[data col1,data col2]| 2|
+---------------------+---+
You will probably need some more transformations after that to get your dataframe to what you need it to be. Note: not a very performant solution.
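For example, here is a minimal sketch of such a follow-up transformation (it assumes the two-column layout shown above, where _1 holds the original row as a struct and _2 holds the index; the name "idx" for the index column is just my choice):

# expand the struct column back into the original columns and rename the index column
df = df.select("_1.*", "_2").withColumnRenamed("_2", "idx")
df.show()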
Hope this helps. Good luck!
Edit: Come to think of it, you can combine monotonically_increasing_id with row_number:
from pyspark.sql.functions import monotonically_increasing_id

# create a monotonically increasing id
df = df.withColumn("idx", monotonically_increasing_id())
# the id is increasing (though not consecutive), so you can sort by it and then use row_number
df.createOrReplaceTempView('df')
new_df = spark.sql('select row_number() over (order by idx) as num, * from df')
Not sure about performance though.
Answered by Ramesh Maharjan
Using the API functions, you can simply do the following:
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F

# add a monotonically increasing (but non-consecutive) id,
# then overwrite it with a consecutive row number ordered by that id
df1 = df1.withColumn("idx", F.monotonically_increasing_id())
windowSpec = W.orderBy("idx")
df1.withColumn("idx", F.row_number().over(windowSpec)).show()
I hope the answer is helpful.
Answered by Ankita Mehta
I found the solution by @mkaran useful, but for me there was no ordering column to use with the window function. I wanted to maintain the order of the rows of the dataframe as their indexes (what you would see in a pandas dataframe). Hence the solution in the edit section came of use. Since it is a good solution (if performance is not a concern), I would like to share it as a separate answer.
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id

# Add an increasing (but non-consecutive) id column
df_index = df.withColumn("idx", monotonically_increasing_id())
# Create the window specification
w = Window.orderBy("idx")
# Use row_number with the window specification to get a consecutive index
df_index = df_index.withColumn("index", F.row_number().over(w))
# Drop the temporary increasing id column
df_index = df_index.drop("idx")
df is your original dataframe and df_index is the new dataframe.
Answered by Devi
To merge dataframes of the same size, use zip on the rdds:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.master("local").getOrCreate()
df1 = spark.sparkContext.parallelize([(1, "a"), (2, "b"), (3, "c")]).toDF(["id", "name"])
df2 = spark.sparkContext.parallelize([(7, "x"), (8, "y"), (9, "z")]).toDF(["age", "address"])
schema = StructType(df1.schema.fields + df2.schema.fields)
df1df2 = df1.rdd.zip(df2.rdd).map(lambda x: x[0] + x[1])
spark.createDataFrame(df1df2, schema).show()
But note the following from the method's help:
Assumes that the two RDDs have the same number of partitions and the same
number of elements in each partition (e.g. one was made through
a map on the other).
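If the two dataframes do not happen to have matching partitioning, zip() will fail. One possible workaround (my addition, not from the original answer) is to index both sides with zipWithIndex() and join on the index:

# pair each row with its index, join on the index, then restore the original order
rdd1 = df1.rdd.zipWithIndex().map(lambda p: (p[1], p[0]))
rdd2 = df2.rdd.zipWithIndex().map(lambda p: (p[1], p[0]))
merged = rdd1.join(rdd2).sortByKey().map(lambda kv: kv[1][0] + kv[1][1])
spark.createDataFrame(merged, schema).show()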