Python PySpark 删除行
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/24718697/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me):
StackOverFlow
PySpark Drop Rows
提问by Hyman
How do you drop rows from an RDD in PySpark? Particularly the first row, since that tends to contain column names in my datasets. From perusing the API, I can't seem to find an easy way to do this. Of course I could do this via Bash / HDFS, but I just want to know if this can be done from within PySpark.
你如何从 PySpark 的 RDD 中删除行?特别是第一行,因为它往往包含我的数据集中的列名。通过仔细阅读 API,我似乎找不到一种简单的方法来做到这一点。当然,我可以通过 Bash/HDFS 做到这一点,但我只想知道这是否可以在 PySpark 中完成。
采纳答案by maasg
AFAIK there's no 'easy' way to do this.
AFAIK 没有“简单”的方法可以做到这一点。
This should do the trick, though:
不过,这应该可以解决问题:
val header = data.first
val rows = data.filter(line => line != header)
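The snippet above is Scala; a rough PySpark equivalent (a sketch, assuming data is an RDD of text lines) would be:
上面的代码是 Scala；一个大致等价的 PySpark 写法（仅作示意，假设 data 是一个由文本行组成的 RDD）如下：
header = data.first()
rows = data.filter(lambda line: line != header)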
回答by aaronman
Personally I think just using a filter to get rid of this stuff is the easiest way. But per your comment I have another approach: glom the RDD so each partition is an array (I'm assuming you have one file per partition, and each file has the offending row on top) and then just skip the first element (this is with the Scala API).
我个人认为仅使用过滤器来去掉这些内容是最简单的方法。但根据您的评论，我有另一种方法：对 RDD 做 glom，使每个分区成为一个数组（我假设你每个分区有一个文件，并且每个文件的开头都有那一行），然后跳过第一个元素（这里使用的是 Scala API）。
data.glom().map(x => for (elem <- x.drop(1)) {/*do stuff*/}) // x is an array, so just skip the 0th index
Keep in mind that one of the big features of RDDs is that they are immutable, so naturally removing a row is a tricky thing to do.
请记住，RDD 的一大特点是它们是不可变的，所以删除一行自然是一件棘手的事情。
UPDATE: Better solution:
更新：更好的解决方案：
rdd.mapPartitions(x => for (elem <- x.drop(1)) {/*do stuff*/})
Same as the glom approach, but without the overhead of putting everything into an array, since x is an iterator in this case.
与 glom 方法相同，但没有把所有内容放入数组的开销，因为在这种情况下 x 是一个迭代器。
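A PySpark sketch of the same per-partition idea (not part of the original answer): it assumes each partition starts with a header line, e.g. one input file per partition, and uses itertools.islice so the partition is never materialized as a list.
下面是同一按分区处理思路的 PySpark 示意（并非原回答内容）：假设每个分区的开头都有一行标题（例如每个分区恰好对应一个输入文件），并使用 itertools.islice，从而不会把整个分区放进列表。
from itertools import islice

# Drop the first element of every partition lazily; only correct when every
# partition begins with a header line.
no_header = rdd.mapPartitions(lambda part: islice(part, 1, None))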
回答by aaronman
Specific to PySpark:
特定于 PySpark:
As per @maasg, you could do this:
根据@maasg,您可以这样做:
header = rdd.first()
rdd.filter(lambda line: line != header)
But it's not technically correct, since any data line that happens to be identical to the header will be excluded along with it. However, this seems to work for me:
但这在技术上并不完全正确，因为任何恰好与标题行相同的数据行也会连同标题一起被排除。不过，下面的做法似乎对我有用：
def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr

rdd.mapPartitionsWithIndex(remove_header)
Similarly:
相似地:
rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])
I'm new to Spark, so I can't intelligently comment on which will be fastest.
我是 Spark 的新手,所以无法明智地评论哪个最快。
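For reference, a small self-contained sketch (not from the original answers) that exercises both approaches on a tiny RDD; the sample data and variable names are illustrative:
作为参考，下面是一个小的、自包含的示意（并非原回答内容），在一个很小的 RDD 上演示上述两种方法；示例数据和变量名仅作说明：
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.parallelize(["name,age", "alice,30", "bob,25"], 2)

# Approach 1: drop the first element of partition 0 only.
def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr

print(rdd.mapPartitionsWithIndex(remove_header).collect())

# Approach 2: zipWithIndex, then keep everything after the first row.
print(rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0]).collect())

# Both should print ['alice,30', 'bob,25'] with this partitioning.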
回答by noleto
A straightforward way to achieve this in PySpark (Python API), assuming you are using Python 3:
在 PySpark(Python API)中实现此目的的一种直接方法,假设您使用的是 Python 3:
noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()
回答by kartik
I have tested this with Spark 2.1. Say you want to skip the leading rows of a file without knowing how many columns it has; the filter below keeps only rows whose index is greater than 14 (so the first 15 rows, indices 0 through 14, are dropped).
我用 Spark 2.1 测试过。假设您想在不知道文件有多少列的情况下跳过文件开头的若干行；下面的过滤器只保留索引大于 14 的行（即删除前 15 行，索引 0 到 14）。
sc = spark.sparkContext
lines = sc.textFile("s3://location_of_csv")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
withColumn is a DataFrame function, so the following will not work in the RDD style used in the case above:
withColumn 是 DataFrame 的函数，因此下面的写法不能像上面那样用在 RDD 上：
parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
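For completeness, here is a sketch (not from the original answer) of how the same idea might look after converting the RDD to a DataFrame; the column name "index" and the variable names are illustrative:
作为补充，下面是把 RDD 转成 DataFrame 后实现同样思路的一个示意（并非原回答内容）；列名 "index" 和变量名仅作说明：
from pyspark.sql.functions import monotonically_increasing_id, col

# Build a DataFrame from the RDD of split rows, then filter on a generated id.
# Note: monotonically_increasing_id() gives increasing, unique ids, but they are
# not consecutive across partitions, so "> 14" is not a reliable way to drop
# exactly the first 15 rows.
df = parts.toDF()
df_no_header = df.withColumn("index", monotonically_increasing_id()) \
                 .filter(col("index") > 14) \
                 .drop("index")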
回答by Anant Gupta
I did some profiling of the various solutions and got the following results.
我对各种解决方案做了一些性能分析，得到了以下结果。
Cluster Configuration
集群配置
Clusters
集群
- Cluster 1 : 4 Cores 16 GB
- Cluster 2 : 4 Cores 16 GB
- Cluster 3 : 4 Cores 16 GB
- Cluster 4 : 2 Cores 8 GB
- 集群 1:4 核 16 GB
- 集群 2:4 核 16 GB
- 集群 3:4 核 16 GB
- 集群 4:2 核 8 GB
Data
数据
7 million rows, 4 columns
700 万行,4 列
# Solution 1
# Time taken: 40 ms
data = sc.textFile('file1.txt')
firstRow = data.first()
data = data.filter(lambda row: row != firstRow)
# Solution 2
# Time taken: 3 seconds
data = sc.textFile('file1.txt')

def dropFirstRow(index, iterator):
    return iter(list(iterator)[1:]) if index == 0 else iterator

data = data.mapPartitionsWithIndex(dropFirstRow)
# Solution 3
# Time taken: 0.3 seconds
data = sc.textFile('file1.txt')

def dropFirstRow(index, iterator):
    if index == 0:
        for subIndex, item in enumerate(iterator):
            if subIndex > 0:
                yield item
    else:
        # Pass the other partitions through unchanged.
        for item in iterator:
            yield item

data = data.mapPartitionsWithIndex(dropFirstRow)
I think that Solution 3 is the most scalable.
我认为解决方案 3 是最具扩展性的。
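The answer does not show how the timings were collected; below is a minimal sketch of one way to do it (assuming that forcing a count() is an acceptable proxy for "time taken"; the label and function name are illustrative):
该回答没有展示计时方式；下面是一个最简单的计时示意（假设用触发 count() 作为"耗时"的近似；标签和函数名仅作说明）：
import time

def time_action(label, rdd):
    # Force evaluation with an action and report wall-clock time.
    start = time.time()
    n = rdd.count()
    print("%s: %d rows in %.2f s" % (label, n, time.time() - start))

time_action("Solution 3", data)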