Python: Passing a data frame column and an external list to a udf under withColumn

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/37409857/

Passing a data frame column and external list to udf under withColumn

python, apache-spark, pyspark, apache-spark-sql, user-defined-functions

Asked by Jay

I have a Spark dataframe with the following structure. The bodyText_token column holds the tokens (a processed set of words), and I have a nested list of predefined keywords:

root
 |-- id: string (nullable = true)
 |-- body: string (nullable = true)
 |-- bodyText_token: array (nullable = true)

keyword_list=[['union','workers','strike','pay','rally','free','immigration'],
['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']]

I need to check how many tokens fall under each keyword list and add the result as a new column of the existing dataframe. E.g. if tokens = ["become", "farmer", "rally", "workers", "student"], the result will be [1, 2, 0].

The following function worked as expected.

def label_maker_topic(tokens, topic_words):
    # For each keyword list, count how many of the tokens appear in it.
    twt_list = []
    for topic in topic_words:
        count = 0
        for tkn in tokens:
            if tkn in topic:
                count += 1
        twt_list.append(count)

    return twt_list
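
A quick plain-Python sanity check, using the keyword_list defined above (the token list is my own illustrative example):

print(label_maker_topic(["union", "farmer", "strike"], keyword_list))  # -> [2, 1, 0]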

I used a udf under withColumn to call the function and I get an error. I think the problem is passing the external list to the udf. Is there a way to pass both an external list and a dataframe column to a udf and add a new column to my dataframe?

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

topicWord = udf(label_maker_topic, StringType())
myDF = myDF.withColumn("topic_word_count", topicWord(myDF.bodyText_token, keyword_list))

Answered by zero323

The cleanest solution is to pass additional arguments using closure:

from pyspark.sql.functions import col, udf

def make_topic_word(topic_words):
    return udf(lambda c: label_maker_topic(c, topic_words))

df = sc.parallelize([(["union"], )]).toDF(["tokens"])

(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens")))
    .show())
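
With keyword_list as above, this one-row example yields [1, 0, 0] for tokens ["union"], since "union" appears only in the first keyword list. Note that udf defaults to StringType when no return type is given, so declare ArrayType(IntegerType()) if you want the counts back as a real array column.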

This doesn't require any changes in keyword_list or in the function you wrap with the UDF. You can also use this method to pass an arbitrary object, for example a list of sets for efficient lookups.
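
For example, a minimal sketch of the list-of-sets idea (the names keyword_sets and topic_word_counts, and the ArrayType(IntegerType()) return type, are my own assumptions, not part of the original answer):

from pyspark.sql.types import ArrayType, IntegerType

# Pre-convert each keyword list to a set so membership tests are O(1).
keyword_sets = [set(ks) for ks in keyword_list]

topic_word_counts = udf(
    lambda tokens: [sum(t in topic for t in tokens) for topic in keyword_sets],
    ArrayType(IntegerType()),
)

df.withColumn("topics", topic_word_counts(col("tokens"))).show()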

If you want to use your current UDF and pass topic_words directly, you'll have to convert it to a column literal first:

from pyspark.sql.functions import array, lit

ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list])
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show()

Depending on your data and requirements there can be alternative, more efficient solutions which don't require UDFs (explode + aggregate + collapse) or lookups (hashing + vector operations).
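
A hedged sketch of the explode + aggregate route (all names here are illustrative; it assumes a SparkSession named spark and adds a synthetic row id to group by):

from pyspark.sql.functions import col, count, explode, monotonically_increasing_id

# One (topic_id, keyword) row per keyword; a join replaces the per-token Python loop.
keywords_df = spark.createDataFrame(
    [(i, kw) for i, ks in enumerate(keyword_list) for kw in ks],
    ["topic_id", "keyword"],
)

matches = (df
    .withColumn("row_id", monotonically_increasing_id())
    .withColumn("token", explode(col("tokens")))
    .join(keywords_df, col("token") == col("keyword"))
    .groupBy("row_id", "topic_id")
    .agg(count("*").alias("n")))

# Collapsing back to one count per topic is then a pivot on topic_id;
# topics with zero matches drop out of the inner join and need a fill afterwards.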

Answered by Jay

The following works fine; any external parameter can be passed to the UDF this way (the code is tweaked a bit to help anyone else):

topicWord = udf(lambda tkn: label_maker_topic(tkn, topic_words), StringType())
myDF = myDF.withColumn("topic_word_count", topicWord(myDF.bodyText_token))
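
One possible refinement (my suggestion, not part of the original answer): since label_maker_topic returns a list of counts, ArrayType(IntegerType()) describes the output more accurately than StringType:

from pyspark.sql.types import ArrayType, IntegerType

# Declared return type now matches the list of ints the function produces.
topicWord = udf(lambda tkn: label_maker_topic(tkn, topic_words), ArrayType(IntegerType()))
myDF = myDF.withColumn("topic_word_count", topicWord(myDF.bodyText_token))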

Answered by ravi malhotra

Another way: bind the extra argument with partial from the functools module.

from functools import partial
from pyspark.sql.functions import col, udf

func_to_call = partial(label_maker_topic, topic_words=keyword_list)

pyspark_udf = udf(func_to_call, <specify_the_type_returned_by_function_here>)

df = sc.parallelize([(["union"], )]).toDF(["tokens"])

df.withColumn("topics", pyspark_udf(col("tokens"))).show()