Scala: Append a column to a Data Frame in Apache Spark 1.3

Note: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license and attribute it to the original authors (not the translator). Original question: http://stackoverflow.com/questions/29483498/


Append a column to Data Frame in Apache Spark 1.3

scala apache-spark dataframe

Asked by Oleg Shirokikh

Is it possible, and what would be the most efficient and neat method, to add a column to a Data Frame?

More specifically, the column may serve as row IDs for the existing Data Frame.

In a simplified case, reading from a file and not tokenizing it, I can think of something like the following (in Scala), but it completes with errors (at line 3), and in any case it doesn't look like the best route possible:

var dataDF = sc.textFile("path/file").toDF()
val rowDF = sc.parallelize(1 to dataDF.count().toInt).toDF("ID")
dataDF = dataDF.withColumn("ID", rowDF("ID"))  // line 3 fails: rowDF("ID") refers to a different DataFrame

Accepted answer by Oleg Shirokikh

It's been a while since I posted the question and it seems that some other people would like to get an answer as well. Below is what I found.

So the original task was to append a column with row identifiers (basically, a sequence 1 to numRows) to any given data frame, so that row order/presence can be tracked (e.g. when you sample). This can be achieved by something along these lines:

import org.apache.spark.sql.Row

sc.textFile(file).
  zipWithIndex().
  map { case (d, i) => i.toString + delimiter + d }.
  map(_.split(delimiter)).
  map(s => Row.fromSeq(s.toSeq))
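
This yields an RDD[Row] with the generated index prepended as the first field; to get a Data Frame back, a schema still has to be applied via createDataFrame. A minimal sketch of that step, assuming the RDD above is bound to a value rowsWithId and the file has two delimited columns (the column names and String types below are illustrative, not from the original answer):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// "ID" is the generated row identifier; "col1"/"col2" stand in for the file's actual columns
val schema = StructType(Seq(
  StructField("ID", StringType, nullable = false),
  StructField("col1", StringType, nullable = true),
  StructField("col2", StringType, nullable = true)))

val dataWithIdDF = sqlContext.createDataFrame(rowsWithId, schema)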

Regarding the general case of appending any column to any data frame:

The "closest" to this functionality in Spark API are withColumnand withColumnRenamed. According to Scala docs, the former Returns a new DataFrame by adding a column. In my opinion, this is a bit confusing and incomplete definition. Both of these functions can operate on thisdata frame only, i.e. given two data frames df1and df2with column col:

Spark API 中withColumn与此功能“最接近”的是和withColumnRenamed。根据Scala 文档,前者通过添加列返回一个新的 DataFrame。在我看来,这有点令人困惑和不完整的定义。这两个函数都只能对this数据帧进行操作,即给定两个数据帧df1df2col

val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL

So unless you can manage to transform a column in an existing dataframe to the shape you need, you can't use withColumn or withColumnRenamed to append arbitrary columns (standalone or from other data frames).

As was commented above, the workaround may be to use a join - this would be pretty messy, although possible - attaching unique keys, as above with zipWithIndex, to both data frames or columns might work. Although the efficiency is ...
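
A rough sketch of what that join-based workaround might look like, assuming two data frames df1 and df2 with the same number of rows, where df2's column col should be glued onto df1 (the withRowId helper and the rowId/rowId2 key names are illustrative, not part of the Spark API):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Attach a synthetic join key to a data frame using zipWithIndex
def withRowId(df: DataFrame): DataFrame = {
  val rdd = df.rdd.zipWithIndex().map { case (row, id) => Row.fromSeq(row.toSeq :+ id) }
  val schema = StructType(df.schema.fields :+ StructField("rowId", LongType, nullable = false))
  sqlContext.createDataFrame(rdd, schema)
}

val left  = withRowId(df1)                               // df1's columns plus "rowId"
val right = withRowId(df2).select("col", "rowId").withColumnRenamed("rowId", "rowId2")

// Join on the synthetic keys, then keep df1's original columns plus the appended one
val joined = left.join(right, left("rowId") === right("rowId2"))
val result = joined.select((df1.columns.map(c => left(c)) :+ right("col")): _*)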

It's clear that appending a column to a data frame is not an easy operation in a distributed environment, and there may not be a very efficient, neat method for it at all. But I think it's still very important to have this core functionality available, even with performance warnings.

Answer by Tal Joffe

Not sure if it works in Spark 1.3, but in Spark 1.5 I use withColumn:

import sqlContext.implicits._
import org.apache.spark.sql.functions._


df.withColumn("newName",lit("newValue"))

I use this when I need a value that is not related to the existing columns of the dataframe.
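
For instance, a small usage sketch (the column name and value below are illustrative): every row is tagged with the same constant, and the new column shows up in the schema like any other.

import org.apache.spark.sql.functions.lit

// Every row gets the same constant value in the new "batchId" column
val tagged = df.withColumn("batchId", lit("2020-10-22"))
tagged.printSchema()   // the schema now includes the new "batchId" column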

This is similar to @NehaM's answer, but simpler.

Answer by NehaM

I took help from the answer above. However, I found it incomplete if we want to change a DataFrame, and the current APIs are a little different in Spark 1.6. zipWithIndex() returns a Tuple of (Row, Long) which contains each row and the corresponding index. We can use it to create a new Row according to our needs.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Prepend the index (as a String) to each row, and add a matching "Row number" field to the schema
val rdd = df.rdd.zipWithIndex()
             .map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
val newStructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields))
sqlContext.createDataFrame(rdd, newStructure).show

I hope this will be helpful.

Answer by Ramesh Maharjan

You can use row_number with a Window function, as below, to get a distinct ID for each row in a dataframe.

df.withColumn("ID", row_number() over Window.orderBy("any column name in the dataframe"))

You can also use monotonically_increasing_id for the same purpose (note that the generated IDs are increasing and unique, but not necessarily consecutive), as in

df.withColumn("ID", monotonically_increasing_id())

And there are some other ways too.