How to make the first row the header when reading a file in PySpark and converting it to a Pandas DataFrame

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/34832312/


How to make the first row the header when reading a file in PySpark and converting it to a Pandas DataFrame

Tags: python, pandas, apache-spark, pyspark, apache-spark-sql

Asked by user2966197

I am reading a file in PySpark and forming an RDD from it. I then convert it to a normal dataframe and then to a pandas dataframe. The issue I am having is that there is a header row in my input file, and I want to make it the header of the dataframe columns as well, but it is read in as an additional data row instead of as the header. This is my current code:


def extract(line):
    return line


input_file = sc.textFile('file1.txt').zipWithIndex().filter(lambda (line,rownum): rownum>=0).map(lambda (line, rownum): line)

input_data = (input_file
    .map(lambda line: line.split(";"))
    .filter(lambda line: len(line) >=0 )
    .map(extract)) # Map to tuples

df_normal = input_data.toDF()
df= df_normal.toPandas()

Now when I look at df, the header row of the text file has become the first row of the dataframe, and df has an additional header of 0,1,2... instead. How can I make the first row the header?


Accepted answer by desertnaut

There are a couple of ways to do that, depending on the exact structure of your data. Since you do not give any details, I'll try to show it using a data file, nyctaxisub.csv, that you can download.


If your file is in csv format, you should use the relevant spark-csv package, provided by Databricks. No need to download it explicitly, just run pyspark as follows:


$ pyspark --packages com.databricks:spark-csv_2.10:1.3.0

and then


>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)

>>> df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv', 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')

>>> df.count()
249999

The file has 250,000 rows including the header, so 249,999 is the correct number of actual records. Here is the schema, as inferred automatically by the package:


>>> df.dtypes
[('_id', 'string'),
 ('_rev', 'string'),
 ('dropoff_datetime', 'string'),
 ('dropoff_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'string'),
 ('pickup_latitude', 'double'),
 ('pickup_longitude', 'double'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'double'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]
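
From here, converting to pandas carries the header names over as the column index. The following is a small sketch continuing the same session; it is not part of the original answer:

>>> pandas_df = df.toPandas()   # pandas DataFrame; its columns come from the CSV header
>>> list(pandas_df.columns)[:3]
['_id', '_rev', 'dropoff_datetime']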

You can see more details in my relevant blog post.


If, for whatever reason, you cannot use the spark-csv package, you'll have to subtract the first row from the data and then use it to construct your schema. Here is the general idea, and you can again find a full example with code details in another blog post of mine:


>>> taxiFile = sc.textFile("file:///home/ctsats/datasets/BDU_Spark/nyctaxisub.csv")
>>> taxiFile.count()
250000
>>> taxiFile.take(5)
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"',
 u'"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"',
 u'"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000E+001,-7.39467470000000E+001,"64CE1B03FDE343BB8DFB512123A525A4","60150AA39B2F654ED6F0C3AF8174A48A",1,"2013-01-11 04:07:00",+4.07280540000000E+001,-7.40020370000000E+001,1,,+8.53000000000000E+000,1260,"VTS"',
 u'"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.07277180000000E+001,-7.39942860000000E+001,"2D73B0C44F1699C67AB8AE322433BDB7","6F907BC9A85B7034C8418A24A0A75489",5,"2013-01-11 21:46:00",+4.07577480000000E+001,-7.39649810000000E+001,1,,+3.01000000000000E+000,960,"VTS"',
 u'"2a80cfaa425dcec0861e02ae446226e4","1-aa8b16d6ae44ad906a46cc6581ffea50","2013-01-11 10:03:00",+4.07643050000000E+001,-7.39544600000000E+001,"E90018250F0A009433F03BD1E4A4CE53","1AFFD48CC07161DA651625B562FE4D06",5,"2013-01-11 09:44:00",+4.07308080000000E+001,-7.39928280000000E+001,1,,+3.64000000000000E+000,1140,"VTS"']

# Construct the schema from the header 
>>> header = taxiFile.first()
>>> header
u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"'
>>> schemaString = header.replace('"','')  # get rid of the double-quotes
>>> schemaString
u'_id,_rev,dropoff_datetime,dropoff_latitude,dropoff_longitude,hack_license,medallion,passenger_count,pickup_datetime,pickup_latitude,pickup_longitude,rate_code,store_and_fwd_flag,trip_distance,trip_time_in_secs,vendor_id'
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
>>> schema = StructType(fields)

# Subtract header and use the above-constructed schema:
>>> taxiHeader = taxiFile.filter(lambda l: "_id" in l) # taxiHeader needs to be an RDD - the string we constructed above will not do the job
>>> taxiHeader.collect() # for inspection purposes only
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"']
>>> taxiNoHeader = taxiFile.subtract(taxiHeader)
>>> taxi_temp = taxiNoHeader.map(lambda k: k.split(','))  # split each line into its fields
>>> taxi_df = taxi_temp.toDF(schema)  # Spark dataframe
>>> import pandas as pd
>>> taxi_DF = taxi_df.toPandas()  # pandas dataframe 

For brevity, all columns here end up being of type string, but in the blog post I show and explain in detail how you can further refine the desired data types (and names) for specific fields.

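As a hedged illustration of that refinement (a minimal sketch, not taken from the answer or the blog post, assuming the all-string taxi_df built above), specific columns can simply be cast to more appropriate types:

>>> from pyspark.sql.functions import col
>>> taxi_df = (taxi_df
...     .withColumn('trip_distance', col('trip_distance').cast('double'))
...     .withColumn('passenger_count', col('passenger_count').cast('int'))
...     .withColumn('trip_time_in_secs', col('trip_time_in_secs').cast('int')))
>>> [dt for dt in taxi_df.dtypes if dt[0] == 'trip_distance']
[('trip_distance', 'double')]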

Answer by naren

The simple answer is to set header='true'.

E.g.:

df = spark.read.csv('housing.csv', header='true')

or


df = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")
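
Here myManualSchema stands for whatever StructType matches your file; as a purely hypothetical illustration (the field names below are made up and not from the answer), it could be built like this:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical schema -- replace the field names and types with the ones your CSV actually contains
myManualSchema = StructType([
    StructField('id', StringType(), True),
    StructField('price', DoubleType(), True),
])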

Answer by srikanth

One more way to do it is shown below:

from pyspark.sql.types import StructField, StructType, StringType

log_txt = sc.textFile(file_path)
header = log_txt.first()  # get the first row into a variable
fields = [StructField(field_name, StringType(), True) for field_name in header.split(',')]  # one StringType field per header column (assuming a comma-delimited file)
schema = StructType(fields)
filter_data = log_txt.filter(lambda row: row != header)  # drop the header line, or it would show up as a data row
split_data = filter_data.map(lambda row: row.split(','))  # split each line into columns matching the schema
df = spark.createDataFrame(split_data, schema=schema)  # convert to a PySpark DataFrame
df.show()