Python: Convert a pyspark.sql.dataframe.DataFrame to a Dictionary

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/41206255/

Convert pyspark.sql.dataframe.DataFrame type Dataframe to Dictionary

Tags: python, dictionary, apache-spark, pyspark

Asked by Hardik Gupta

I have a pyspark DataFrame and I need to convert it into a Python dictionary.

The code below is reproducible:

from pyspark.sql import Row

# sc is the SparkContext (available by default in the pyspark shell)
rdd = sc.parallelize([Row(name='Alice', age=5, height=80),
                      Row(name='Alice', age=5, height=80),
                      Row(name='Alice', age=10, height=80)])
df = rdd.toDF()

Once I have this dataframe, I need to convert it into a dictionary.

I tried it like this:

df.set_index('name').to_dict()

But it gives an error. How can I achieve this?

Answered by mtoto

You first need to convert to a pandas.DataFrame using toPandas(); then you can use the to_dict() method on the transposed dataframe with orient='list':

df.toPandas().set_index('name').T.to_dict('list')
# Out[1]: {u'Alice': [10, 80]}
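
Note that because to_dict() keys the result by the transposed column names, duplicate names collapse and only the last row per name survives, which is why only one Alice entry appears. If you want every row per name instead, a groupby-based variant is possible; the following is just a sketch of that idea, not part of the original answer:

pdf = df.toPandas()
# Collect all [age, height] pairs per name instead of keeping only the last row.
# Assumes the data fits in driver memory once converted with toPandas().
grouped = {name: g[['age', 'height']].values.tolist()
           for name, g in pdf.groupby('name')}
# grouped: {u'Alice': [[5, 80], [5, 80], [10, 80]]}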

Answered by Fokko Driesprong

Please see the example below:

>>> from pyspark.sql.functions import col
>>> df = (sc.textFile('data.txt')
            .map(lambda line: line.split(","))
            .toDF(['name','age','height'])
            .select(col('name'), col('age').cast('int'), col('height').cast('int')))

>>> df.show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|  Bob|  5|    80|
|Alice| 10|    80|
+-----+---+------+

>>> list_persons = map(lambda row: row.asDict(), df.collect())
>>> list_persons
[
    {'age': 5, 'name': u'Alice', 'height': 80}, 
    {'age': 5, 'name': u'Bob', 'height': 80}, 
    {'age': 10, 'name': u'Alice', 'height': 80}
]

>>> dict_persons = {person['name']: person for person in list_persons}
>>> dict_persons
{u'Bob': {'age': 5, 'name': u'Bob', 'height': 80}, u'Alice': {'age': 10, 'name': u'Alice', 'height': 80}}

The input that I'm using for testing, data.txt:

Alice,5,80
Bob,5,80
Alice,10,80

First we do the loading using pyspark by reading the lines. Then we convert the lines to columns by splitting on the comma. Then we convert the native RDD to a DF and add names to the columns. Finally we cast the columns to the appropriate format.

Then we collect everything to the driver, and using some Python comprehensions we convert the data to the preferred form. We convert the Row objects to dictionaries using the asDict() method. In the output we can observe that Alice appears only once, but this is of course because her key gets overwritten.

Please keep in mind that you want to do all the processing and filtering inside pyspark before returning the result to the driver.

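For instance, here is a minimal sketch of pushing the filtering into Spark before collecting (the filter condition and the dropDuplicates call are purely illustrative, not part of the original answer):

# Filter and deduplicate on the Spark side, so only the needed rows reach the driver.
filtered = df.filter(df.age >= 10).dropDuplicates(['name'])
dict_persons = {row['name']: row.asDict() for row in filtered.collect()}
# dict_persons: {u'Alice': {'name': u'Alice', 'age': 10, 'height': 80}}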

Hope this helps, cheers.

Answered by Adam Ranganathan

The Row objects in an RDD have a built-in asDict() method that allows each row to be represented as a dict.

If you have a DataFrame df, you need to convert it to an RDD and apply asDict().

new_rdd = df.rdd.map(lambda row: row.asDict(True))  # True converts nested Rows recursively

One can then use new_rdd to perform normal Python map operations, like:

# You can define normal python functions like below and plug them when needed
def transform(row):
    # Add a new key to each row
    row["new_key"] = "my_new_value"
    return row

new_rdd = new_rdd.map(transform)
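
If you eventually need a plain dictionary on the driver, you can collect the transformed RDD and key it yourself; this is a sketch, and keying by name means duplicate names overwrite earlier entries, just as in the answers above:

# Collect the transformed rows and build a dict keyed by name.
result = {row['name']: row for row in new_rdd.collect()}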