Original source: http://stackoverflow.com/questions/29820501/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
Splitting strings in Apache Spark using Scala
Asked by AngryPanda
I have a dataset, which contains lines in the format (tab separated):
Title<\t>Text
Now, for every word in Text, I want to create a (Word, Title) pair. For instance:
ABC Hello World
gives me
(Hello, ABC)
(World, ABC)
Using Scala, I wrote the following:
val file = sc.textFile("s3n://file.txt")
val title = file.map(line => line.split("\t")(0))
val wordtitle = file.map(line => (line.split("\t")(1).split(" ").map(word => (word, line.split("\t")(0)))))
But this gives me the following output:
[Lscala.Tuple2;@2204b589
[Lscala.Tuple2;@632a46d1
[Lscala.Tuple2;@6c8f7633
[Lscala.Tuple2;@3e9945f3
[Lscala.Tuple2;@40bf74a0
[Lscala.Tuple2;@5981d595
[Lscala.Tuple2;@5aed571b
[Lscala.Tuple2;@13f1dc40
[Lscala.Tuple2;@6bb2f7fa
[Lscala.Tuple2;@32b67553
[Lscala.Tuple2;@68d0b627
[Lscala.Tuple2;@8493285
How do I solve this?
Further reading
What I want to achieve is to count the number of Words that occur in a Text for a particular Title.
The subsequent code that I have written is:
val wordcountperfile = file.map(line => (line.split("\t")(1).split(" ").flatMap(word => word), line.split("\t")(0))).map(word => (word, 1)).reduceByKey(_ + _)
But it does not work. Please feel free to give your inputs on this. Thanks!
Answered by Sarvesh Kumar Singh
So... In Spark you work with a distributed data structure called an RDD. RDDs provide functionality similar to Scala's collection types. Your nested map produced an RDD[Array[(String, String)]], and printing an Array only shows its default toString, which is why you see lines like [Lscala.Tuple2;@2204b589. Flattening with flatMap gives you the (word, title) pairs directly:
val fileRdd = sc.textFile("s3n://file.txt")
// RDD[ String ]
val splitRdd = fileRdd.map( line => line.split("\t") )
// RDD[ Array[ String ] ]
val yourRdd = splitRdd.flatMap( arr => {
val title = arr( 0 )
val text = arr( 1 )
val words = text.split( " " )
words.map( word => ( word, title ) )
} )
// RDD[ ( String, String ) ]
// Now, if you want to print this...
yourRdd.foreach( { case ( word, title ) => println( s"{ $word, $title }" ) } )
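// With the example line "ABC\tHello World" from the question, this would print (roughly):
// { Hello, ABC }
// { World, ABC }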
// if you want to count ( this count is for non-unique words),
val countRdd = yourRdd
.groupBy( { case ( word, title ) => title } ) // group by title
.map( { case ( title, iter ) => ( title, iter.size ) } ) // count for every title
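If you instead want a count per ( word, title ) pair, which is what the question ultimately asks for, here is a minimal sketch building on yourRdd from above (the name wordTitleCounts is just illustrative):

// count occurrences of every ( word, title ) pair
val wordTitleCounts = yourRdd
.map( { case ( word, title ) => ( ( word, title ), 1 ) } )
.reduceByKey( _ + _ )
// RDD[ ( ( String, String ), Int ) ]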
Answered by Shaido - Reinstate Monica
This is how it can be solved using the newer DataFrame API. First, read the data using "\t" as the delimiter:
import org.apache.spark.sql.functions.{count, explode, split}
import spark.implicits._ // for the $"..." column syntax (already in scope in spark-shell)

val df = spark.read
  .option("delimiter", "\t")
  .option("header", false)
  .csv("s3n://file.txt")
  .toDF("title", "text")
Then, split the text column on space and explode to get one word per row.
val df2 = df.select($"title", explode(split($"text", " ")).as("words"))
Finally, group on the title column and count the number of words for each title.
val countDf = df2.groupBy($"title").agg(count($"words"))
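For the example row from the question (ABC Hello World), the result should be a single row with a count of 2 for title ABC. A quick way to inspect it (the output below is a rough sketch):

countDf.show()
// +-----+------------+
// |title|count(words)|
// +-----+------------+
// |  ABC|           2|
// +-----+------------+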
Answered by Zhen Zeng
Another version with DataFrame API
// read into DataFrame
val viewsDF = spark.read.text("s3n://file.txt")
// split on the tab separator and drop the raw value column
val splitedViewsDF = viewsDF
  .withColumn("col1", split($"value", "\t").getItem(0))
  .withColumn("col2", split($"value", "\t").getItem(1))
  .drop($"value")
Sample
scala> val viewsDF=spark.read.text("spark-labs/data/wiki-pageviews.txt")
viewsDF: org.apache.spark.sql.DataFrame = [value: string]
scala> viewsDF.printSchema
root
|-- value: string (nullable = true)
scala> viewsDF.limit(5).show
+------------------+
| value|
+------------------+
| aa Main_Page 3 0|
| aa Main_page 1 0|
| aa User:Savh 1 0|
| aa Wikipedia 1 0|
|aa.b User:Savh 1 0|
+------------------+
scala> val splitedViewsDF = viewsDF.withColumn("col1", split($"value", "\\s+").getItem(0)).withColumn("col2", split($"value", "\\s+").getItem(1)).withColumn("col3", split($"value", "\\s+").getItem(2)).drop($"value")
splitedViewsDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 1 more field]
scala>
scala> splitedViewsDF.printSchema
root
|-- col1: string (nullable = true)
|-- col2: string (nullable = true)
|-- col3: string (nullable = true)
scala> splitedViewsDF.limit(5).show
+----+---------+----+
|col1| col2|col3|
+----+---------+----+
| aa|Main_Page| 3|
| aa|Main_page| 1|
| aa|User:Savh| 1|
| aa|Wikipedia| 1|
|aa.b|User:Savh| 1|
+----+---------+----+
scala>
Answered by Maor Aharon
The answer provided above is not good enough.
.map( line => line.split("\t") )
may cause:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 1485, ip-172-31-113-181.us-west-2.compute.internal, executor 10): java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 14
in case the last column is empty. The best approach is explained here - Split 1 column into 3 columns in spark scala
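To illustrate that caveat, here is a minimal defensive sketch (reusing the fileRdd name from the first answer) that keeps trailing empty fields and skips malformed lines instead of throwing:

val safeRdd = fileRdd.flatMap { line =>
  val arr = line.split("\t", -1) // limit -1 keeps trailing empty strings
  if (arr.length < 2) None       // skip lines that do not have both columns
  else Some((arr(0), arr(1)))    // (title, text)
}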