Parse CSV as DataFrame/DataSet with Apache Spark and Java

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow, original question: http://stackoverflow.com/questions/25362942/


Tags: java, apache-spark, hadoop, apache-spark-sql, hdfs

Asked by mithra

I am new to Spark, and I want to use group-by & reduce to find the following from CSV (one line per employee):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

I would like to simplify the above CSV with a group by Department, Designation, State, with additional columns for sum(costToCompany) and TotalEmployeeCount.

Should get a result like:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

Is there any way to achieve this using transformations and actions? Or should we go for RDD operations?

Accepted answer by emecas

Procedure

  • Create a Class (Schema) to encapsulate your structure (it's not required for approach B, but it will make your code easier to read if you are using Java). A fuller sketch of this bean is shown right after this list.

    public class Record implements Serializable {
      String department;
      String designation;
      long costToCompany;
      String state;
      // constructor , getters and setters  
    }
    
  • Loading the CSV (JSON) file

    JavaSparkContext sc;
    JavaRDD<String> data = sc.textFile("path/input.csv");
    //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions
    SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified


    JavaRDD<Record> rdd_records = data.map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             // Here you can use JSON
             // Gson gson = new Gson();
             // gson.fromJson(line, Record.class);
             String[] fields = line.split(",");
             // costToCompany is declared as long, so parse the numeric field
             Record sd = new Record(fields[0], fields[1],
                 Long.parseLong(fields[2].trim()), fields[3]);
             return sd;
          }
    });
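
For reference, a fuller sketch of the Record bean from the first step (only the constructor and getters/setters are filled in; the bean-style getters are what applySchema/createDataFrame use to discover the columns):

    public class Record implements Serializable {
      String department;   // kept package-private so the direct field access in approach B still compiles
      String designation;
      long costToCompany;
      String state;

      public Record() { }  // no-arg constructor, as the JavaBean convention expects

      public Record(String department, String designation, long costToCompany, String state) {
        this.department = department;
        this.designation = designation;
        this.costToCompany = costToCompany;
        this.state = state;
      }

      public String getDepartment() { return department; }
      public void setDepartment(String department) { this.department = department; }
      public String getDesignation() { return designation; }
      public void setDesignation(String designation) { this.designation = designation; }
      public long getCostToCompany() { return costToCompany; }
      public void setCostToCompany(long costToCompany) { this.costToCompany = costToCompany; }
      public String getState() { return state; }
      public void setState(String state) { this.state = state; }
    }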
    

At this point you have 2 approaches:

A. SparkSQL

  • Register a table (using your defined schema class)

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();
    
  • Query the table with your desired group-by query

    JavaSchemaRDD res = sqlContext.sql(
        "select department, designation, state, sum(costToCompany), count(*) "
      + "from record_table "
      + "group by department, designation, state");
    
  • Here you would also be able to do any other query you desire, using a SQL approach
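
For instance, sticking to the same pattern (a sketch only; the particular aggregate below is just an illustration):

    JavaSchemaRDD avgByDept = sqlContext.sql(
        "select department, avg(costToCompany) "
      + "from record_table "
      + "group by department");
    avgByDept.printSchema();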


B. Spark

  • Mapping using a composite key: Department, Designation, State

    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
      rdd_records.mapToPair(
        new PairFunction<Record, String, Tuple2<Long, Integer>>() {
          public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
            // composite key: department + designation + state
            return new Tuple2<String, Tuple2<Long, Integer>>(
                record.department + record.designation + record.state,
                new Tuple2<Long, Integer>(record.costToCompany, 1));
          }
      });

  • reduceByKey using the composite key, summing the costToCompany column, and accumulating the number of records by key

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
      records_JPRDD.reduceByKey(
        new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
          public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
                                            Tuple2<Long, Integer> v2) throws Exception {
            // sum the costs and the per-key record counts
            return new Tuple2<Long, Integer>(v1._1() + v2._1(), v1._2() + v2._2());
          }
      });
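
To actually look at the aggregated pairs, a small hedged sketch (collect() is fine for this toy data set; a real job would rather save the RDD with saveAsTextFile):

    // Each entry is compositeKey -> (totalCost, empCount); use the _1()/_2() accessors from Java.
    for (Tuple2<String, Tuple2<Long, Integer>> entry : final_rdd_records.collect()) {
        System.out.println(entry._1() + " -> empCount=" + entry._2()._2()
            + ", totalCost=" + entry._2()._1());
    }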
    

Answer by jkgeyti

The following might not be entirely correct, but it should give you some idea of how to juggle data. It's not pretty and should be replaced with case classes etc., but as a quick example of how to use the Spark API, I hope it's enough :)

val rawlines = sc.textFile("hdfs://.../*.csv")

case class Employee(dep: String, des: String, cost: Double, state: String)

val employees = rawlines
  .map(_.split(","))  // or use a proper CSV parser
  .map(row => Employee(row(0), row(1), row(2).trim.toDouble, row(3)))

// the 1 is the amount of employees (which is obviously 1 per line)
val keyVals = employees.map(em => ((em.dep, em.des, em.state), (1, em.cost)))

val results = keyVals.reduceByKey { (a, b) =>
  (a._1 + b._1, a._2 + b._2)  // (a.count + b.count, a.cost + b.cost)
}

// debug output
results.take(100).foreach(println)

results
  .map(keyval => someThingToFormatAsCsvStringOrWhatever)
  .saveAsTextFile("hdfs://.../results")

Or you can use SparkSQL:

val sqlContext = new SQLContext(sparkContext)
import sqlContext.createSchemaRDD  // implicit conversion so the case-class RDD can be registered (old SchemaRDD API)

// case classes can easily be registered as tables
employees.registerAsTable("employees")

val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*)
  from employees
  group by dep, des, state""")

Answer by yhuai

For JSON, if your text file contains one JSON object per line, you can use sqlContext.jsonFile(path) to let Spark SQL load it as a SchemaRDD (the schema will be automatically inferred). Then, you can register it as a table and query it with SQL. You can also manually load the text file as an RDD[String] containing one JSON object per record and use sqlContext.jsonRDD(rdd) to turn it into a SchemaRDD. jsonRDD is useful when you need to pre-process your data.
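
A minimal Java sketch of that flow, assuming the pre-1.3 JavaSQLContext API mentioned earlier on this page (the file name and table name are hypothetical; only jsonFile and jsonRDD are taken from the answer above):

JavaSparkContext sc;  // assumed to already exist
JavaSQLContext sqlContext = new JavaSQLContext(sc);

// One JSON object per line; the schema is inferred automatically.
JavaSchemaRDD records = sqlContext.jsonFile("path/records.json");
records.registerAsTable("json_records");
JavaSchemaRDD byDept = sqlContext.sql(
    "select department, count(*) from json_records group by department");

// Or pre-process the raw lines first and hand them to jsonRDD.
JavaRDD<String> cleaned = sc.textFile("path/records.json")
    .filter(new Function<String, Boolean>() {
      public Boolean call(String line) { return !line.trim().isEmpty(); }
    });
JavaSchemaRDD fromRdd = sqlContext.jsonRDD(cleaned);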

Answer by mrsrinivas

A CSV file can be parsed with Spark's built-in CSV reader. It will return a DataFrame/DataSet on a successful read of the file. On top of the DataFrame/DataSet, you can easily apply SQL-like operations.

Using Spark 2.x (and above) with Java

Create a SparkSession object, aka spark

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL Example")
    .getOrCreate();

Create Schema for Row with StructType

import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("department", "string")
    .add("designation", "string")
    .add("ctc", "long")
    .add("state", "string");

Create dataframe from CSV file and apply schema to it

Dataset<Row> df = spark.read()
    .option("mode", "DROPMALFORMED")
    .schema(schema)
    .csv("hdfs://path/input.csv");

More options for reading data from a CSV file
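
For example, a few commonly used options of the built-in CSV reader (a sketch; the option values shown are only illustrative):

Dataset<Row> dfWithOptions = spark.read()
    .option("header", "true")         // first line holds the column names
    .option("inferSchema", "true")    // let Spark infer column types (needs an extra pass over the data)
    .option("delimiter", ",")         // field separator
    .option("nullValue", "")          // treat empty strings as null
    .option("mode", "DROPMALFORMED")  // drop rows that do not fit the schema
    .csv("hdfs://path/input.csv");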

Now we can aggregate the data in two ways

1. SQL way

Register a temporary view in the Spark SQL catalog to perform SQL operations

df.createOrReplaceTempView("employee");

Run a SQL query on the registered dataframe

Dataset<Row> sqlResult = spark.sql(
    "SELECT department, designation, state, SUM(ctc), COUNT(department)" 
        + " FROM employee GROUP BY department, designation, state");

sqlResult.show(); //for testing

We can even execute SQL directly on the CSV file, without creating a table, with Spark SQL
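
A hedged sketch of that file-backed query syntax (when no header or schema is supplied, Spark names the columns _c0, _c1, ...):

Dataset<Row> direct = spark.sql(
    "SELECT * FROM csv.`hdfs://path/input.csv`");
direct.show();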


2. Object chaining / programmatic (Java-like) way

Do the necessary imports for SQL functions

import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

Use groupBy and agg on the dataframe/dataset to perform count and sum on the data

Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
    .agg(sum("ctc"), count("department"));
// After Spark 1.6 columns mentioned in group by will be added to result by default

dfResult.show();//for testing
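
Optionally (a sketch; the alias names and output path are hypothetical), the aggregate columns can be renamed with as and the result written back out as CSV:

Dataset<Row> dfNamed = df.groupBy("department", "designation", "state")
    .agg(sum("ctc").as("totalCost"), count("department").as("empCount"));

dfNamed.write()
    .option("header", "true")
    .csv("hdfs://path/output");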


Dependent libraries

"org.apache.spark" % "spark-core_2.11" % "2.0.0" 
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"