Parse CSV as DataFrame/DataSet with Apache Spark and Java
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, link to the original, and attribute it to the original authors (not the translator): StackOverflow
Original question: http://stackoverflow.com/questions/25362942/
Asked by mithra
I am new to Spark, and I want to use group-by & reduce to find the following from a CSV (one line per employee):
Department, Designation, costToCompany, State
Sales, Trainee, 12000, UP
Sales, Lead, 32000, AP
Sales, Lead, 32000, LA
Sales, Lead, 32000, TN
Sales, Lead, 32000, AP
Sales, Lead, 32000, TN
Sales, Lead, 32000, LA
Sales, Lead, 32000, LA
Marketing, Associate, 18000, TN
Marketing, Associate, 18000, TN
HR, Manager, 58000, TN
I would like to simplify the above CSV with a group by on Department, Designation, State, with the additional columns sum(costToCompany) and TotalEmployeeCount.
Should get a result like:
Dept, Desg, state, empCount, totalCost
Sales,Lead,AP,2,64000
Sales,Lead,LA,3,96000
Sales,Lead,TN,2,64000
Is there any way to achieve this using transformations and actions, or should we go for RDD operations?
Accepted answer by emecas
Procedure
Create a class (schema) to encapsulate your structure (it is not strictly required for approach B, but it makes your code easier to read if you are using Java)
public class Record implements Serializable {
    String department;
    String designation;
    long costToCompany;
    String state;
    // constructor, getters and setters
}
Loading the CSV (or JSON) file
JavaSparkContext sc;
JavaRDD<String> data = sc.textFile("path/input.csv");

//JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions
SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified

JavaRDD<Record> rdd_records = data.map(
    new Function<String, Record>() {
        public Record call(String line) throws Exception {
            // Here you can use JSON instead:
            // Gson gson = new Gson();
            // gson.fromJson(line, Record.class);
            String[] fields = line.split(",");
            // costToCompany is declared as long, so parse the trimmed field
            Record sd = new Record(fields[0], fields[1],
                    Long.parseLong(fields[2].trim()), fields[3]);
            return sd;
        }
    });
At this point you have 2 approaches:
A. SparkSQL
Register a table (using your defined schema class)
JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
table.registerAsTable("record_table");
table.printSchema();
Query the table with your desired group-by query
JavaSchemaRDD res = sqlContext.sql(
    "select department, designation, state, sum(costToCompany), count(*) "
  + "from record_table group by department, designation, state");
Here you would also be able to do any other query you desire, using a SQL approach
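For example, a hypothetical variant for illustration only, restricting the aggregation to the Sales department from the sample data, still against the same record_table:

JavaSchemaRDD salesOnly = sqlContext.sql(
    "select department, designation, state, sum(costToCompany), count(*) "
  + "from record_table "
  + "where department = 'Sales' "
  + "group by department, designation, state");
salesOnly.printSchema(); // same JavaSchemaRDD API as above; inspect or collect as needed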
B. Spark
Mapping using a composite key: Department, Designation, State

JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
    rdd_records.mapToPair(new PairFunction<Record, String, Tuple2<Long, Integer>>() {
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
            // Composite key: concatenation of the three grouping columns
            // (adding a delimiter between the fields would avoid ambiguous keys)
            Tuple2<String, Tuple2<Long, Integer>> t2 = new Tuple2<String, Tuple2<Long, Integer>>(
                    record.department + record.designation + record.state,
                    new Tuple2<Long, Integer>(record.costToCompany, 1));
            return t2;
        }
    });
reduceByKey using the composite key, summing the costToCompany column, and accumulating the number of records per key:

JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
    records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1, Tuple2<Long, Integer> v2) throws Exception {
            // Sum the costs and the employee counts for the same composite key
            return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
        }
    });
Answered by jkgeyti
The following might not be entirely correct, but it should give you some idea of how to juggle the data. It's not pretty and should be replaced with case classes etc., but as a quick example of how to use the Spark API, I hope it's enough :)
val rawlines = sc.textFile("hdfs://.../*.csv")

case class Employee(dep: String, des: String, cost: Double, state: String)

val employees = rawlines
  .map(_.split(",")) // or use a proper CSV parser
  .map(row => Employee(row(0), row(1), row(2).toDouble, row(3)))

// the 1 is the amount of employees (which is obviously 1 per line)
val keyVals = employees.map(em => ((em.dep, em.des, em.state), (1, em.cost)))

val results = keyVals.reduceByKey { (a, b) =>
  (a._1 + b._1, a._2 + b._2) // (a.count + b.count, a.cost + b.cost)
}

// debug output
results.take(100).foreach(println)

results
  .map(keyval => someThingToFormatAsCsvStringOrWhatever)
  .saveAsTextFile("hdfs://.../results")
Or you can use SparkSQL:
val sqlContext = new SQLContext(sparkContext)

// needed in Spark 1.x so the case-class RDD is implicitly converted to a SchemaRDD
import sqlContext.createSchemaRDD

// case classes can easily be registered as tables
employees.registerAsTable("employees")

val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*)
  from employees
  group by dep, des, state""")
Answered by yhuai
For JSON, if your text file contains one JSON object per line, you can use sqlContext.jsonFile(path) to let Spark SQL load it as a SchemaRDD (the schema will be automatically inferred). Then, you can register it as a table and query it with SQL. You can also manually load the text file as an RDD[String] containing one JSON object per record and use sqlContext.jsonRDD(rdd) to turn it into a SchemaRDD. jsonRDD is useful when you need to pre-process your data.
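A minimal sketch of the same idea with the newer DataFrame API (not the SchemaRDD API this answer refers to), assuming a hypothetical file with one JSON object per line whose fields match the question's columns:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("JsonExample").getOrCreate();

// Each line of the (hypothetical) file is one JSON object; the schema is inferred automatically
Dataset<Row> jsonDf = spark.read().json("hdfs://path/employees.json");
jsonDf.printSchema();

// Register it as a temporary view and query it with SQL, as the answer describes
jsonDf.createOrReplaceTempView("employees_json");
Dataset<Row> agg = spark.sql(
    "SELECT department, designation, state, SUM(costToCompany), COUNT(*) "
  + "FROM employees_json GROUP BY department, designation, state");
agg.show();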
Answered by mrsrinivas
A CSV file can be parsed with Spark's built-in CSV reader. It returns a DataFrame/Dataset on a successful read of the file. On top of a DataFrame/Dataset, you can easily apply SQL-like operations.
Using Spark 2.x (and above) with Java
Create a SparkSession object, aka spark
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL Example")
.getOrCreate();
Create a schema for Row with StructType
import org.apache.spark.sql.types.StructType;
StructType schema = new StructType()
.add("department", "string")
.add("designation", "string")
.add("ctc", "long")
.add("state", "string");
Create a DataFrame from the CSV file and apply the schema to it
Dataset<Row> df = spark.read()
.option("mode", "DROPMALFORMED")
.schema(schema)
.csv("hdfs://path/input.csv");
More options are available when reading data from a CSV file; a few common ones are sketched below.
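For instance (a sketch, not an exhaustive list; the path is the same placeholder as above):

Dataset<Row> dfWithOptions = spark.read()
    .option("header", "true")        // treat the first line as column names
    .option("inferSchema", "true")   // infer column types instead of supplying a schema
    .option("delimiter", ",")        // field separator (comma is the default)
    .option("mode", "DROPMALFORMED") // drop rows that do not match the expected schema
    .csv("hdfs://path/input.csv");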
Now we can aggregate the data in two ways:
1. SQL way
Register a temporary view in the Spark SQL catalog to run SQL operations on it
df.createOrReplaceTempView("employee");
Run a SQL query on the registered view
Dataset<Row> sqlResult = spark.sql(
    "SELECT department, designation, state, SUM(ctc), COUNT(department)"
  + " FROM employee GROUP BY department, designation, state");
sqlResult.show(); // for testing
With Spark SQL we can even execute SQL directly on the CSV file, without creating a table; a sketch follows.
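A minimal sketch of that, assuming Spark 2.x; since the sample file has no header, the columns come back with default names such as _c0:

// Query the file in place; the csv. prefix tells Spark SQL to read the path with the CSV data source
Dataset<Row> direct = spark.sql("SELECT * FROM csv.`hdfs://path/input.csv`");
direct.show();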
2. Object chaining / programmatic (Java-like) way
Do the necessary static imports for the SQL functions
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;
Use groupBy and agg on the DataFrame/Dataset to perform count and sum on the data:

Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
    .agg(sum("ctc"), count("department"));
// Since Spark 1.6, columns mentioned in the group by are added to the result by default
dfResult.show(); // for testing
Dependent libraries
"org.apache.spark" % "spark-core_2.11" % "2.0.0"
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"