Python: Building a StructType from a dataframe in pyspark

Note: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/36026070/

Building a StructType from a dataframe in pyspark

python apache-spark dataframe pyspark apache-spark-sql

Asked by learning

I am new to Spark and Python and am facing the difficulty of building a schema from a metadata file that can be applied to my data file. Scenario: the metadata file for the data file (CSV format) contains the columns and their types, for example:

id,int,10,"","",id,"","",TRUE,"",0
created_at,timestamp,"","","",created_at,"","",FALSE,"",0

I have successfully converted this to a dataframe that looks like:

+--------------------+---------------+
|                name|           type|
+--------------------+---------------+
|                  id|  IntegerType()|
|          created_at|TimestampType()|
|          updated_at|   StringType()|

But when I try to convert this to a StructField format using this

fields = schemaLoansNew.map(lambda l:([StructField(l.name, l.type, 'true')]))

OR

schemaList = schemaLoansNew.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).collect()

And then later convert it to StructType, using

schemaFinal = StructType(schemaList)

I get the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/mapr/spark/spark-1.4.1/python/pyspark/sql/types.py", line 372, in __init__
    assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
AssertionError: fields should be a list of DataType

I am stuck on this due to my lack of knowledge of DataFrames. Can you please advise how to proceed? Once I have the schema ready I want to use createDataFrame to apply it to my data file. This process has to be done for many tables, so I do not want to hardcode the types but rather use the metadata file to build the schema and then apply it to the RDD.

Thanks in advance.

Answered by zero323

The fields argument has to be a list of DataType objects. This:

.map(lambda l:([StructField(l.name, l.type, 'true')]))

generates after collect a list of lists of tuples (Rows) of DataType (list[list[tuple[DataType]]]), not to mention that the nullable argument should be a boolean, not a string.

Your second attempt:

.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).

generates after collect a list of str objects.

The correct schema for the record you've shown should look more or less like this:

from pyspark.sql.types import *

StructType([
    StructField("id", IntegerType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("updated_at", StringType(), True)
])
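
Once you have such a StructType you can apply it to your data file, for example (a minimal sketch, not part of the original answer; spark, the file path and parsed_rdd are placeholder names, and on Spark 1.x you would use a SQLContext instead of a SparkSession):

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("updated_at", StringType(), True)
])

# Apply the schema while reading the CSV data file (built-in csv reader, Spark 2.x+) ...
df_data = spark.read.csv("/path/to/data.csv", schema=schema)

# ... or when converting an already parsed RDD of tuples/Rows
df_data = spark.createDataFrame(parsed_rdd, schema)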

Although using distributed data structures for a task like this is serious overkill, not to mention inefficient, you can try to adjust your first solution as follows:

StructType([
    StructField(name, eval(type), True) for (name, type) in df.rdd.collect()
])

but it is not particularly safe (eval). It could be easier to build a schema from JSON / a dictionary. Assuming you have a function which maps from the type description to the canonical type name:

def get_type_name(s: str) -> str:
    """
    >>> get_type_name("int")
    'integer'
    """
    _map = {
        'int': IntegerType().typeName(),
        'timestamp': TimestampType().typeName(),
        # ...
    } 
    return _map.get(s, StringType().typeName())

You can build a dictionary of the following shape:

schema_dict = {'fields': [
    {'metadata': {}, 'name': 'id', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'created_at', 'nullable': True, 'type': 'timestamp'}
], 'type': 'struct'}

and feed it to StructType.fromJson:

StructType.fromJson(schema_dict)
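
Putting these pieces together, a rough sketch of building the dictionary from the metadata dataframe shown in the question (hypothetical; it assumes df is the metadata dataframe with name and type columns holding the raw type names from the metadata file, such as 'int' and 'timestamp'):

# Collect the (name, type) pairs and map each raw type name to its canonical name
fields = [
    {"metadata": {}, "name": row.name, "nullable": True, "type": get_type_name(row.type)}
    for row in df.collect()
]

schema = StructType.fromJson({"fields": fields, "type": "struct"})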

Answered by Andy Quiroz

// Reorder the columns of one DataFrame to match the column order of another (Scala)
val columns: Array[String] = df1.columns
val reorderedColumnNames: Array[String] = df2.columns // or do the reordering you want
val result: DataFrame = df1.select(reorderedColumnNames.head, reorderedColumnNames.tail: _*)
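
That snippet is Scala; a rough PySpark equivalent of the same column-reordering idea would be (a sketch, not part of the original answer):

reordered_column_names = df2.columns  # or do the reordering you want
result = df1.select(*reordered_column_names)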

Answered by BigData-Guru

The steps below can be followed to define the schema with DataType objects and apply it when reading the data:

from pyspark.sql.types import StructField, StructType, IntegerType, StringType

# Define the fields and build the schema
data_schema = [
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True)
]

final_struct = StructType(fields=data_schema)

# Apply the schema while reading the JSON file
df = spark.read.json('/home/abcde/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json', schema=final_struct)

df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)