Scala Spark - load CSV file as DataFrame?

Warning: this page is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/29704333/


Spark - load CSV file as DataFrame?

Tags: scala, apache-spark, hadoop, apache-spark-sql, hdfs

Asked by Donbeo

I would like to read a CSV in Spark, convert it to a DataFrame, and store it in HDFS with df.registerTempTable("table_name")


I have tried:


scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

Error which I got:


java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

What is the right command to load CSV file as DataFrame in Apache Spark?


Answered by Shyamendra Solanki

spark-csv is part of core Spark functionality (as of Spark 2.x) and doesn't require a separate library. So you could just do, for example:


df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

In Scala (this works for any delimited format: specify "," for CSV, "\t" for TSV, and so on):


val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("delimiter", ",")
    .load("csvfile.csv")

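Since the original question mentions df.registerTempTable("table_name"), here is a minimal sketch of registering the loaded DataFrame and querying it with SQL, assuming a Spark 2.x session named spark (the table name is just the one from the question):

// Sketch: register the DataFrame as a temporary view and query it with SQL.
// In Spark 2.x, createOrReplaceTempView replaces the older registerTempTable.
val df = spark.read
    .format("csv")
    .option("header", "true")
    .load("csvfile.csv")

df.createOrReplaceTempView("table_name")
val result = spark.sql("SELECT * FROM table_name")
result.show()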

Answered by mrsrinivas

Parse CSV and load as DataFrame/DataSet with Spark 2.x


First, initialize a SparkSession object; by default it will be available in shells as spark.


val spark = org.apache.spark.sql.SparkSession.builder
        .master("local") 
        .appName("Spark CSV Reader")
        .getOrCreate;

Use any one of the following ways to load CSV as DataFrame/DataSet


1. Do it in a programmatic way


 val df = spark.read
         .format("csv")
         .option("header", "true") //first line in file has headers
         .option("mode", "DROPMALFORMED")
         .load("hdfs:///csv/file/dir/file.csv")

More options for reading are available in spark-csv (now part of Spark itself, from 2.x); a few of the common ones are sketched below.

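As a rough sketch (not exhaustive, and the path/values are placeholders), some frequently used CSV read options in Spark 2.x look like this:

// Sketch of commonly used CSV options (path and sample values are placeholders).
val df = spark.read
    .option("header", "true")           // first line contains column names
    .option("inferSchema", "true")      // infer column types (costs an extra pass)
    .option("delimiter", ",")           // field separator
    .option("quote", "\"")              // quote character
    .option("escape", "\\")             // escape character
    .option("nullValue", "NA")          // string to interpret as null
    .option("dateFormat", "yyyy-MM-dd") // how date columns are parsed
    .csv("hdfs:///csv/file/dir/file.csv")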

2. You can do it the SQL way as well


 val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")

Dependencies:


 "org.apache.spark" % "spark-core_2.11" % 2.0.0,
 "org.apache.spark" % "spark-sql_2.11" % 2.0.0,




Spark version < 2.0


val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

Dependencies:


"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,

Answered by Eric Yiwei Liu

This is for those whose Hadoop is 2.6 and Spark is 1.6, without the "databricks" package.


import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;

// Read the file as plain text, split each line on commas, and trim the fields
val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
// Drop the header row, then map the remaining rows to typed Rows
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0), row(1).toInt))

val schema = new StructType()
    .add(StructField("id", StringType, true))
    .add(StructField("val", IntegerType, true))

val df = sqlContext.createDataFrame(rdd, schema)
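To then use the DataFrame from SQL, as the question asks, a small follow-up sketch with the Spark 1.x API (the table name is just an example) could be:

// Sketch: register the DataFrame with the Spark 1.x API and query it.
df.registerTempTable("table_name")
sqlContext.sql("SELECT * FROM table_name").show()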

Answered by penny chan

With Spark 2.0, the following is how you can read a CSV:


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
  .config(conf = conf)
  .appName("spark session example")
  .getOrCreate()

val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
  csv(path)
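As a quick, purely illustrative sanity check, you can inspect the structure and a few rows of the result:

// Illustrative follow-up: inspect the result of the CSV read.
base_df.printSchema()   // column names and types (all strings unless inferSchema is set)
base_df.show(5)         // first 5 rows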

Answered by Rajeev Rathor

In Java 1.8, this code snippet works perfectly for reading CSV files.


POM.xml


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
</dependency>

Java


SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);

Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

        //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();

Answered by Steve Loughran

Penny's Spark 2 example is the way to do it in Spark 2. There's one more trick: have the schema generated for you by doing an initial scan of the data, by setting the option inferSchema to true.


Here, then, assuming that spark is a Spark session you have set up, is the operation to load the CSV index file of all the Landsat images which Amazon hosts on S3.


  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *    http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */

val csvdata = spark.read.options(Map(
    "header" -> "true",
    "ignoreLeadingWhiteSpace" -> "true",
    "ignoreTrailingWhiteSpace" -> "true",
    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
    "inferSchema" -> "true",
    "mode" -> "FAILFAST"))
  .csv("s3a://landsat-pds/scene_list.gz")

The bad news is: this triggers a scan through the file; for something large like this 20+ MB zipped CSV file, that can take 30 seconds over a long-haul connection. Bear that in mind: you are better off manually coding up the schema once you know what's coming in; a sketch of that follows.

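A minimal sketch of what manually coding up the schema could look like (assuming the same spark session; the column names and types below are illustrative placeholders, not the actual scene_list layout):

import org.apache.spark.sql.types._

// Sketch: an explicit schema avoids the extra inferSchema pass over the data.
// The columns below are placeholders, not the real scene_list.gz layout.
val sceneSchema = StructType(Seq(
  StructField("entityId", StringType, nullable = true),
  StructField("acquisitionDate", TimestampType, nullable = true),
  StructField("cloudCover", DoubleType, nullable = true)
))

val csvWithSchema = spark.read
  .schema(sceneSchema)          // use the declared schema, skip inference
  .option("header", "true")
  .option("mode", "FAILFAST")
  .csv("s3a://landsat-pds/scene_list.gz")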

(code snippet Apache Software License 2.0 licensed to avoid all ambiguity; something I've done as a demo/integration test of S3 integration)


Answered by karthiks

There are a lot of challenges to parsing a CSV file, and they keep adding up as the file size grows or when there are non-English/escape/separator/other characters in the column values; those can cause parsing errors.


The magic, then, is in the options that are used. The ones that worked for me and that I hope cover most of the edge cases are in the code below:


### Create a Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path, 
                         header=True, 
                         multiLine=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True, 
                         encoding="UTF-8",
                         sep=',',
                         quote='"', 
                         escape='"',
                         maxColumns=2,
                         inferSchema=True)

Hope that helps. For more, refer to: Using PySpark 2 to read CSV having HTML source code


Note: The code above is from the Spark 2 API, where the CSV file reading API comes bundled with the built-in packages of the Spark installation.


Note: PySpark is a Python wrapper for Spark and shares the same API as Scala/Java.


Answered by swapnil shashank

In case you are building a jar with Scala 2.11 and Apache Spark 2.0 or higher:


There is no need to create a sqlContext or sparkContext object. A single SparkSession object suffices for all needs.


Following is my code, which works fine:


import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}

object driver {

  def main(args: Array[String]) {

    val log = LogManager.getRootLogger

    log.info("**********JAR EXECUTION STARTED**********")

    val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
    val df = spark.read.format("csv")
      .option("header", "true")
      .option("delimiter","|")
      .option("inferSchema","true")
      .load("d:/small_projects/spark/test.pos")
    df.show()
  }
}

In case you are running in a cluster, just change .master("local") to .master("yarn") while defining the SparkSession builder; see the sketch below.

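When submitting to a cluster you typically also pass the master through spark-submit rather than hard-coding it; a rough sketch of such an invocation (the jar name is a placeholder, the class name matches the driver object above) is:

spark-submit \
  --class driver \
  --master yarn \
  --deploy-mode cluster \
  your-assembly.jar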

The Spark Doc covers this: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html


Answered by Venkat Kotra

To read from a relative path on the system, use the System.getProperty method to get the current directory, then load the file using the relative path.


scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv")
scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path)
scala> csvDf.take(3)

Spark: 2.4.4, Scala: 2.11.12


Answered by S_K

Add the following Spark dependencies to the POM file:


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

//Spark configuration:


val spark = SparkSession.builder().master("local").appName("Sample App").getOrCreate()


//Read csv file:


val df = spark.read.option("header", "true").csv("FILE_PATH")


// Display output


df.show()
