Disclaimer: this page is a translation of a popular Stack Overflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must likewise follow the CC BY-SA license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/39255973/

Split 1 column into 3 columns in spark scala

scala, apache-spark

Asked by Matt Maurer

I have a dataframe in Spark, using Scala, that has a column I need to split.

scala> test.show
+-------------+
|columnToSplit|
+-------------+
|        a.b.c|
|        d.e.f|
+-------------+

I need this column split out to look like this:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
+----+----+----+

I'm using Spark 2.0.0

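For reference, a minimal sketch (not part of the original question) that reproduces this input dataframe, assuming a spark-shell or an existing SparkSession named spark:

import spark.implicits._  // assumes `spark` is an already-created SparkSession

val test = Seq("a.b.c", "d.e.f").toDF("columnToSplit")
test.show()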

Thanks

Answer

Try:

import sparkObject.spark.implicits._
import org.apache.spark.sql.functions.split

df.withColumn("_tmp", split($"columnToSplit", "\\.")).select(
  $"_tmp".getItem(0).as("col1"),
  $"_tmp".getItem(1).as("col2"),
  $"_tmp".getItem(2).as("col3")
)

The important point to note here is that sparkObject is the SparkSession object you may have already initialized, so the first import statement has to be placed inline in the code where that instance is in scope, not before the class definition.

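For readers following along in the spark-shell, here is a minimal self-contained sketch of the same approach; the SparkSession name spark and the builder settings are illustrative, not the answerer's exact setup. Note also that split takes a Java regular expression, so the literal dot must be escaped, which in a Scala string literal is written "\\.".

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.split

// illustrative local session; any existing SparkSession works the same way
val spark = SparkSession.builder().master("local[*]").appName("split-example").getOrCreate()
import spark.implicits._  // must come after the SparkSession instance exists

val df = Seq("a.b.c", "d.e.f").toDF("columnToSplit")

df.withColumn("_tmp", split($"columnToSplit", "\\."))
  .select(
    $"_tmp".getItem(0).as("col1"),
    $"_tmp".getItem(1).as("col2"),
    $"_tmp".getItem(2).as("col3"))
  .show()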

Answered by Psidom

To do this programmatically, you can create a sequence of expressions with (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")) (assuming you need 3 columns as the result) and then apply it to select with the : _* syntax:

df.withColumn("temp", split(col("columnToSplit"), "\\.")).select(
    (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*
).show
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
+----+----+----+


To keep all columns:

df.withColumn("temp", split(col("columnToSplit"), "\\.")).select(
    col("*") +: (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*
).show
+-------------+---------+----+----+----+
|columnToSplit|     temp|col0|col1|col2|
+-------------+---------+----+----+----+
|        a.b.c|[a, b, c]|   a|   b|   c|
|        d.e.f|[d, e, f]|   d|   e|   f|
+-------------+---------+----+----+----+
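
If you don't need to keep the intermediate array column, it can be dropped afterwards; a small sketch of the same query with drop added:

df.withColumn("temp", split(col("columnToSplit"), "\\."))
  .select(col("*") +: (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*)
  .drop("temp")
  .show()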


If you are using pyspark, use a list comprehension (here, a generator expression) to replace the map in Scala:

df = spark.createDataFrame([['a.b.c'], ['d.e.f']], ['columnToSplit'])
from pyspark.sql.functions import col, split

(df.withColumn('temp', split('columnToSplit', r'\.'))
   .select(*(col('temp').getItem(i).alias(f'col{i}') for i in range(3)))
   .show())
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
+----+----+----+

Answered by Sascha Vetter

A solution which avoids the select part. This is helpful when you just want to append the new columns:

import org.apache.spark.sql.functions.{col, split}

case class Message(others: String, text: String)

val r1 = Message("foo1", "a.b.c")
val r2 = Message("foo2", "d.e.f")

val records = Seq(r1, r2)
val df = spark.createDataFrame(records)

df.withColumn("col1", split(col("text"), "\.").getItem(0))
  .withColumn("col2", split(col("text"), "\.").getItem(1))
  .withColumn("col3", split(col("text"), "\.").getItem(2))
  .show(false)

+------+-----+----+----+----+
|others|text |col1|col2|col3|
+------+-----+----+----+----+
|foo1  |a.b.c|a   |b   |c   |
|foo2  |d.e.f|d   |e   |f   |
+------+-----+----+----+----+

Update: I highly recommend using Psidom's implementation to avoid splitting three times.

Answered by soaptree

This appends the columns to the original DataFrame without using select, and splits only once by using a temporary column:

import spark.implicits._
import org.apache.spark.sql.functions.split

df.withColumn("_tmp", split($"columnToSplit", "\."))
  .withColumn("col1", $"_tmp".getItem(0))
  .withColumn("col2", $"_tmp".getItem(1))
  .withColumn("col3", $"_tmp".getItem(2))
  .drop("_tmp")
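
With the example data from the question, this should produce output along these lines (a sketch of the expected result, not taken from the original answer):

+-------------+----+----+----+
|columnToSplit|col1|col2|col3|
+-------------+----+----+----+
|        a.b.c|   a|   b|   c|
|        d.e.f|   d|   e|   f|
+-------------+----+----+----+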

Answered by Powers

This expands on Psidom's answer and shows how to do the split dynamically, without hardcoding the number of columns. This answer runs a query to calculate the number of columns.

import spark.implicits._
import org.apache.spark.sql.functions.{col, max, size, split}

val df = Seq(
  "a.b.c",
  "d.e.f"
).toDF("my_str")
.withColumn("letters", split(col("my_str"), "\\."))

val numCols = df
  .withColumn("letters_size", size($"letters"))
  .agg(max($"letters_size"))
  .head()
  .getInt(0)

df
  .select(
    (0 until numCols).map(i => $"letters".getItem(i).as(s"col$i")): _*
  )
  .show()
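
Note that when rows contain different numbers of parts, positions beyond a row's own length come back as null, since getItem on a missing array index returns null under Spark's default (non-ANSI) settings. A quick illustrative sketch (the uneven data is invented for demonstration, not from the original answer):

val uneven = Seq("a.b.c", "d.e")
  .toDF("my_str")
  .withColumn("letters", split(col("my_str"), "\\."))

uneven
  .select(
    (0 until 3).map(i => $"letters".getItem(i).as(s"col$i")): _*
  )
  .show()

// expected with default settings:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   a|   b|   c|
// |   d|   e|null|
// +----+----+----+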