scala 如何将 csv 文件转换为 rdd
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/24299427/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How do I convert csv file to rdd
提问by Ramya
I'm new to spark. I want to perform some operations on particular data in a CSV record.
我是新来的火花。我想对 CSV 记录中的特定数据执行一些操作。
I'm trying to read a CSV file and convert it to RDD. My further operations are based on the heading provided in CSV file.
我正在尝试读取 CSV 文件并将其转换为 RDD。我的进一步操作基于 CSV 文件中提供的标题。
(From comments) This is my code so far:
(来自评论)这是我目前的代码:
final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String s) {
return Arrays.asList(EOL.split(s));
}
});
final String heading=lines.first().toString();
I can get the header values like this. I want to map this to each record in CSV file.
我可以得到这样的标头值。我想将其映射到 CSV 文件中的每条记录。
final String[] header=heading.split(" ");
I can get the header values like this. I want to map this to each record in CSV file.
我可以得到这样的标头值。我想将其映射到 CSV 文件中的每条记录。
In java I'm using CSVReader record.getColumnValue(Column header)to get the particular value. I need to do something similar to that here.
在 Java 中,我CSVReader record.getColumnValue(Column header)用来获取特定值。我需要在这里做类似的事情。
回答by maasg
A simplistic approach would be to have a way to preserve the header.
一种简单的方法是有一种方法来保留标题。
Let's say you have a file.csv like:
假设您有一个 file.csv,例如:
user, topic, hits
om, scala, 120
daniel, spark, 80
3754978, spark, 1
We can define a header class that uses a parsed version of the first row:
我们可以定义一个标题类,它使用第一行的解析版本:
class SimpleCSVHeader(header:Array[String]) extends Serializable {
val index = header.zipWithIndex.toMap
def apply(array:Array[String], key:String):String = array(index(key))
}
That we can use that header to address the data further down the road:
我们可以使用该标头来进一步处理数据:
val csv = sc.textFile("file.csv") // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user")
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...
Note that the headeris not much more than a simple map of a mnemonic to the array index. Pretty much all this could be done on the ordinal place of the element in the array, like user = row(0)
请注意,这header只不过是一个简单的助记符到数组索引的映射。几乎所有这些都可以在数组中元素的序数位置完成,例如user = row(0)
PS: Welcome to Scala :-)
PS:欢迎来到 Scala :-)
回答by Saman
You can use the spark-csv library: https://github.com/databricks/spark-csv
您可以使用 spark-csv 库:https: //github.com/databricks/spark-csv
This is directly from the documentation:
这直接来自文档:
import org.apache.spark.sql.SQLContext
SQLContext sqlContext = new SQLContext(sc);
HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");
DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
回答by samthebest
Firstly I must say that it's much much simpler if you put your headers in separate files - this is the convention in big data.
首先,我必须说,如果将标题放在单独的文件中会简单得多 - 这是大数据中的惯例。
Anyway Daniel's answer is pretty good, but it has an inefficiency and a bug, so I'm going to post my own. The inefficiency is that you don't need to check every record to see if it's the header, you just need to check the first record for each partition. The bug is that by using .split(",")you could get an exception thrown or get the wrong column when entries are the empty string and occur at the start or end of the record - to correct that you need to use .split(",", -1). So here is the full code:
无论如何,Daniel 的答案非常好,但它效率低下且存在错误,因此我将发布自己的答案。低效率是你不需要检查每条记录看它是否是头,你只需要检查每个分区的第一条记录。错误在于,.split(",")当条目为空字符串并出现在记录的开头或结尾时,通过使用您可能会抛出异常或获取错误的列 - 以更正您需要使用的.split(",", -1). 所以这里是完整的代码:
val header =
scala.io.Source.fromInputStream(
hadoop.fs.FileSystem.get(new java.net.URI(filename), sc.hadoopConfiguration)
.open(new hadoop.fs.Path(path)))
.getLines.head
val columnIndex = header.split(",").indexOf(columnName)
sc.textFile(path).mapPartitions(iterator => {
val head = iterator.next()
if (head == header) iterator else Iterator(head) ++ iterator
})
.map(_.split(",", -1)(columnIndex))
Final points, consider Parquet if you want to only fish out certain columns. Or at least consider implementing a lazily evaluated split function if you have wide rows.
最后一点,如果您只想剔除某些列,请考虑 Parquet。或者,如果您有很宽的行,至少可以考虑实现一个延迟评估的拆分函数。
回答by Ajay Gupta
We can use the new DataFrameRDD for reading and writing the CSV data. There are few advantages of DataFrameRDD over NormalRDD:
我们可以使用新的 DataFrameRDD 来读取和写入 CSV 数据。DataFrameRDD 与 NormalRDD 相比有几个优点:
- DataFrameRDD are bit more faster than NormalRDD since we determine the schema and which helps to optimize a lot on runtime and provide us with significant performance gain.
- Even if the column shifts in CSV it will automatically take the correct column as we are not hard coding the column number which was present in reading the data as textFile and then splitting it and then using the number of column to get the data.
- In few lines of code you can read the CSV file directly.
- DataFrameRDD 比 NormalRDD 快一点,因为我们确定了架构,这有助于在运行时进行大量优化,并为我们提供显着的性能提升。
- 即使列在 CSV 中移动,它也会自动采用正确的列,因为我们没有硬编码在读取数据时出现的列号作为 textFile 然后拆分它,然后使用列数来获取数据。
- 只需几行代码,您就可以直接读取 CSV 文件。
You will be required to have this library: Add it in build.sbt
您将需要拥有这个库:在 build.sbt 中添加它
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"
Spark Scala code for it:
Spark Scala 代码:
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val csvInPath = "/path/to/csv/abc.csv"
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath)
//format is for specifying the type of file you are reading
//header = true indicates that the first line is header in it
To convert to normal RDD by taking some of the columns from it and
通过从中取出一些列来转换为普通 RDD 和
val rddData = df.map(x=>Row(x.getAs("colA")))
//Do other RDD operation on it
Saving the RDD to CSV format:
将 RDD 保存为 CSV 格式:
val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true))))
aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp")
Since the header is set to true we will be getting the header name in all the output files.
由于标头设置为 true,我们将在所有输出文件中获取标头名称。
回答by cmd
Here is another example using Spark/Scala to convert a CSV to RDD. For a more detailed description see this post.
这是另一个使用 Spark/Scala将 CSV 转换为 RDD 的示例。有关更详细的说明,请参阅此帖子。
def main(args: Array[String]): Unit = {
val csv = sc.textFile("/path/to/your/file.csv")
// split / clean data
val headerAndRows = csv.map(line => line.split(",").map(_.trim))
// get header
val header = headerAndRows.first
// filter out header (eh. just check if the first val matches the first header name)
val data = headerAndRows.filter(_(0) != header(0))
// splits to map (header/value pairs)
val maps = data.map(splits => header.zip(splits).toMap)
// filter out the user "me"
val result = maps.filter(map => map("user") != "me")
// print result
result.foreach(println)
}
回答by Daniel Darabos
I'd recommend reading the header directly from the driver, not through Spark. Two reasons for this: 1) It's a single line. There's no advantage to a distributed approach. 2) We need this line in the driver, not the worker nodes.
我建议直接从驱动程序读取标头,而不是通过 Spark。这样做的两个原因:1)它是单行。分布式方法没有优势。2)我们在驱动程序中需要这一行,而不是工作节点。
It goes something like this:
它是这样的:
// Ridiculous amount of code to read one line.
val uri = new java.net.URI(filename)
val conf = sc.hadoopConfiguration
val fs = hadoop.fs.FileSystem.get(uri, conf)
val path = new hadoop.fs.Path(filename)
val stream = fs.open(path)
val source = scala.io.Source.fromInputStream(stream)
val header = source.getLines.head
Now when you make the RDD you can discard the header.
现在,当您制作 RDD 时,您可以丢弃标头。
val csvRDD = sc.textFile(filename).filter(_ != header)
Then we can make an RDD from one column, for example:
然后我们可以从一列中创建一个 RDD,例如:
val idx = header.split(",").indexOf(columnName)
val columnRDD = csvRDD.map(_.split(",")(idx))
回答by mightymephisto
Another alternative is to use the mapPartitionsWithIndexmethod as you'll get the partition index number and a list of all lines within that partition. Partition 0 and line 0 will be be the header
另一种选择是使用该mapPartitionsWithIndex方法,因为您将获得分区索引号和该分区内所有行的列表。分区 0 和第 0 行将是标题
val rows = sc.textFile(path)
.mapPartitionsWithIndex({ (index: Int, rows: Iterator[String]) =>
val results = new ArrayBuffer[(String, Int)]
var first = true
while (rows.hasNext) {
// check for first line
if (index == 0 && first) {
first = false
rows.next // skip the first row
} else {
results += rows.next
}
}
results.toIterator
}, true)
rows.flatMap { row => row.split(",") }
回答by om-nom-nom
How about this?
这个怎么样?
val Delimeter = ","
val textFile = sc.textFile("data.csv").map(line => line.split(Delimeter))
回答by Bruce Nelson
For spark scala I typically use when I can't use the spark csv packages...
对于 spark scala,我通常在无法使用 spark csv 包时使用...
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv")
val header = rawdata.first()
val tbldata = rawdata.filter(_(0) != header(0))
回答by hayden.sikh
As of Spark 2.0, CSV can be read directly into a DataFrame.
从 Spark 2.0 开始,CSV 可以直接读入DataFrame.
If the data file does not have a header row, then it would be:
如果数据文件没有标题行,那么它将是:
val df = spark.read.csv("file://path/to/data.csv")
That will load the data, but give each column generic names like _c0, _c1, etc.
这将加载数据,但给每列的通用名称一样_c0,_c1等等。
If there are headers then adding .option("header", "true")will use the first row to define the columns in the DataFrame:
如果有标题,则添加.option("header", "true")将使用第一行来定义 中的列DataFrame:
val df = spark.read
.option("header", "true")
.csv("file://path/to/data.csv")
For a concrete example, let's say you have a file with the contents:
举一个具体的例子,假设你有一个包含以下内容的文件:
user,topic,hits
om,scala,120
daniel,spark,80
3754978,spark,1
Then the following will get the total hits grouped by topic:
然后以下将获得按主题分组的总点击量:
import org.apache.spark.sql.functions._
import spark.implicits._
val rawData = spark.read
.option("header", "true")
.csv("file://path/to/data.csv")
// specifies the query, but does not execute it
val grouped = rawData.groupBy($"topic").agg(sum($"hits))
// runs the query, pulling the data to the master node
// can fail if the amount of data is too much to fit
// into the master node's memory!
val collected = grouped.collect
// runs the query, writing the result back out
// in this case, changing format to Parquet since that can
// be nicer to work with in Spark
grouped.write.parquet("hdfs://some/output/directory/")
// runs the query, writing the result back out
// in this case, in CSV format with a header and
// coalesced to a single file. This is easier for human
// consumption but usually much slower.
grouped.coalesce(1)
.write
.option("header", "true")
.csv("hdfs://some/output/directory/")

