Append a column to Data Frame in Apache Spark 1.3 (Scala)
Note: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it under the same license, but you must attribute it to the original authors (not the translator) and link the original page: StackOverflow
Original question: http://stackoverflow.com/questions/29483498/
Asked by Oleg Shirokikh
Is it possible, and what would be the most efficient and neat method, to add a column to a Data Frame?
More specifically, the column may serve as row IDs for the existing Data Frame.
In a simplified case, reading from a file and not tokenizing it, I can think of something like the following (in Scala), but it completes with errors (at line 3), and anyway doesn't look like the best route possible:
var dataDF = sc.textFile("path/file").toDF()
val rowDF = sc.parallelize(1 to dataDF.count().toInt).toDF("ID")
dataDF = dataDF.withColumn("ID", rowDF("ID")) // line 3: fails, because rowDF("ID") belongs to a different data frame
Accepted answer by Oleg Shirokikh
It's been a while since I posted the question and it seems that some other people would like to get an answer as well. Below is what I found.
So the original task was to append a column with row identifiers (basically, a sequence 1 to numRows) to any given data frame, so that row order/presence can be tracked (e.g. when you sample). This can be achieved by something along these lines:
sc.textFile(file).
  zipWithIndex().
  map { case (d, i) => i.toString + delimiter + d }.
  map(_.split(delimiter)).
  map(s => Row.fromSeq(s.toSeq))
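To get an actual DataFrame back, the resulting RDD[Row] still needs a schema with the new ID field prepended to the original fields. A minimal sketch, assuming the file has two string columns (the names col1/col2 and the rowsWithId variable are hypothetical, not from the original answer):

import org.apache.spark.sql.types.{StructType, StructField, StringType}

// rowsWithId is the RDD[Row] produced by the pipeline above
val schema = StructType(
  StructField("ID", StringType, nullable = false) +:
  Seq(StructField("col1", StringType, true), StructField("col2", StringType, true)))

val indexedDF = sqlContext.createDataFrame(rowsWithId, schema)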
Regarding the general case of appending any column to any data frame:
The "closest" to this functionality in Spark API are withColumnand withColumnRenamed. According to Scala docs, the former Returns a new DataFrame by adding a column. In my opinion, this is a bit confusing and incomplete definition. Both of these functions can operate on thisdata frame only, i.e. given two data frames df1and df2with column col:
Spark API 中withColumn与此功能“最接近”的是和withColumnRenamed。根据Scala 文档,前者通过添加列返回一个新的 DataFrame。在我看来,这有点令人困惑和不完整的定义。这两个函数都只能对this数据帧进行操作,即给定两个数据帧df1和df2列col:
val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
So unless you can manage to transform a column in an existing dataframe to the shape you need, you can't use withColumn or withColumnRenamed for appending arbitrary columns (standalone columns or columns from other data frames).
As was commented above, the workaround may be to use a join. This would be pretty messy, although possible: attaching unique keys like above with zipWithIndex to both data frames or columns might work. Although efficiency is ...
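A rough sketch of that join-based workaround, assuming both data frames can be keyed with zipWithIndex (the helper name withRowId is mine, and sqlContext is assumed to be in scope):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical helper: append a synthetic "rowId" key built from zipWithIndex
def withRowId(df: DataFrame): DataFrame = {
  val keyed = df.rdd.zipWithIndex().map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  val schema = StructType(df.schema.fields :+ StructField("rowId", LongType, nullable = false))
  sqlContext.createDataFrame(keyed, schema)
}

// Key both sides, join on the synthetic key, then select whatever columns you need
val left = withRowId(df1)
val right = withRowId(df2)
val combined = left.join(right, left("rowId") === right("rowId"))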
It's clear that appending a column to a data frame is not an easy operation in a distributed environment, and there may not be a very efficient, neat method for it at all. But I think it's still very important to have this core functionality available, even with performance warnings.
Answered by Tal Joffe
Not sure if it works in Spark 1.3, but in Spark 1.5 I use withColumn:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
df.withColumn("newName",lit("newValue"))
I use this when I need to use a value that is not related to the existing columns of the dataframe.
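For illustration, a couple of variations on the same idea (the column names here are made up):

import org.apache.spark.sql.functions._

df.withColumn("source", lit("fileA"))                   // constant value for every row
df.withColumn("priceWithTax", col("price") * lit(1.2))  // value derived from an existing column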
This is similar to @NehaM's answer, but simpler.
Answered by NehaM
I took help from the answer above. However, I find it incomplete if we want to change a DataFrame, and the current APIs are a little different in Spark 1.6.
zipWithIndex() returns a Tuple of (Row, Long) which contains each row and the corresponding index. We can use it to create a new Row according to our need.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val rdd = df.rdd.zipWithIndex()
  .map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields))
sqlContext.createDataFrame(rdd, newstructure).show
I hope this will be helpful.
Answered by Ramesh Maharjan
You can use row_number with a Window function as below to get a distinct ID for each row in a dataframe.
df.withColumn("ID", row_number() over Window.orderBy("any column name in the dataframe"))
You can also use monotonically_increasing_id for the same, as in
df.withColumn("ID", monotonically_increasing_id())
And there are some other ways too.

