Disclaimer: this page is an English rendering of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow, original question: http://stackoverflow.com/questions/29820501/

Splitting strings in Apache Spark using Scala

Tags: string, scala, apache-spark

Asked by AngryPanda

I have a dataset, which contains lines in the format (tab separated):

Title<\t>Text

Now, for every word in Text, I want to create a (Word, Title) pair. For instance:

ABC      Hello World

gives me

(Hello, ABC)
(World, ABC)

Using Scala, I wrote the following:

val file = sc.textFile("s3n://file.txt")
val title = file.map(line => line.split("\t")(0))
val wordtitle = file.map(line => (line.split("\t")(1).split(" ").map(word => (word, line.split("\t")(0)))))

But this gives me the following output:

[Lscala.Tuple2;@2204b589
[Lscala.Tuple2;@632a46d1
[Lscala.Tuple2;@6c8f7633
[Lscala.Tuple2;@3e9945f3
[Lscala.Tuple2;@40bf74a0
[Lscala.Tuple2;@5981d595
[Lscala.Tuple2;@5aed571b
[Lscala.Tuple2;@13f1dc40
[Lscala.Tuple2;@6bb2f7fa
[Lscala.Tuple2;@32b67553
[Lscala.Tuple2;@68d0b627
[Lscala.Tuple2;@8493285

How do I solve this?

Further reading

What I want to achieve is to count the number of Words that occur in the Text for a particular Title.

The subsequent code that I have written is:

val wordcountperfile = file.map(line => (line.split("\t")(1).split(" ").flatMap(word => word), line.split("\t")(0))).map(word => (word, 1)).reduceByKey(_ + _)

But it does not work. Please feel free to give your inputs on this. Thanks!

Answered by Sarvesh Kumar Singh

So... In Spark you work with a distributed data structure called an RDD. RDDs provide functionality similar to Scala's collection types.

val fileRdd = sc.textFile("s3n://file.txt")
// RDD[ String ]

val splitRdd = fileRdd.map( line => line.split("\t") )
// RDD[ Array[ String ] ]

val yourRdd = splitRdd.flatMap( arr => {
  val title = arr( 0 )
  val text = arr( 1 )
  val words = text.split( " " )
  words.map( word => ( word, title ) )
} )
// RDD[ ( String, String ) ]

// Now, if you want to print this...
yourRdd.foreach( { case ( word, title ) => println( s"{ $word, $title }" ) } )

// if you want to count ( this count is for non-unique words), 
val countRdd = yourRdd
  .groupBy( { case ( word, title ) => title } )  // group by title
  .map( { case ( title, iter ) => ( title, iter.size ) } ) // count for every title
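
The question's follow-up asks for per-word counts within each title. That is not part of the answer above, but a minimal sketch building on yourRdd (assuming the same RDD[(String, String)] as defined there) could key by the (word, title) pair and reduce:

// Sketch: count occurrences of each (word, title) pair with reduceByKey.
val wordCountRdd = yourRdd
  .map( { case ( word, title ) => ( ( word, title ), 1 ) } )  // key by (word, title)
  .reduceByKey( _ + _ )                                        // sum occurrences per key
// RDD[ ( ( String, String ), Int ) ]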

Answered by Shaido - Reinstate Monica

This is how it can be solved using the newer DataFrame API. First, read the data using "\t" as the delimiter:

val df = spark.read
  .option("delimiter", "\t")
  .option("header", false)
  .csv("s3n://file.txt")
  .toDF("title", "text")

Then, split the text column on spaces and explode to get one word per row.

val df2 = df.select($"title", explode(split($"text", " ")).as("words"))

Finally, group on the title column and count the number of words for each.

val countDf = df2.groupBy($"title").agg(count($"words"))
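
Note that the snippets above assume an active SparkSession named spark, the $-column syntax from spark.implicits._, and the split, explode and count functions from org.apache.spark.sql.functions. Putting those pieces together, a self-contained sketch (application name is arbitrary, path and column names taken from the question) would be:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{split, explode, count}

val spark = SparkSession.builder().appName("WordsPerTitle").getOrCreate()
import spark.implicits._  // enables the $"column" syntax

val df = spark.read
  .option("delimiter", "\t")
  .option("header", false)
  .csv("s3n://file.txt")
  .toDF("title", "text")

val countDf = df
  .select($"title", explode(split($"text", " ")).as("words"))
  .groupBy($"title")
  .agg(count($"words").as("word_count"))

countDf.show()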

Answered by Zhen Zeng

Another version with the DataFrame API

// read into DataFrame
val viewsDF=spark.read.text("s3n://file.txt")

// Split
val splitedViewsDF = viewsDF.withColumn("col1", split($"value", "\\s+").getItem(0)).withColumn("col2", split($"value", "\\s+").getItem(1)).drop($"value")

Sample

scala> val viewsDF=spark.read.text("spark-labs/data/wiki-pageviews.txt")
viewsDF: org.apache.spark.sql.DataFrame = [value: string]

scala> viewsDF.printSchema
root
 |-- value: string (nullable = true)


scala> viewsDF.limit(5).show
+------------------+
|             value|
+------------------+
|  aa Main_Page 3 0|
|  aa Main_page 1 0|
|  aa User:Savh 1 0|
|  aa Wikipedia 1 0|
|aa.b User:Savh 1 0|
+------------------+


scala> val splitedViewsDF = viewsDF.withColumn("col1", split($"value", "\\s+").getItem(0)).withColumn("col2", split($"value", "\\s+").getItem(1)).withColumn("col3", split($"value", "\\s+").getItem(2)).drop($"value")
splitedViewsDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 1 more field]

scala>

scala> splitedViewsDF.printSchema
root
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)


scala> splitedViewsDF.limit(5).show
+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|  aa|Main_Page|   3|
|  aa|Main_page|   1|
|  aa|User:Savh|   1|
|  aa|Wikipedia|   1|
|aa.b|User:Savh|   1|
+----+---------+----+


scala>
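
Note that this sample splits on whitespace ("\\s+"), which fits the space-separated page-view lines shown above. For the tab-separated Title/Text data in the question, the same pattern would presumably split on "\t" instead, so the text stays in a single column (a sketch, with illustrative column names):

// Sketch for the question's tab-separated lines, reusing viewsDF from above.
val titleTextDF = viewsDF
  .withColumn("title", split($"value", "\t").getItem(0))
  .withColumn("text", split($"value", "\t").getItem(1))
  .drop($"value")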

Answered by Maor Aharon

The answer provided above is not good enough. .map( line => line.split("\t") ) may cause:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 1485, ip-172-31-113-181.us-west-2.compute.internal, executor 10): java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 14

in case the last column is empty. The best solution is explained here: Split 1 column into 3 columns in spark scala

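A common way to avoid the ArrayIndexOutOfBoundsException when trailing columns are empty (a sketch, separate from the linked answer) is to call split with a negative limit, which keeps trailing empty strings. Building on fileRdd from the first answer:

// With limit -1, a line such as "Title\t" still splits into two elements,
// the second being the empty string, so indexing arr(1) does not fail.
val splitRdd = fileRdd.map( line => line.split( "\t", -1 ) )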