Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/34832312/
How to make the first row the header when reading a file in PySpark and converting it to a Pandas Dataframe
Asked by user2966197
I am reading a file in PySpark and forming an rdd from it. I then convert it to a normal dataframe and then to a pandas dataframe. The issue that I am having is that there is a header row in my input file and I want to make it the header of the dataframe columns as well, but it is read in as an additional row and not as the header. This is my current code:
def extract(line):
    return line

input_file = sc.textFile('file1.txt').zipWithIndex().filter(lambda (line, rownum): rownum >= 0).map(lambda (line, rownum): line)

input_data = (input_file
              .map(lambda line: line.split(";"))
              .filter(lambda line: len(line) >= 0)
              .map(extract))  # Map to tuples

df_normal = input_data.toDF()
df = df_normal.toPandas()
Now when I look at df, the header row of the text file becomes the first row of the dataframe, and there is an additional header in df with 0,1,2... as the column labels. How can I make the first row the header?
Accepted answer by desertnaut
There are a couple of ways to do that, depending on the exact structure of your data. Since you do not give any details, I'll try to show it using a data file nyctaxisub.csv that you can download.
If your file is in csv format, you should use the relevant spark-csv package, provided by Databricks. No need to download it explicitly, just run pyspark as follows:
$ pyspark --packages com.databricks:spark-csv_2.10:1.3.0
and then
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv',
...                           format='com.databricks.spark.csv',
...                           header='true',
...                           inferSchema='true')
>>> df.count()
249999
The file has 250,000 rows including the header, so 249,999 is the correct number of actual records. Here is the schema, as inferred automatically by the package:
>>> df.dtypes
[('_id', 'string'),
('_rev', 'string'),
('dropoff_datetime', 'string'),
('dropoff_latitude', 'double'),
('dropoff_longitude', 'double'),
('hack_license', 'string'),
('medallion', 'string'),
('passenger_count', 'int'),
('pickup_datetime', 'string'),
('pickup_latitude', 'double'),
('pickup_longitude', 'double'),
('rate_code', 'int'),
('store_and_fwd_flag', 'string'),
('trip_distance', 'double'),
('trip_time_in_secs', 'int'),
('vendor_id', 'string')]
You can see more details in my relevant blog post.
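Since the original question ultimately wants a pandas dataframe, the Spark dataframe loaded above converts directly; a minimal sketch (toPandas() collects the entire dataset to the driver, so it only suits data that fits in driver memory):

>>> pandas_df = df.toPandas()  # column names from the header carry over as pandas columns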
If, for whatever reason, you cannot use the spark-csv package, you'll have to subtract the first row from the data and then use it to construct your schema. Here is the general idea, and you can again find a full example with code details in another blog post of mine:
>>> taxiFile = sc.textFile("file:///home/ctsats/datasets/BDU_Spark/nyctaxisub.csv")
>>> taxiFile.count()
250000
>>> taxiFile.take(5)
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"',
u'"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"',
u'"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000E+001,-7.39467470000000E+001,"64CE1B03FDE343BB8DFB512123A525A4","60150AA39B2F654ED6F0C3AF8174A48A",1,"2013-01-11 04:07:00",+4.07280540000000E+001,-7.40020370000000E+001,1,,+8.53000000000000E+000,1260,"VTS"',
u'"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.07277180000000E+001,-7.39942860000000E+001,"2D73B0C44F1699C67AB8AE322433BDB7","6F907BC9A85B7034C8418A24A0A75489",5,"2013-01-11 21:46:00",+4.07577480000000E+001,-7.39649810000000E+001,1,,+3.01000000000000E+000,960,"VTS"',
u'"2a80cfaa425dcec0861e02ae446226e4","1-aa8b16d6ae44ad906a46cc6581ffea50","2013-01-11 10:03:00",+4.07643050000000E+001,-7.39544600000000E+001,"E90018250F0A009433F03BD1E4A4CE53","1AFFD48CC07161DA651625B562FE4D06",5,"2013-01-11 09:44:00",+4.07308080000000E+001,-7.39928280000000E+001,1,,+3.64000000000000E+000,1140,"VTS"']
# Construct the schema from the header
>>> header = taxiFile.first()
>>> header
u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"'
>>> schemaString = header.replace('"','') # get rid of the double-quotes
>>> schemaString
u'_id,_rev,dropoff_datetime,dropoff_latitude,dropoff_longitude,hack_license,medallion,passenger_count,pickup_datetime,pickup_latitude,pickup_longitude,rate_code,store_and_fwd_flag,trip_distance,trip_time_in_secs,vendor_id'
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
>>> schema = StructType(fields)
# Subtract header and use the above-constructed schema:
>>> taxiHeader = taxiFile.filter(lambda l: "_id" in l) # taxiHeader needs to be an RDD - the string we constructed above will not do the job
>>> taxiHeader.collect() # for inspection purposes only
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"']
>>> taxiNoHeader = taxiFile.subtract(taxiHeader)
>>> taxi_df = taxiNoHeader.toDF(schema) # Spark dataframe
>>> import pandas as pd
>>> taxi_DF = taxi_df.toPandas() # pandas dataframe
For brevity, here all columns end up being of type string, but in the blog post I show in detail and explain how you can further refine the desired data types (and names) for specific fields.
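As a minimal sketch of such a refinement (the target types below are illustrative assumptions, not taken from the blog post), individual columns can be cast after loading:

>>> from pyspark.sql.types import IntegerType, DoubleType
>>> # Cast two of the all-string columns to numeric types (illustrative choices)
>>> taxi_df = taxi_df.withColumn('passenger_count', taxi_df['passenger_count'].cast(IntegerType()))
>>> taxi_df = taxi_df.withColumn('trip_distance', taxi_df['trip_distance'].cast(DoubleType()))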
Answered by naren
The simple answer would be to set header='true'.
E.g.:
df = spark.read.csv('housing.csv', header='true')
or
df = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")
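Note that myManualSchema must be defined beforehand; a minimal sketch of such a schema (the field names and types here are hypothetical placeholders, not taken from the original answer):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical schema for illustration; replace the fields to match your file
myManualSchema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", DoubleType(), True),
])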
Answered by srikanth
One more way to do it is shown below:
from pyspark.sql.types import StructType, StructField, StringType

log_txt = sc.textFile(file_path)
header = log_txt.first()  # get the first row into a variable

# Build a StringType field for each column name in the header (assuming a comma-delimited file)
fields = [StructField(field_name, StringType(), True) for field_name in header.split(',')]
schema = StructType(fields)

# Remove the header row, or else it will appear as a duplicate data row, and split each line into its columns
filter_data = log_txt.filter(lambda row: row != header).map(lambda row: row.split(','))
df = spark.createDataFrame(filter_data, schema=schema)  # convert to a PySpark DataFrame
df.show()