Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL, and attribute it to the original authors (not me). Original StackOverflow URL: http://stackoverflow.com/questions/39926411/

Time: 2020-10-22 08:42:30  Source: igfitidea

Provide schema while reading csv file as a dataframe

scala apache-spark dataframe apache-spark-sql spark-csv

Asked by Pa1

I am trying to read a csv file into a dataframe. I know what the schema of my dataframe should be, since I know my csv file. I am using the spark csv package to read the file, and I am trying to specify the schema as shown below.

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("schema","project: string ,article: string ,requests: integer ,bytes_served: long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

But when I check the schema of the dataframe I created, it seems to have used its own schema. Am I doing anything wrong? How can I make Spark pick up the schema I specified?

> pagecount.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)

Answered by Arunakiran Nulu

Try the code below; you do not need to specify the schema. When you set inferSchema to true, Spark should infer it from your csv file.

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

If you want to specify the schema manually, you can do it as follows:

import org.apache.spark.sql.types._

val customSchema = StructType(Array(
  StructField("project", StringType, true),
  StructField("article", StringType, true),
  StructField("requests", IntegerType, true),
  StructField("bytes_served", DoubleType, true))
)

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .schema(customSchema)
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
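
The reader has no csv option named schema, so the .option("schema", ...) call in the question appears to be silently ignored; the schema has to go through the reader's schema method, as above. As a minimal self-contained sketch of the same idea (assuming Spark 2.x or later, a local-mode SparkSession, and a small temporary file with made-up rows standing in for the pagecounts data), the schema can also be built with StructType.add, here using LongType for bytes_served as the question intended:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Assumption: local mode; in spark-shell or a notebook this returns the existing session.
val spark = SparkSession.builder().master("local[1]").appName("schema-demo").getOrCreate()

// Hypothetical sample rows in the space-delimited pagecounts layout.
val path = Files.createTempFile("pagecounts", ".txt")
Files.write(path, "en Main_Page 42 1234567\nfr Accueil 7 8910\n".getBytes("UTF-8"))

// Build the schema field by field; fields are nullable by default.
val schema = new StructType()
  .add("project", StringType)
  .add("article", StringType)
  .add("requests", IntegerType)
  .add("bytes_served", LongType)

val df = spark.read
  .option("delimiter", " ")
  .option("quote", "")
  .schema(schema) // the schema is passed here, not through option(...)
  .csv(path.toString)

df.printSchema()
df.show()
```

Calling df.printSchema() should then list the four named columns with the declared types instead of _c0 to _c3.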

Answered by Alberto Castelo Becerra

I'm using the solution provided by Arunakiran Nulu in my analysis (see the code). Although it assigns the correct types to the columns, all the values returned are null. Previously, I tried the option .option("inferSchema", "true"), and it returned the correct values in the dataframe (although with different types).

val customSchema = StructType(Array(
    StructField("numicu", StringType, true),
    StructField("fecha_solicitud", TimestampType, true),
    StructField("codtecnica", StringType, true),
    StructField("tecnica", StringType, true),
    StructField("finexploracion", TimestampType, true),
    StructField("ultimavalidacioninforme", TimestampType, true),
    StructField("validador", StringType, true)))

val df_explo = spark.read
        .format("csv")
        .option("header", "true")
        .option("delimiter", "\t")
        .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") 
        .schema(customSchema)
        .load(filename)

Result

root
 |-- numicu: string (nullable = true)
 |-- fecha_solicitud: timestamp (nullable = true)
 |-- codtecnica: string (nullable = true)
 |-- tecnica: string (nullable = true)
 |-- finexploracion: timestamp (nullable = true)
 |-- ultimavalidacioninforme: timestamp (nullable = true)
 |-- validador: string (nullable = true)

and the table is:

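+------+---------------+----------+-------+--------------+-----------------------+---------+
|numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador|
+------+---------------+----------+-------+--------------+-----------------------+---------+
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
+------+---------------+----------+-------+--------------+-----------------------+---------+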
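
The answer does not identify the cause. One possible reason for null values is that a field fails to parse against the declared type or the timestampFormat: in the default PERMISSIVE mode Spark then returns nulls (for the field or for the whole row, depending on the Spark version) instead of raising an error. As a diagnostic sketch only, switching the reader to FAILFAST mode makes such a failure visible. The sample file, its two columns, and the deliberately mismatched date layout below are assumptions for illustration, not the author's data.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Assumption: local mode; in spark-shell or a notebook this returns the existing session.
val spark = SparkSession.builder().master("local[1]").appName("failfast-demo").getOrCreate()

// Hypothetical tab-separated file whose dates are written dd.MM.yyyy, not yyyy/MM/dd.
val path = Files.createTempFile("explo", ".tsv")
Files.write(path, "numicu\tfecha_solicitud\nA1\t31.12.2019 10:00:00\n".getBytes("UTF-8"))

val schema = new StructType()
  .add("numicu", StringType)
  .add("fecha_solicitud", TimestampType)

val strict = spark.read
  .option("header", "true")
  .option("delimiter", "\t")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .option("mode", "FAILFAST") // raise on a malformed record instead of returning nulls
  .schema(schema)
  .csv(path.toString)

// Reading is lazy, so force evaluation to surface any parse error.
val outcome = Try(strict.collect())
println(outcome.isFailure)
```

If the read fails in this mode, the exception message usually points at the offending record, which helps decide whether the delimiter, a column type, or the timestampFormat needs to change.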

Answered by user3008410

For those interested in doing this in Python, here is a working version.

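from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# The column names come from the schema, not from the file's header row.
customSchema = StructType([
    StructField("IDGC", StringType(), True),
    StructField("SEARCHNAME", StringType(), True),
    StructField("PRICE", DoubleType(), True)
])
productDF = spark.read.load('/home/ForTesting/testProduct.csv', format="csv", header="true", sep='|', schema=customSchema)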

testProduct.csv
ID|SEARCHNAME|PRICE
6607|EFKTON75LIN|890.88
6612|EFKTON100HEN|55.66

Hope this helps.

Answered by X.X

Thanks to the answer by @Nulu, this works for pyspark with minimal tweaking.

from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType

# In Python the type objects are instantiated and the nullable flag is True.
customSchema = StructType([
    StructField("project", StringType(), True),
    StructField("article", StringType(), True),
    StructField("requests", IntegerType(), True),
    StructField("bytes_served", DoubleType(), True)])

# spark is an existing SparkSession; the parentheses allow the chained calls to span lines.
pagecount = (spark.read.format("com.databricks.spark.csv")
         .option("delimiter", " ")
         .option("quote", "")
         .option("header", "false")
         .schema(customSchema)
         .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000"))

Answered by u5827450

Here's how you can work with a custom schema, as a complete demo.

Shell code:

echo "
Slingo, iOS 
Slingo, Android
" > game.csv

Scala code:

import org.apache.spark.sql.functions.{asc, desc}
import org.apache.spark.sql.types._

// Both columns are read as nullable strings.
val customSchema = StructType(Array(
  StructField("game_id", StringType, true),
  StructField("os_id", StringType, true)
))

val csv_df = spark.read.format("csv").schema(customSchema).load("game.csv")
csv_df.show

// Sort with the DataFrame API, then with SQL on a temporary view.
csv_df.orderBy(asc("game_id"), desc("os_id")).show
csv_df.createOrReplaceTempView("game_view")
val sort_df = spark.sql("select * from game_view order by game_id, os_id desc")
sort_df.show

Answered by Nilesh Shinde

This is one option: we can pass the column names to the dataframe while loading the CSV.

import pandas

# header=0 discards the file's own header row; names supplies the replacement column names.
names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
dataset = pandas.read_csv("C:/Users/NS00606317/Downloads/Iris.csv", names=names, header=0)
print(dataset.head(10))

Output

    sepal-length  sepal-width  petal-length  petal-width        class
1            5.1          3.5           1.4          0.2  Iris-setosa
2            4.9          3.0           1.4          0.2  Iris-setosa
3            4.7          3.2           1.3          0.2  Iris-setosa
4            4.6          3.1           1.5          0.2  Iris-setosa
5            5.0          3.6           1.4          0.2  Iris-setosa
6            5.4          3.9           1.7          0.4  Iris-setosa
7            4.6          3.4           1.4          0.3  Iris-setosa
8            5.0          3.4           1.5          0.2  Iris-setosa
9            4.4          2.9           1.4          0.2  Iris-setosa
10           4.9          3.1           1.5          0.1  Iris-setosa

Answered by dalwinder singh

// import Library
import java.io.StringReader ;

import au.com.bytecode.opencsv.CSVReader

//filename

var train_csv = "/Path/train.csv";

//read as text file

val train_rdd = sc.textFile(train_csv)   

//use string reader to convert in proper format

var full_train_data  = train_rdd.map{line =>  var csvReader = new CSVReader(new StringReader(line)) ; csvReader.readNext();  }   

//declares  types

type s = String

// declare case class for schema

case class trainSchema (Loan_ID :s ,Gender :s, Married :s, Dependents :s,Education :s,Self_Employed :s,ApplicantIncome :s,CoapplicantIncome :s,
    LoanAmount :s,Loan_Amount_Term :s, Credit_History :s, Property_Area :s,Loan_Status :s)

//create DF RDD with custom schema 

var full_train_data_with_schema = full_train_data.mapPartitionsWithIndex { (idx, itr) =>
  // Skip the header row, which sits at the start of the first partition.
  val rows = if (idx == 0) itr.drop(1) else itr
  rows.map(x => trainSchema(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9), x(10), x(11), x(12)))
}.toDF

Answered by Suresh Chaganti

Schema definition as a simple string

In case someone is interested in defining the schema as a simple string with date and timestamp columns:

Data file creation from a terminal or shell:

echo " 
2019-07-02 22:11:11.000999, 01/01/2019, Suresh, abc  
2019-01-02 22:11:11.000001, 01/01/2020, Aadi, xyz 
" > data.csv

Defining the schema as String

    user_schema = 'timesta TIMESTAMP,date DATE,first_name STRING , last_name STRING'

reading the data

    df = spark.read.csv(path='data.csv', schema = user_schema, sep=',', dateFormat='MM/dd/yyyy',timestampFormat='yyyy-MM-dd HH:mm:ss.SSSSSS')

    df.show(10, False)

    +-----------------------+----------+----------+---------+
    |timesta                |date      |first_name|last_name|
    +-----------------------+----------+----------+---------+
    |2019-07-02 22:11:11.999|2019-01-01| Suresh   | abc     |
    |2019-01-02 22:11:11.001|2020-01-01| Aadi     | xyz     |
    +-----------------------+----------+----------+---------+

Please note that defining the schema explicitly, instead of letting Spark infer it, also improves read performance, because schema inference requires an extra pass over the data.
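
The same string-based schema can be used from Scala. The sketch below assumes Spark 2.3 or later (where the reader's schema method also accepts a DDL-formatted string), a local-mode SparkSession, and a one-row temporary file made up for illustration; the column names are those used in the Python example.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: local mode; in spark-shell or a notebook this returns the existing session.
val spark = SparkSession.builder().master("local[1]").appName("ddl-schema-demo").getOrCreate()

// Hypothetical one-row data file with a timestamp, a date and two names.
val path = Files.createTempFile("data", ".csv")
Files.write(path, "2019-07-02 22:11:11,01/01/2019,Suresh,abc\n".getBytes("UTF-8"))

// Schema given as a DDL-formatted string rather than a StructType.
val userSchema = "timesta TIMESTAMP, date DATE, first_name STRING, last_name STRING"

val df = spark.read
  .schema(userSchema)
  .option("dateFormat", "MM/dd/yyyy")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .csv(path.toString)

df.printSchema()
```

The string form is convenient when the schema comes from configuration; a StructType gives compile-time checking of the type names instead.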

Answered by YOLO

From pyspark 2.4 onwards, you can simply use the header parameter to take the column names from the file's header row:

data = spark.read.csv('data.csv', header=True)

Similarly, if you are using Scala, you can use the header option as well.
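
A minimal Scala sketch of the header option, assuming a local-mode SparkSession and a small temporary file with made-up contents. Note that header only supplies column names: without a schema or inferSchema, every column is still read as a string.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: local mode; in spark-shell or a notebook this returns the existing session.
val spark = SparkSession.builder().master("local[1]").appName("header-demo").getOrCreate()

// Hypothetical file whose first line holds the column names.
val path = Files.createTempFile("data", ".csv")
Files.write(path, "id,name\n1,alpha\n2,beta\n".getBytes("UTF-8"))

// The first line becomes the column names and is not returned as a data row.
val data = spark.read.option("header", "true").csv(path.toString)

data.printSchema()
data.show()
```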

Answered by whoisthis

You can also do it like this, using sparkSession and implicits:

import org.apache.spark.sql.DataFrame
import sparkSession.implicits._

// inferSchema picks the column types; toDF then assigns the column names.
val pagecount: DataFrame = sparkSession.read
  .option("delimiter", " ")
  .option("quote", "")
  .option("inferSchema", "true")
  .csv("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
  .toDF("project", "article", "requests", "bytes_served")