Python 使用 Spark 加载 CSV 文件
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/28782940/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Load CSV file with Spark
提问by Kernael
I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here's what I am doing :
我是 Spark 的新手,我正在尝试使用 Spark 从文件中读取 CSV 数据。这是我在做什么:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
I would expect this call to give me a list of the two first columns of my file but I'm getting this error :
我希望这个调用能给我一个文件的前两列的列表,但我收到了这个错误:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
although my CSV file as more than one column.
虽然我的 CSV 文件不止一列。
采纳答案by G Quintana
Are you sure that allthe lines have at least 2 columns? Can you try something like, just to check?:
您确定所有行都至少有 2 列吗?你可以尝试类似的东西,只是为了检查?:
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1])) \
.collect()
Alternatively, you could print the culprit (if any):
或者,您可以打印罪魁祸首(如果有):
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)<=1) \
.collect()
回答by optimist
Now, there's also another option for any general csv file: https://github.com/seahboonsiew/pyspark-csvas follows:
现在,任何通用 csv 文件还有另一个选项:https: //github.com/seahboonsiew/pyspark-csv,如下所示:
Assume we have the following context
假设我们有以下上下文
sc = SparkContext
sqlCtx = SQLContext or HiveContext
First, distribute pyspark-csv.py to executors using SparkContext
首先,使用 SparkContext 将 pyspark-csv.py 分发给执行程序
import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')
Read csv data via SparkContext and convert it to DataFrame
通过 SparkContext 读取 csv 数据并将其转换为 DataFrame
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
回答by JP Mercier
And yet another option which consist in reading the CSV file using Pandas and then importing the Pandas DataFrame into Spark.
还有一个选项是使用 Pandas 读取 CSV 文件,然后将 Pandas DataFrame 导入 Spark。
For example:
例如:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
回答by iec2011007
If your csv data happens to not contain newlines in any of the fields, you can load your data with textFile()
and parse it
如果您的 csv 数据碰巧不包含任何字段中的换行符,您可以加载您的数据textFile()
并解析它
import csv
import StringIO
def loadRecord(line):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name1", "name2"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
回答by zero323
Spark 2.0.0+
火花 2.0.0+
You can use built-in csv data source directly:
您可以直接使用内置的 csv 数据源:
spark.read.csv(
"some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)
or
或者
(spark.read
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.csv("some_input_file.csv"))
without including any external dependencies.
不包括任何外部依赖项。
Spark < 2.0.0:
火花 < 2.0.0:
Instead of manual parsing, which is far from trivial in a general case, I would recommend spark-csv
:
而不是手动解析,这在一般情况下远非微不足道,我建议spark-csv
:
Make sure that Spark CSV is included in the path (--packages
, --jars
, --driver-class-path
)
确保路径 ( --packages
, --jars
, --driver-class-path
) 中包含 Spark CSV
And load your data as follows:
并按如下方式加载您的数据:
(df = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferschema", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv"))
It can handle loading, schema inference, dropping malformed lines and doesn't require passing data from Python to the JVM.
它可以处理加载、模式推断、删除格式错误的行,并且不需要将数据从 Python 传递到 JVM。
Note:
注意:
If you know the schema, it is better to avoid schema inference and pass it to DataFrameReader
. Assuming you have three columns - integer, double and string:
如果您知道架构,最好避免架构推断并将其传递给DataFrameReader
. 假设您有三列 - 整数、双精度和字符串:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
StructField("A", IntegerType()),
StructField("B", DoubleType()),
StructField("C", StringType())
])
(sqlContext
.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv"))
回答by Galen Long
Simply splitting by comma will also split commas that are within fields (e.g. a,b,"1,2,3",c
), so it's not recommended. zero323's answeris good if you want to use the DataFrames API, but if you want to stick to base Spark, you can parse csvs in base Python with the csvmodule:
简单地用逗号分割也会分割字段内的逗号(例如a,b,"1,2,3",c
),因此不推荐这样做。如果您想使用 DataFrames API,zero323 的答案很好,但是如果您想坚持使用基础 Spark,您可以使用csv模块在基础 Python 中解析 csvs :
# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
EDIT: As @muon mentioned in the comments, this will treat the header like any other row so you'll need to extract it manually. For example, header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(make sure not to modify header
before the filter evaluates). But at this point, you're probably better off using a built-in csv parser.
编辑:正如评论中提到的@muon,这会将标题视为任何其他行,因此您需要手动提取它。例如,header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(确保header
在过滤器评估之前不要修改)。但此时,您最好使用内置的 csv 解析器。
回答by abby sobh
This is in-line with what JP Mercier initially suggestedabout using Pandas, but with a major modification: If you read data into Pandas in chunks, it should be more malleable. Meaning, that you can parse a much larger file than Pandas can actually handle as a single piece and pass it to Spark in smaller sizes. (This also answers the comment about why one would want to use Spark if they can load everything into Pandas anyways.)
这与JP Mercier 最初关于使用 Pandas 的建议一致,但有一个重大修改:如果将数据分块读入 Pandas,它应该更具延展性。这意味着,您可以解析一个比 Pandas 实际可以处理的大得多的文件,并将其以更小的尺寸传递给 Spark。(这也回答了关于如果他们无论如何都可以将所有内容加载到 Pandas 中为什么要使用 Spark 的评论。)
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)
for chunky in chunk_100k:
Spark_Full += sc.parallelize(chunky.values.tolist())
YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
回答by y durga prasad
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|");
print(df.collect())
回答by Jeril
If you want to load csv as a dataframe then you can do the following:
如果要将 csv 作为数据帧加载,则可以执行以下操作:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true') \
.load('sampleFile.csv') # this is your csv file
It worked fine for me.
它对我来说很好。
回答by Yogesh
If you are having any one or more row(s) with less or more number of columns than 2 in the dataset then this error may arise.
如果数据集中的任何一行或多行列数少于或多于 2,则可能会出现此错误。
I am also new to Pyspark and trying to read CSV file. Following code worked for me:
我也是 Pyspark 的新手并试图读取 CSV 文件。以下代码对我有用:
In this code I am using dataset from kaggle the link is: https://www.kaggle.com/carrie1/ecommerce-data
在这段代码中,我使用了来自 kaggle 的数据集,链接是:https://www.kaggle.com/carrie1/ecommerce-data
1. Without mentioning the schema:
1. 不提架构:
from pyspark.sql import SparkSession
scSpark = SparkSession \
.builder \
.appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()
Now check the columns: sdfData.columns
现在检查列:sdfData.columns
Output will be:
输出将是:
['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']
Check the datatype for each column:
检查每列的数据类型:
sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))
This will give the data frame with all the columns with datatype as StringType
这将为数据框提供所有数据类型为 StringType 的列
2. With schema:If you know the schema or want to change the datatype of any column in the above table then use this (let's say I am having following columns and want them in a particular data type for each of them)
2. 使用架构:如果您知道架构或想要更改上表中任何列的数据类型,请使用它(假设我有以下列,并希望它们具有特定的数据类型)
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([\
StructField("InvoiceNo", IntegerType()),\
StructField("StockCode", StringType()), \
StructField("Description", StringType()),\
StructField("Quantity", IntegerType()),\
StructField("InvoiceDate", StringType()),\
StructField("CustomerID", DoubleType()),\
StructField("Country", StringType())\
])
scSpark = SparkSession \
.builder \
.appName("Python Spark SQL example: Reading CSV file with schema") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)
Now check the schema for datatype of each column:
现在检查每列数据类型的模式:
sdfData.schema
StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))
Edited: We can use the following line of code as well without mentioning schema explicitly:
编辑:我们也可以使用以下代码行而无需明确提及架构:
sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema
The output is:
输出是:
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))
The output will look like this:
输出将如下所示:
sdfData.show()
+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
| 536365| 85123A|WHITE HANGING HEA...| 6|12/1/2010 8:26| 2.55| 17850|
| 536365| 71053| WHITE METAL LANTERN| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 84406B|CREAM CUPID HEART...| 8|12/1/2010 8:26| 2.75| 17850|
| 536365| 84029G|KNITTED UNION FLA...| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 22752|SET 7 BABUSHKA NE...| 2|12/1/2010 8:26| 7.65| 17850|
| 536365| 21730|GLASS STAR FROSTE...| 6|12/1/2010 8:26| 4.25| 17850|
| 536366| 22633|HAND WARMER UNION...| 6|12/1/2010 8:28| 1.85| 17850|
| 536366| 22632|HAND WARMER RED P...| 6|12/1/2010 8:28| 1.85| 17850|
| 536367| 84879|ASSORTED COLOUR B...| 32|12/1/2010 8:34| 1.69| 13047|
| 536367| 22745|POPPY'S PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047|
| 536367| 22748|POPPY'S PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047|
| 536367| 22749|FELTCRAFT PRINCES...| 8|12/1/2010 8:34| 3.75| 13047|
| 536367| 22310|IVORY KNITTED MUG...| 6|12/1/2010 8:34| 1.65| 13047|
| 536367| 84969|BOX OF 6 ASSORTED...| 6|12/1/2010 8:34| 4.25| 13047|
| 536367| 22623|BOX OF VINTAGE JI...| 3|12/1/2010 8:34| 4.95| 13047|
| 536367| 22622|BOX OF VINTAGE AL...| 2|12/1/2010 8:34| 9.95| 13047|
| 536367| 21754|HOME BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047|
| 536367| 21755|LOVE BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047|
| 536367| 21777|RECIPE BOX WITH M...| 4|12/1/2010 8:34| 7.95| 13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows