Scala: Is there a way to add extra metadata for Spark dataframes?

Disclaimer: this content is from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/32628845/

Is there a way to add extra metadata for Spark dataframes?

Tags: scala, apache-spark, apache-spark-sql

Asked by Martin Senne

Is it possible to add extra meta data to DataFrames?

Reason

I have Spark DataFrames for which I need to keep extra information. Example: a DataFrame for which I want to "remember" the highest index used in an Integer id column.

Current solution

I use a separate DataFrame to store this information. Of course, keeping this information separately is tedious and error-prone.

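For illustration, a minimal sketch of this side-car approach (assuming a spark-shell style session where sc and sqlContext.implicits._ are available; the names maxId and metaDF are made up for the example):

import sqlContext.implicits._
import org.apache.spark.sql.functions.max

val data = sc.parallelize(Seq(1, 5, 3)).toDF("id")        // the actual data
val maxId = data.agg(max("id")).head.getInt(0)            // the value to "remember"
val metaDF = Seq(("maxId", maxId)).toDF("key", "value")   // side-car DataFrame, kept in sync by hand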

Is there a better solution to store such extra information on DataFrames?

Answered by chbrown

To expand and Scala-fy nealmcb's answer (the question was tagged scala, not python, so I don't think this answer will be off-topic or redundant), suppose you have a DataFrame:

// spark-shell style: sc and the toDF implicits are assumed to already be in scope
import org.apache.spark.sql
val df = sc.parallelize(Seq.fill(100) { scala.util.Random.nextInt() }).toDF("randInt")

And some way to get the max or whatever you want to memoize on the DataFrame:

val randIntMax = df.rdd.map { case sql.Row(randInt: Int) => randInt }.reduce(math.max)

sql.types.Metadata can only hold strings, booleans, some types of numbers, and other metadata structures. So we have to use a Long:

val metadata = new sql.types.MetadataBuilder().putLong("columnMax", randIntMax).build()

DataFrame.withColumn() actually has an overload that permits supplying a metadata argument at the end, but it's inexplicably marked [private], so we just do what it does — use Column.as(alias, metadata):

val newColumn = df.col("randInt").as("randInt_withMax", metadata)
val dfWithMax = df.withColumn("randInt_withMax", newColumn)

dfWithMax now has (a column with) the metadata you want!

dfWithMax.schema.foreach(field => println(s"${field.name}: metadata=${field.metadata}"))
> randInt: metadata={}
> randInt_withMax: metadata={"columnMax":2094414111}

Or programmatically and type-safely (sort of; Metadata.getLong() and others do not return Option and may throw a "key not found" exception):

dfWithMax.schema("randInt_withMax").metadata.getLong("columnMax")
> res29: Long = 209341992
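
If you would rather not risk that exception, one option (a sketch; it only relies on Metadata.contains and Metadata.getLong) is to guard the lookup yourself:

val meta = dfWithMax.schema("randInt_withMax").metadata
val columnMaxOpt: Option[Long] =
  if (meta.contains("columnMax")) Some(meta.getLong("columnMax")) else None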

Attaching the max to a column makes sense in your case, but in the general case of attaching metadata to a DataFrame and not a column in particular, it appears you'd have to take the wrapper route described by the other answers.

Answered by nealmcb

As of Spark 1.2, StructType schemas have a metadata attribute which can hold an arbitrary mapping / dictionary of information for each column in a DataFrame. E.g. (when used with the separate spark-csv library):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

customSchema = StructType([
  StructField("cat_id", IntegerType(), True,
    {'description': "Unique id, primary key"}),
  StructField("cat_title", StringType(), True,
    {'description': "Name of the category, with underscores"}) ])

categoryDumpDF = (sqlContext.read.format('com.databricks.spark.csv')
 .options(header='false')
 .load(csvFilename, schema = customSchema) )

f = categoryDumpDF.schema.fields
["%s (%s): %s" % (t.name, t.dataType, t.metadata) for t in f]

["cat_id (IntegerType): {u'description': u'Unique id, primary key'}",
 "cat_title (StringType): {u'description': u'Name of the category, with underscores.'}"]

This was added in [SPARK-3569] Add metadata field to StructField - ASF JIRA, and designed for use in Machine Learning pipelines to track information about the features stored in columns, like categorical/continuous, the number of categories, and the category-to-index map. See the SPARK-3569: Add metadata field to StructField design document.

I'd like to see this used more widely, e.g. for descriptions and documentation of columns, the unit of measurement used in the column, coordinate axis information, etc.

Issues include how to appropriately preserve or manipulate the metadata information when the column is transformed, how to handle multiple sorts of metadata, how to make it all extensible, etc.

For the benefit of those thinking of expanding this functionality in Spark dataframes, I reference some analogous discussions around Pandas.

For example, see xray - bring the labeled data power of pandas to the physical sciences, which supports metadata for labeled arrays.

And see the discussion of metadata for Pandas at Allow custom metadata to be attached to panel/df/series? · Issue #2485 · pydata/pandas.

See also discussion related to units: ENH: unit of measurement / physical quantities · Issue #10349 · pydata/pandas

Answered by Niemand

If you want to have less tedious work, I think you can add an implicit conversion between DataFrame and your custom wrapper (haven't tested it yet though).

implicit class WrappedDataFrame(val df: DataFrame) {
  var metadata = scala.collection.mutable.Map[String, Long]()

  def addToMetaData(key: String, value: Long): Unit = {
    metadata += key -> value
  }
  // ...[other methods you consider useful, getters, setters, whatever]...
}

If the implicit wrapper is in the DataFrame's scope, you can just use a normal DataFrame as if it were your wrapper, i.e.:

df.addToMetaData("size", 100)

This approach also makes your metadata mutable, so you are not forced to compute it only once and carry it around.

Answered by Al M

I would store a wrapper around your dataframe. For example:

case class MyDFWrapper(dataFrame: DataFrame, metadata: Map[String, Long])
val maxIndex = df1.agg("index" -> "MAX").head.getLong(0)
MyDFWrapper(df1, Map("maxIndex" -> maxIndex))
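
A brief usage sketch (the name wrapped is just for illustration): keep the wrapper around and read the value back from its map:

val wrapped = MyDFWrapper(df1, Map("maxIndex" -> maxIndex))
wrapped.metadata("maxIndex")   // the remembered value
wrapped.dataFrame.count()      // the underlying DataFrame is still available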

Answered by Azuaron

A lot of people saw the word "metadata" and went straight to "column metadata". This does not seem to be what you wanted, and was not what I wanted when I had a similar problem. Ultimately, the problem here is that a DataFrame is an immutable data structure: whenever an operation is performed on it, the data carries over to the result, but the rest of the DataFrame does not. This means that you can't simply put a wrapper on it, because as soon as you perform an operation you've got a whole new DataFrame (potentially of a completely new type, especially with Scala/Spark's tendencies toward implicit conversions). Finally, if the DataFrame ever escapes its wrapper, there's no way to reconstruct the metadata from the DataFrame.

I had this problem in Spark Streaming, which focuses on RDDs (also the underlying data structure of the DataFrame), and came to one simple conclusion: the only place to store the metadata is in the name of the RDD. An RDD name is never used by the core Spark system except for reporting, so it's safe to repurpose it. Then, you can create your wrapper based on the RDD name, with an explicit conversion between any DataFrame and your wrapper, complete with metadata.

Unfortunately, this still leaves you with the problem of immutability and of a new RDD being created with every operation. The RDD name (our metadata field) is lost with each new RDD, so you need a way to re-add the name to the new RDD. This can be solved by providing a method that takes a function as an argument: it extracts the metadata before calling the function, calls the function to get the new RDD/DataFrame, then names the result with the metadata:

// Assumed to be a method on the wrapper, where `df` is the wrapped DataFrame.
def withMetadata(fn: DataFrame => DataFrame): MetaDataFrame = {
  val meta = df.rdd.name       // extract the metadata (stored in the RDD name)
  val result = fn(df)          // call the function to get the new DataFrame
  result.rdd.setName(meta)     // re-attach the metadata to the new RDD
  MetaDataFrame(result)
}

Your wrapping class (MetaDataFrame) can provide convenience methods for parsing and setting metadata values, as well as implicit conversions back and forth between Spark DataFrame and MetaDataFrame. As long as you run all your mutations through the withMetadata method, your metadata will carry through your entire transformation pipeline. Using this method for every call is a bit of a hassle, yes, but the simple reality is that there is not a first-class metadata concept in Spark.

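As a rough sketch of what such a wrapper could look like (the case class, the "key=value;key=value" encoding of the RDD name, and the helper names are all illustrative assumptions, not something the answer prescribes):

import scala.language.implicitConversions
import org.apache.spark.sql.DataFrame

// Illustrative only: metadata is serialized as "k1=v1;k2=v2" into the RDD name.
case class MetaDataFrame(df: DataFrame) {

  def metadata: Map[String, String] =
    Option(df.rdd.name).filter(_.nonEmpty)
      .map(_.split(";").map(_.split("=", 2)).collect { case Array(k, v) => k -> v }.toMap)
      .getOrElse(Map.empty)

  def setMetadata(m: Map[String, String]): MetaDataFrame = {
    df.rdd.setName(m.map { case (k, v) => s"$k=$v" }.mkString(";"))
    this
  }

  // The withMetadata method shown above would live here as well.
}

object MetaDataFrame {
  implicit def unwrap(mdf: MetaDataFrame): DataFrame = mdf.df
  implicit def wrap(df: DataFrame): MetaDataFrame = MetaDataFrame(df)
}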