Python PySpark: create a new column with a mapping from a dict

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/42980704/


PySpark create new column with mapping from a dict

python, apache-spark, dictionary, pyspark, apache-spark-sql

Asked by ad_s

Using Spark 1.6, I have a Spark DataFrame column (let's say named col1) with the values A, B, C, DS, DNS, E, F, G and H, and I want to create a new column (say col2) using the values from the dict below. How do I map this? (So, for instance, 'A' needs to be mapped to 'S', etc.)

dict = {'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}

Answered by zero323

Inefficient solution with UDF (version independent):

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def translate(mapping):
    # Look each value up in the Python dict; keys missing from
    # the mapping come back as None, i.e. null in the result column.
    def translate_(col):
        return mapping.get(col)
    return udf(translate_, StringType())

df = sc.parallelize([('DS', ), ('G', ), ('INVALID', )]).toDF(['key'])
mapping = {
    'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 
    'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}

df.withColumn("value", translate(mapping)("key"))

with the result:

+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+

Much more efficient (Spark >= 2.0, Spark < 3.0) is to create a MapType literal:

from pyspark.sql.functions import col, create_map, lit
from itertools import chain

# chain(*mapping.items()) flattens the dict into an alternating
# key1, value1, key2, value2, ... sequence of literals for create_map
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])

df.withColumn("value", mapping_expr.getItem(col("key")))

with the same result:

+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+

but with a more efficient execution plan:

== Physical Plan ==
*Project [key#15, keys: [B,DNS,DS,F,E,H,C,G,A], values: [S,S,S,NS,NS,NS,S,NS,S][key#15] AS value#53]
+- Scan ExistingRDD[key#15]

compared to the UDF version:

== Physical Plan ==
*Project [key#15, pythonUDF0#61 AS value#57]
+- BatchEvalPython [translate_(key#15)], [key#15, pythonUDF0#61]
   +- Scan ExistingRDD[key#15]

In Spark >= 3.0, getItem should be replaced with __getitem__ ([]), i.e.:

df.withColumn("value", mapping_expr[col("key")]).show()

Answered by Haim Bendanan

Sounds like the simplest solution would be to use the replace function: http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace

mapping = {
    'A': '1',
    'B': '2'
}
df2 = df.replace(to_replace=mapping, subset=['yourColName'])
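
Note that replace rewrites values in place rather than adding a column, so to answer the original question (a new col2 derived from col1) you can copy the column first. A minimal sketch, assuming the question's col1/col2 names and mapping:

from pyspark.sql.functions import col

mapping = {
    'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S',
    'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}

# Copy col1 into col2, then replace the copied values in place
df2 = (df
       .withColumn('col2', col('col1'))
       .replace(to_replace=mapping, subset=['col2']))

One difference from the map-literal approach above: values missing from the mapping (like 'INVALID') are left unchanged by replace rather than set to null.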