Spark merge dataframe with mismatching schemas without extra disk IO
Note: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. If you reuse it, you must do so under the same license and attribute the original authors (not me): StackOverflow.
Original URL: http://stackoverflow.com/questions/39869084/
Asked by Havnar
I would like to merge two dataframes with (potentially) mismatching schemas:
org.apache.spark.sql.DataFrame = [name: string, age: int, height: int]
org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> A.unionAll(B)
would result in:
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the left table has 2 columns and the right has 3;
I would like to do this from within Spark. However, the Spark docs only propose writing both dataframes out to a directory and reading them back in with spark.read.option("mergeSchema", "true").
So a union doesn't help me out, and neither does the documentation. I would like to keep this extra I/O out of my job if at all possible. Am I missing some undocumented info, or is it not possible (yet)?
Answered by prudenko
You can append the missing columns to frame B as nulls and then union the two frames:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Add each column that exists in A but not in B as a typed null column on B.
val missingFields = A.schema.toSet.diff(B.schema.toSet)
var C: DataFrame = B
for (field <- missingFields) {
  C = C.withColumn(field.name, lit(null).cast(field.dataType))
}

// unionAll resolves columns by position, so align C's column order with A's before the union.
A.unionAll(C.select(A.columns.map(col): _*))
Answered by Gary Gauh
Parquet schema merging is disabled by default; you can turn this option on in either of two ways:
(1) set global option: spark.sql.parquet.mergeSchema=true
(2) write code: sqlContext.read.option("mergeSchema", "true").parquet("my.parquet")
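As an illustration, here is a minimal PySpark sketch of both options (the spark session variable, the example frames and the /tmp/merged path are assumptions added for illustration); note that this is the write-then-read-back approach the question hopes to avoid:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# (1) Global option: applies to every Parquet read in this session.
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

# Two example frames mirroring the question: the second one is missing `height`.
df_a = spark.createDataFrame([("alice", 30, 170)], ["name", "age", "height"])
df_b = spark.createDataFrame([("bob", 25)], ["name", "age"])

# Write both into the same directory (this is the extra disk IO the asker wants to avoid)...
df_a.write.mode("append").parquet("/tmp/merged")
df_b.write.mode("append").parquet("/tmp/merged")

# (2) ...then read them back, enabling schema merging for this read only.
merged = spark.read.option("mergeSchema", "true").parquet("/tmp/merged")
merged.printSchema()  # the merged schema contains name, age and height; missing values come back as null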
Answered by conradlee
Here's a pyspark solution.
It assumes that if the merge can't take place because one dataframe is missing a column contained in the other, then the right thing to do is to add the missing column with null values.
On the other hand, if the merge can't take place because the two dataframes share a column with a conflicting type or nullability, then the right thing to do is to raise a TypeError (because that's a conflict you probably want to know about).
from pyspark.sql.functions import lit

def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            # The field exists on both sides, so the mismatch is a type or nullability conflict.
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not l_nullable))
        # The field is missing on the right: add it as a typed null column.
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]  # the original looked this up in right_types, which is a bug
            if r_type != l_type:
                raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not r_nullable))
        # The field is missing on the left: add it as a typed null column.
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))

    # union matches columns by position, so align the right frame's column order with the left's.
    return df_left.union(df_right.select(df_left.columns))
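A minimal usage sketch (the example frames and the spark session variable are assumptions added for illustration, not part of the original answer):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical frames mirroring the question: df_b is missing the `height` column.
df_a = spark.createDataFrame([("alice", 30, 170)], ["name", "age", "height"])
df_b = spark.createDataFrame([("bob", 25)], ["name", "age"])

combined = harmonize_schemas_and_combine(df_a, df_b)
combined.show()  # bob's row comes back with height = null

# If `age` had conflicting types in the two frames, the call would raise a TypeError instead.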
Answered by Haim Bendanan
Here is another solution for this. I used an RDD union because the DataFrame union operation doesn't support multiple DataFrames. Note: this should not be used to merge a lot of DataFrames with different schemas. The cost of adding null columns to the DataFrames will quickly result in out-of-memory errors (e.g. trying to merge 1000 DataFrames with 10 columns missing will result in 10,000 transformations). If your use case is to read a DataFrame from storage that is composed of multiple paths with different schemas, a much better option would be to save your data as Parquet in the first place and then use the 'mergeSchema' option when reading the DataFrame.
import logging

from pyspark.sql import functions as func

def unionDataFramesAndMergeSchema(spark, dfsList):
    '''
    This function can perform a union between x dataFrames with different schemas.
    Non-existing columns will be filled with null.
    Note: If a column exists in 2 dataFrames with different types, an exception will be thrown.
    :example:
    >>> df1 = spark.createDataFrame([
    >>>     {
    >>>         'A': 1,
    >>>         'B': 1,
    >>>         'C': 1
    >>>     }])
    >>> df2 = spark.createDataFrame([
    >>>     {
    >>>         'A': 2,
    >>>         'C': 2,
    >>>         'DNew': 2
    >>>     }])
    >>> unionDataFramesAndMergeSchema(spark, [df1, df2]).show()
    >>> +---+----+---+----+
    >>> |  A|   B|  C|DNew|
    >>> +---+----+---+----+
    >>> |  2|null|  2|   2|
    >>> |  1|   1|  1|null|
    >>> +---+----+---+----+
    :param spark: The Spark session.
    :param dfsList: A list of dataFrames.
    :return: A union of all dataFrames, with schema merged.
    '''
    if len(dfsList) == 0:
        raise ValueError("DataFrame list is empty.")
    if len(dfsList) == 1:
        logging.info("The list contains only one dataFrame, no need to perform union.")
        return dfsList[0]

    logging.info("Will perform union between {0} dataFrames...".format(len(dfsList)))

    # Build the unified mapping of column name -> type, failing on type conflicts.
    columnNamesAndTypes = {}
    logging.info("Calculating unified column names and types...")
    for df in dfsList:
        for columnName, columnType in dict(df.dtypes).items():
            if columnName in columnNamesAndTypes and columnNamesAndTypes[columnName] != columnType:
                raise ValueError(
                    "column '{0}' exists in at least 2 dataFrames with different types ('{1}' and '{2}')"
                    .format(columnName, columnType, columnNamesAndTypes[columnName]))
            columnNamesAndTypes[columnName] = columnType
    logging.info("Unified column names and types: {0}".format(columnNamesAndTypes))

    # Add typed null columns wherever a dataFrame is missing one of the unified columns.
    logging.info("Adding null columns in dataFrames if needed...")
    newDfsList = []
    for df in dfsList:
        newDf = df
        dfTypes = dict(df.dtypes)
        for columnName, columnType in columnNamesAndTypes.items():
            if columnName not in dfTypes:
                newDf = newDf.withColumn(columnName, func.lit(None).cast(columnType))
        newDfsList.append(newDf)

    # Put the columns of every dataFrame in the same order before the RDD-level union.
    orderedColumns = list(columnNamesAndTypes.keys())
    dfsWithOrderedColumnsList = [df.select(orderedColumns) for df in newDfsList]

    logging.info("Performing a flat union between all dataFrames (as rdds)...")
    allRdds = spark.sparkContext.union([df.rdd for df in dfsWithOrderedColumnsList])
    return allRdds.toDF()
Answered by Zilong Z
Thanks @conradlee! I modified your solution to allow the union by adding casting and removing the nullability check. It worked for me.
from pyspark.sql import functions as F

def harmonize_schemas_and_combine(df_left, df_right):
    '''
    df_left is the main df; we try to append the new df_right to it.
    Need to do three things here:
    1. Set other claim/clinical features to NULL
    2. Align schemas (data types)
    3. Align column orders
    '''
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType) for f in df_left.schema)
    right_fields = set((f.name, f.dataType) for f in df_right.schema)

    # I. First go over left-unique fields:
    #    For columns in the main df but not in the new df: add them as Null.
    #    For columns in both dfs but with different data types: cast to keep them consistent with the main df (Left).
    for l_name, l_type in left_fields.difference(right_fields):  # 1. find what Left has and Right doesn't
        if l_name in right_types:  # 2A. if the column is in both, the schemas disagree on type
            r_type = right_types[l_name]  # 3. the column's type on the Right
            df_right = df_right.withColumn(l_name, df_right[l_name].cast(l_type))  # 4. keep it consistent with the main df (Left)
            print("Casting magic happened on column %s: Left type: %s, Right type: %s. Both are now: %s." % (l_name, l_type, r_type, l_type))
        else:  # 2B. if the Left column is not in Right, add a NULL column to the Right df
            df_right = df_right.withColumn(l_name, F.lit(None).cast(l_type))

    # Make sure Right columns are in the same order as Left
    df_right = df_right.select(df_left.columns)
    return df_left.union(df_right)
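A minimal usage sketch of the modified version (the example frames and the spark session variable are assumptions added for illustration); a type conflict on `age` is now cast to the left frame's type instead of raising, and the missing `height` column is filled with nulls:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical frames: `age` is a long on the left but a string on the right,
# and `height` is missing on the right.
df_left = spark.createDataFrame([("alice", 30, 170)], ["name", "age", "height"])
df_right = spark.createDataFrame([("bob", "25")], ["name", "age"])

combined = harmonize_schemas_and_combine(df_left, df_right)
combined.show()  # `age` on the right is cast to long, `height` comes back as null, columns match df_left's order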
Answered by Ihor Konovalenko
If you read both dataframes from storage files, you can just use a predefined schema:
import org.apache.spark.sql.types._

val schemaForRead =
  StructType(List(
    StructField("userId", LongType, true),
    StructField("dtEvent", LongType, true),
    StructField("goodsId", LongType, true)
  ))

val dfA = spark.read.format("parquet").schema(schemaForRead).load("/tmp/file1.parquet")
val dfB = spark.read.format("parquet").schema(schemaForRead).load("/tmp/file2.parquet")
val dfC = dfA.union(dfB)
Note that the schemas in file1 and file2 can differ from each other and from schemaForRead. If file1 doesn't contain a field from schemaForRead, dataframe dfA will have that field filled with nulls. If a file contains an additional field not present in schemaForRead, the dataframe simply won't have it.

