How to write the resulting RDD to a csv file in Spark python

Disclaimer: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/31898964/

Tags: python, csv, apache-spark, pyspark, file-writing

Asked by Jason Donnald

I have a resulting RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions). Its output is in this format:

[(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....]

What I want is to create a CSV file with one column for labels (the first part of the tuple in the above output) and one for predictions (the second part of the tuple output). But I don't know how to write to a CSV file in Spark using Python.

How can I create a CSV file with the above output?

Accepted answer by Daniel Darabos

Just map the lines of the RDD (labelsAndPredictions) into strings (the lines of the CSV), then use rdd.saveAsTextFile().

def toCSVLine(data):
    # Convert every field of the tuple to a string and join with commas.
    return ','.join(str(d) for d in data)

lines = labelsAndPredictions.map(toCSVLine)
lines.saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv')
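
Note that saveAsTextFile() writes its output as a directory of part files (part-00000, part-00001, ...) rather than a single CSV file. If you need exactly one file, coalesce the RDD to a single partition first, as the last answer below does with a DataFrame.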

Answered by Galen Long

It's not good to just join by commas because if fields contain commas, they won't be properly quoted, e.g. ','.join(['a', 'b', '1,2,3', 'c']) gives you a,b,1,2,3,c when you'd want a,b,"1,2,3",c. Instead, you should use Python's csv module to convert each list in the RDD to a properly-formatted csv string:

# python 3
import csv, io

def list_to_csv_str(x):
    """Given a list of strings, returns a properly-csv-formatted string."""
    output = io.StringIO("")
    csv.writer(output).writerow(x)
    return output.getvalue().strip() # remove extra newline

# ... do stuff with your rdd ...
rdd = rdd.map(list_to_csv_str)
rdd.saveAsTextFile("output_directory")

Since the csv module only writes to file objects, we have to create an empty "file" with io.StringIO("") and tell the csv.writer to write the csv-formatted string into it. Then, we use output.getvalue() to get the string we just wrote to the "file". To make this code work with Python 2, just replace io with the StringIO module.

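For reference, here is a minimal Python 2 sketch of the same helper (everything else stays as above):

# python 2
import csv, StringIO

def list_to_csv_str(x):
    """Given a list of strings, returns a properly-csv-formatted string."""
    output = StringIO.StringIO("")
    csv.writer(output).writerow(x)
    return output.getvalue().strip() # remove extra newline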

If you're using the Spark DataFrames API, you can also look into the DataBricks save function, which has a csv format.

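For illustration, a minimal sketch of that route, assuming a DataFrame named df and the spark-csv package on the classpath (from Spark 2.0 on, the built-in 'csv' format works the same way):

# Hypothetical example: write DataFrame `df` out as CSV with a header row.
df.write.format('com.databricks.spark.csv').option('header', 'true').save('output_directory')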

Answered by Insilico

I know this is an old post, but to help someone searching for the same, here's how I write a two-column RDD to a single CSV file in PySpark 1.6.2.

The RDD:

>>> rdd.take(5)
[(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')]

Now the code:

# First I convert the RDD to a DataFrame.
# In the PySpark shell, sqlContext already exists; in a standalone script,
# build one from the SparkContext (sc) first:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(rdd, ['count', 'word'])

The DF:

>>> df.show()
+-----+-----------+
|count|       word|
+-----+-----------+
|73342|      cells|
|62861|       cell|
|61714|    studies|
|61377|        aim|
|60168|   clinical|
|59275|          2|
|59221|          1|
|58274|       data|
|58087|development|
|56579|     cancer|
|50243|    disease|
|49817|   provided|
|49216|   specific|
|48857|     health|
|48536|      study|
|47827|    project|
|45573|description|
|45455|  applicant|
|44739|    program|
|44522|   patients|
+-----+-----------+
only showing top 20 rows

Now write to CSV:

# Write CSV (I have HDFS storage)
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out')
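
Note that coalesce(1) pulls all of the data into a single partition so that exactly one part file is written; for large datasets that single task can be slow or run one executor out of memory.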

P.S.: I am just a beginner learning from posts here on Stack Overflow, so I don't know whether this is the best way, but it worked for me and I hope it will help someone!

Answered by Vardhaman Jain

def toCSV(row):
    # Join the fields of each row (a tuple) with commas.
    # Note: fields that themselves contain commas are not quoted here;
    # see the csv-module answer above for that case.
    return ','.join(str(field) for field in row)

rows_of_csv = RDD.map(toCSV)
# choose your path based on your distributed file system
rows_of_csv.saveAsTextFile('/FileStore/tables/name_of_csv_file.csv')