Python PySpark row-wise function composition
Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, published under the CC BY-SA 4.0 license. If you use it, you must likewise follow the CC BY-SA license, cite the original URL and author information, and attribute it to the original author (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/36584812/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me): StackOverflow
PySpark row-wise function composition
Asked by Alex R.
As a simplified example, I have a dataframe "df" with columns "col1,col2" and I want to compute a row-wise maximum after applying a function to each column:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def f(x):
    return (x+1)

max_udf=udf(lambda x,y: max(x,y), IntegerType())
f_udf=udf(f, IntegerType())

df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2)))
So if df:
col1 col2
1 2
3 0
Then
df2:
col1 col2 result
1 2 3
3 0 4
The above doesn't seem to work and produces "Cannot evaluate expression: PythonUDF#f..."
I'm absolutely positive "f_udf" works just fine on my table, and the main issue is with the max_udf.
Without creating extra columns or using basic map/reduce, is there a way to do the above entirely with DataFrames and UDFs? How should I modify "max_udf"?
I've also tried:
max_udf=udf(max, IntegerType())
which produces the same error.
I've also confirmed that the following works:
df2=(df.withColumn("temp1", f_udf(df.col1))
     .withColumn("temp2", f_udf(df.col2)))

df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2))
Why is it that I can't do these in one go?
I would like to see an answer that generalizes to any function "f_udf" and "max_udf."
Answered by Christoph Hösler
I had a similar problem and found the solution in the answer to this Stack Overflow question.
To pass multiple columns or a whole row to a UDF, use a struct:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType
df = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
count_empty_columns = udf(lambda row: len([x for x in row if x == None]), IntegerType())
new_df = df.withColumn("null_count", count_empty_columns(struct([df[x] for x in df.columns])))
new_df.show()
returns:
+----+----+----------+
| a| b|null_count|
+----+----+----------+
|null|null| 2|
| 1|null| 1|
|null| 2| 1|
+----+----+----------+
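
Applied back to the question above, the same struct trick lets f and the row-wise maximum be folded into a single UDF, so no intermediate columns are needed. A minimal sketch (max_of_f is an illustrative name; the +1 function and the col1/col2 columns come from the question):

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType

def f(x):
    return x + 1

# Apply f to every field of the row, then take the row-wise maximum
max_of_f = udf(lambda row: max(f(x) for x in row), IntegerType())

df2 = df.withColumn("result", max_of_f(struct(df.col1, df.col2)))

With the example data this yields 3 and 4 in the result column, matching the expected df2 above.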
Answered by Mohan
UserDefinedFunction throws an error when it is given other UDFs as arguments.
You can modify max_udf as shown below to make it work.
df = sc.parallelize([(1, 2), (3, 0)]).toDF(["col1", "col2"])
max_udf = udf(lambda x, y: max(x + 1, y + 1), IntegerType())
df2 = df.withColumn("result", max_udf(df.col1, df.col2))
Or
def f_udf(x):
return (x + 1)
max_udf = udf(lambda x, y: max(x, y), IntegerType())
## f_udf=udf(f, IntegerType())
df2 = df.withColumn("result", max_udf(f_udf(df.col1), f_udf(df.col2)))
Note:
The second approach is valid if and only if the internal functions (here f_udf) generate valid SQL expressions.
It works here because f_udf(df.col1) and f_udf(df.col2) are evaluated as Column<b'(col1 + 1)'> and Column<b'(col2 + 1)'> respectively, before being passed to max_udf. It wouldn't work with an arbitrary function.
It wouldn't work if we tried, for example, something like this:
from math import exp
df.withColumn("result", max_udf(exp(df.col1), exp(df.col2)))
Answered by prossblad
Below is a useful piece of code, made specifically so that any new column can be created by simply calling a top-level business rule, completely isolated from the heavy technical Spark machinery (no need to spend money or feel dependent on Databricks libraries anymore). My advice: in your organization, try to do things simply and cleanly, for the benefit of top-level data users:
def createColumnFromRule(df, columnName, ruleClass, ruleName, inputColumns=None, inputValues=None, columnType=None):
    from pyspark.sql import functions as F
    from pyspark.sql import types as T

    def _getSparkClassType(shortType):
        defaultSparkClassType = "StringType"
        typesMapping = {
            "bigint"    : "LongType",
            "binary"    : "BinaryType",
            "boolean"   : "BooleanType",
            "byte"      : "ByteType",
            "date"      : "DateType",
            "decimal"   : "DecimalType",
            "double"    : "DoubleType",
            "float"     : "FloatType",
            "int"       : "IntegerType",
            "integer"   : "IntegerType",
            "long"      : "LongType",
            "numeric"   : "NumericType",
            "string"    : defaultSparkClassType,
            "timestamp" : "TimestampType"
        }
        sparkClassType = None
        try:
            sparkClassType = typesMapping[shortType]
        except:
            sparkClassType = defaultSparkClassType
        return sparkClassType

    if (columnType != None): sparkClassType = _getSparkClassType(columnType)
    else: sparkClassType = "StringType"
    aUdf = eval("F.udf(ruleClass." + ruleName + ", T." + sparkClassType + "())")
    columns = None
    values = None
    if (inputColumns != None): columns = F.struct([df[column] for column in inputColumns])
    if (inputValues != None): values = F.struct([F.lit(value) for value in inputValues])
    # Call the rule
    if (inputColumns != None and inputValues != None): df = df.withColumn(columnName, aUdf(columns, values))
    elif (inputColumns != None): df = df.withColumn(columnName, aUdf(columns, F.lit(None)))
    elif (inputValues != None): df = df.withColumn(columnName, aUdf(F.lit(None), values))
    # Create a Null column otherwise
    else:
        if (columnType != None):
            df = df.withColumn(columnName, F.lit(None).cast(columnType))
        else:
            df = df.withColumn(columnName, F.lit(None))
    # Return the resulting dataframe
    return df
Usage example:
# Define your business rule (you can get columns and values)
class CustomerRisk:
    def churnRisk(self, columns=None, values=None):
        isChurnRisk = False
        # ... Rule implementation starts here
        if (values != None):
            if (values[0] == "FORCE_CHURN=true"): isChurnRisk = True
        if (isChurnRisk == False and columns != None):
            if (columns["AGE"] <= 25): isChurnRisk = True
        # ...
        return isChurnRisk

# Execute the rule; it will create your new column in one line of code, that's all, easy isn't it?
# And look how to pass columns and values, it's really easy!
df = createColumnFromRule(df, columnName="CHURN_RISK", ruleClass=CustomerRisk(), ruleName="churnRisk", columnType="boolean", inputColumns=["NAME", "AGE", "ADDRESS"], inputValues=["FORCE_CHURN=true", "CHURN_RISK=100%"])