Scala: Import TSV file in Spark
Disclaimer: this page is a translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same license and attribute it to the original authors (not me): Stack Overflow
Original URL: http://stackoverflow.com/questions/23977792/
Import TSV File in spark
Asked by user3657361
I am new to Spark, so forgive me for asking a basic question. I'm trying to import my TSV file into Spark, but I'm not sure if it's working.
val lines = sc.textFile("/home/cloudera/Desktop/Test/test.tsv")
val split_lines = lines.map(_.split("\t"))
split_lines.first()
Everything seems to be working fine. Is there a way I can see if the TSV file has loaded properly?
SAMPLE DATA: (fields are separated by tabs)
hastag 200904 24 blackcat
hastag 200908 1 oaddisco
hastag 200904 1 blah
hastag 200910 3 mydda
Answered by maasg
So far, your code looks good to me. If you print that first line to the console, do you see the expected data?
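For a quick sanity check, count the lines and print the first few parsed records; if the line count and the number of fields per row look right, the file loaded correctly. A minimal sketch, assuming the spark-shell's predefined sc and the path from your question:

// Quick checks on the loaded RDD (path taken from the question; adjust as needed)
val lines = sc.textFile("/home/cloudera/Desktop/Test/test.tsv")
val split_lines = lines.map(_.split("\t"))

println(lines.count())                                            // number of lines read from the file
split_lines.take(5).foreach(row => println(row.mkString(" | ")))  // first few parsed rows
println(split_lines.first().length)                               // number of tab-separated fields in the first row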
To explore the Spark API, the best thing to do is to use the Spark-shell, a Scala REPL enriched with Spark-specifics that builds a default Spark Context for you.
It will let you explore the data much more easily.
Here's an example loading a ~65k-line CSV file, which I guess is a similar use case to yours:
$> <spark_dir>/bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.0.0-SNAPSHOT
      /_/
scala> val lines=sc.textFile("/home/user/playground/ts-data.csv")
lines: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
scala> val csv=lines.map(_.split(";"))
csv: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2] at map at <console>:14
scala> csv.count
(... spark processing ...)
res0: Long = 67538
// let's have a look at the first record
scala> csv.first
14/06/01 12:22:17 INFO SparkContext: Starting job: first at <console>:17
14/06/01 12:22:17 INFO DAGScheduler: Got job 1 (first at <console>:17) with 1 output partitions (allowLocal=true)
14/06/01 12:22:17 INFO DAGScheduler: Final stage: Stage 1(first at <console>:17)
14/06/01 12:22:17 INFO DAGScheduler: Parents of final stage: List()
14/06/01 12:22:17 INFO DAGScheduler: Missing parents: List()
14/06/01 12:22:17 INFO DAGScheduler: Computing the requested partition locally
14/06/01 12:22:17 INFO HadoopRDD: Input split: file:/home/user/playground/ts-data.csv:0+1932934
14/06/01 12:22:17 INFO SparkContext: Job finished: first at <console>:17, took 0.003210457 s
res1: Array[String] = Array(20140127, 0000df, d063b4, ***, ***-Service,app180000m,49)
// groupby id - count unique
scala> csv.groupBy(_(4)).count
(... Spark processing ...)
res2: Long = 37668
// records per day
csv.map(record => record(0)->1).reduceByKey(_+_).collect
(... more Spark processing ...)
res8: Array[(String, Int)] = Array((20140117,1854), (20140120,2028), (20140124,3398), (20140131,6084), (20140122,5076), (20140128,8310), (20140123,8476), (20140127,1932), (20140130,8482), (20140129,8488), (20140118,5100), (20140109,3488), (20140110,4822))
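As a side note, for the "count unique" step you don't necessarily need groupBy: projecting the key column and calling distinct avoids building the groups. A small sketch, assuming the same csv RDD as above (column 4 as the id, column 0 as the date):

// Count distinct ids without materializing the groups
val uniqueIds = csv.map(_(4)).distinct().count()

// Records per day, sorted by date (same idea as the reduceByKey example above)
val perDay = csv.map(record => record(0) -> 1).reduceByKey(_ + _).sortByKey().collect()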
* Edit using data added to the question *
val rawData="""hastag 200904 24 blackcat
hastag 200908 1 oaddisco
hastag 200904 1 blah
hastag 200910 3 mydda"""
//split lines
val data= rawData.split("\n")
val rdd= sc.parallelize(data)
// Split using space as separator
val byId=rdd.map(_.split(" ")).groupBy(_(1))
byId.count
res11: Long = 3
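To run the same grouping against the actual file from the question, the only change is to split on tabs instead of spaces. A sketch, assuming the path from the question:

// Load the real TSV file and group by the second column (e.g. 200904)
val fileRdd = sc.textFile("/home/cloudera/Desktop/Test/test.tsv")
val fileById = fileRdd.map(_.split("\t")).groupBy(_(1))
fileById.count                                   // number of distinct values in that column
fileById.collect().foreach { case (k, vs) => println(k + " -> " + vs.size + " records") }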

