Updating a dataframe column in Spark

Original question: http://stackoverflow.com/questions/29109916/

Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): Stack Overflow.
Asked by Luke
Looking at the new Spark DataFrame API, it is unclear whether it is possible to modify DataFrame columns.
How would I go about changing a value in row x, column y of a DataFrame?
In pandas this would be df.ix[x,y] = new_value.
Edit: Consolidating what was said below, you can't modify the existing dataframe as it is immutable, but you can return a new dataframe with the desired modifications.
If you just want to replace a value in a column based on a condition, like np.where:
from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
               .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
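For illustration, a minimal runnable sketch of this pattern; the toy data and the values of replace_val and new_value are assumptions for the example, not from the original answer, and it assumes Spark 2.x+ where SparkSession is available:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()

# hypothetical toy data: one column to update
df = spark.createDataFrame([("old",), ("keep",)], ["update_col"])
replace_val, new_value = "old", "new"

update_func = (F.when(F.col('update_col') == replace_val, new_value)
               .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
df.show()  # 'old' becomes 'new'; 'keep' is left as-is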
If you want to perform some operation on a column and create a new column that is added to the dataframe:
import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    # do stuff to the column value here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn('new_column_name', my_udf('update_col'))
If you want the new column to have the same name as the old column, you could add the additional step:
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
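As a side note, on Spark 2.3+ the same thing is usually written with the F.udf decorator, and withColumn with the existing column name overwrites that column directly, which avoids the drop/rename step. A sketch under those assumptions (the upper-casing transformation is hypothetical, for illustration only):

import pyspark.sql.functions as F
import pyspark.sql.types as T

# hypothetical transformation: upper-case a string column
@F.udf(returnType=T.StringType())
def my_udf(col_value):
    return col_value.upper() if col_value is not None else None

# withColumn with an existing name replaces that column in the returned DataFrame
df = df.withColumn('update_col', my_udf('update_col'))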
Accepted answer by karlson
While you cannot modify a column as such, you may operate on a column and return a new DataFrame reflecting that change. For that you'd first create a UserDefinedFunction implementing the operation to apply, and then selectively apply that function to the targeted column only. In Python:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column
                         for column in old_df.columns])
new_df now has the same schema as old_df (assuming that old_df.target_column was of type StringType as well), but all values in column target_column will be new_value.
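For comparison, a sketch of the same replacement written with withColumn, which returns a new DataFrame in which the same-named column is replaced (this replace-on-name-collision behavior is documented for Spark 1.4+):

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())

# withColumn with an existing column name replaces that column
# rather than appending a new one
new_df = old_df.withColumn(name, udf(old_df[name]))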
Answer by maasg
DataFrames are based on RDDs. RDDs are immutable structures and do not allow updating elements in place. To change values, you will need to create a new DataFrame by transforming the original one, either using the SQL-like DSL or RDD operations like map.
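For instance, a minimal sketch of the SQL-like DSL route in PySpark; the column names x and y and the +1 transformation are assumptions for illustration:

from pyspark.sql import functions as F

# select() returns a new, independent DataFrame;
# the original df is left untouched
new_df = df.select((F.col("x") + 1).alias("x"), "y")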
A highly recommended slide deck: Introducing DataFrames in Spark for Large Scale Data Science.
Answer by radek1st
Just as maasg says, you can create a new DataFrame from the result of a map applied to the old DataFrame. An example for a given DataFrame df with two rows:
val newDf = sqlContext.createDataFrame(df.map(row =>
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")))), df.schema)
Note that if the types of the columns change, you need to give it a correct schema instead of df.schema. Check out the API of org.apache.spark.sql.Row for available methods: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html
[Update] Or using UDFs in Scala:
import org.apache.spark.sql.functions._
val toLong = udf[Long, String] (_.toLong)
val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")
and if the column name needs to stay the same you can rename it back:
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
Answer by Paul
Commonly when updating a column, we want to map an old value to a new value. Here's a way to do that in pyspark without UDFs:
# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F

df = df.withColumn(update_col,
                   F.when(df[update_col] == old_value, new_value)
                    .otherwise(df[update_col]))
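When several old values need to be remapped at once, DataFrame.replace (available since Spark 1.4) is a convenient alternative to chained when clauses; a sketch assuming string-typed values, with a hypothetical mapping:

# map several old values to new ones, restricted to update_col
df = df.replace({'old_a': 'new_a', 'old_b': 'new_b'}, subset=[update_col])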
Answer by DHEERAJ
Importing col and when from pyspark.sql.functions, then updating the fifth column to an integer (0, 1, 2) based on its string value ("string a", "string b", "string c"), producing a new DataFrame:
from pyspark.sql.functions import col, when

data_frame_temp = data_frame.withColumn(
    "col_5",
    when(col("col_5") == "string a", 0)
    .when(col("col_5") == "string b", 1)
    .otherwise(2))
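A quick way to sanity-check this (the single-column toy data below is an assumption for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

spark = SparkSession.builder.master("local[1]").getOrCreate()
data_frame = spark.createDataFrame(
    [("string a",), ("string b",), ("string c",)], ["col_5"])

data_frame_temp = data_frame.withColumn(
    "col_5",
    when(col("col_5") == "string a", 0)
    .when(col("col_5") == "string b", 1)
    .otherwise(2))
data_frame_temp.show()  # the three rows become 0, 1, 2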