Python Apache Spark -- Assign the result of a UDF to multiple dataframe columns
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA terms, link to the original, and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/35322764/
Apache Spark -- Assign the result of UDF to multiple dataframe columns
Asked by Everaldo Aguiar
I'm using pyspark, loading a large csv file into a dataframe with spark-csv, and as a pre-processing step I need to apply a variety of operations to the data available in one of the columns (which contains a JSON string). That will return X values, each of which needs to be stored in its own separate column.
That functionality will be implemented in a UDF. However, I am not sure how to return a list of values from that UDF and feed these into individual columns. Below is a simple example:
(...)
from pyspark.sql.functions import udf

def udf_test(n):
    return [n / 2, n % 2]

test_udf = udf(udf_test)
df.select('amount', 'trans_date').withColumn("test", test_udf("amount")).show(5)
That produces the following:
+------+----------+--------------------+
|amount|trans_date| test|
+------+----------+--------------------+
| 28.0|2016-02-07| [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows
What would be the best way to store the two values (in this example) returned by the UDF in separate columns? Right now they are being typed as strings:
df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()
root
 |-- amount: float (nullable = true)
 |-- trans_date: string (nullable = true)
 |-- test: string (nullable = true)
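The string typing happens because udf defaults to StringType when no returnType is given. As an aside (not from the original post), declaring an array return type would keep the two values as floats, though still packed into a single column:

from pyspark.sql.types import ArrayType, FloatType
# a minimal sketch: with an explicit returnType the elements stay floats
# instead of being serialized into one string; still one array column
test_udf = udf(udf_test, ArrayType(FloatType()))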
Accepted answer by zero323
It is not possible to create multiple top-level columns from a single UDF call, but you can create a new struct. It requires a UDF with the returnType specified:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# the struct the UDF returns: two non-nullable float fields
schema = StructType([
    StructField("foo", FloatType(), False),
    StructField("bar", FloatType(), False)
])

def udf_test(n):
    return (n / 2, n % 2) if n and n != 0.0 else (float('nan'), float('nan'))

test_udf = udf(udf_test, schema)
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"])

foobars = df.select(test_udf("y").alias("foobar"))
foobars.printSchema()
## root
##  |-- foobar: struct (nullable = true)
##  |    |-- foo: float (nullable = false)
##  |    |-- bar: float (nullable = false)
You can further flatten the schema with a simple select:
foobars.select("foobar.foo", "foobar.bar").show()
## +---+---+
## |foo|bar|
## +---+---+
## |1.0|0.0|
## |1.5|1.0|
## +---+---+
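As a side note (assuming the same foobars dataframe as above), Spark can also expand every field of a struct at once with a star, which avoids listing each field by name:

# equivalent to selecting "foobar.foo" and "foobar.bar" explicitly
foobars.select("foobar.*").show()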
See also Derive multiple columns from a single column in a Spark DataFrame
Answered by Aditya Vikram Singh
You can use flatMap to get the columns of the desired dataframe in one go:
# 'udf' and 'input_col' are placeholders for an already-defined UDF that
# returns a list of values and the column it should be applied to
df = df.withColumn('udf_results', udf('input_col'))
df4 = df.select('udf_results').rdd.flatMap(lambda x: x).toDF(schema=your_new_schema)
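For concreteness, here is a minimal self-contained sketch of that pattern (not from the original answer); the UDF halves, the input column amount, and the output field names are invented for illustration:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType, StructType, StructField

# hypothetical UDF returning a list of two floats per row
halves = udf(lambda n: [n / 2, n % 2], ArrayType(FloatType()))

new_schema = StructType([
    StructField("half", FloatType()),
    StructField("remainder", FloatType())
])

df = df.withColumn('udf_results', halves(df['amount']))
# each Row wraps a single two-element list; flatMap unwraps the Row,
# and toDF turns every list into one two-column row
df4 = df.select('udf_results').rdd.flatMap(lambda x: x).toDF(schema=new_schema)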