Scala: How do I detect if a Spark DataFrame has a column

Disclaimer: this page reproduces a popular StackOverflow question under the CC BY-SA 4.0 license. You are free to use or share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/35904136/

How do I detect if a Spark DataFrame has a column

scala, apache-spark, dataframe, apache-spark-sql

Asked by ben

When I create a DataFrame from a JSON file in Spark SQL, how can I tell if a given column exists before calling .select?

Example JSON schema:

{
  "a": {
    "b": 1,
    "c": 2
  }
}

This is what I want to do:

val potential_columns = Seq("b", "c", "d")
val df = sqlContext.read.json(filename)
potential_columns.map(column => if (df.hasColumn(column)) df.select(s"a.$column"))

but I can't find a good function for hasColumn. The closest I've gotten is to test if the column is in this somewhat awkward array:

scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)
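
In other words, the question boils down to turning that array into a check, something like the following sketch (reusing df and potential_columns from above; the variable names are only illustrative):

val nestedColumns = df.select("a.*").columns.toSet
val selected = potential_columns
  .filter(nestedColumns.contains)          // keep only the columns that actually exist under "a"
  .map(column => df.select(s"a.$column"))  // one single-column DataFrame per existing column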

Answered by zero323

Just assume it exists and let it fail with Try. Plain and simple, and it supports arbitrary nesting:

import scala.util.Try
import org.apache.spark.sql.DataFrame

def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess

val df = sqlContext.read.json(sc.parallelize(
  """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil))

hasColumn(df, "foobar")
// Boolean = false

hasColumn(df, "foo")
// Boolean = true

hasColumn(df, "foo.bar")
// Boolean = true

hasColumn(df, "foo.bar.foobar")
// Boolean = true

hasColumn(df, "foo.bar.foobaz")
// Boolean = false

Or even simpler:

val columns = Seq(
  "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz")

columns.flatMap(c => Try(df(c)).toOption)
// Seq[org.apache.spark.sql.Column] = List(
//   foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)
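
Applied to the original question, the same Try/toOption trick lets you select just the columns that resolve; a small sketch building on the snippet above:

// Keep only the columns that actually resolve and select them in one go.
df.select(columns.flatMap(c => Try(df(c)).toOption): _*).show()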

Python equivalent:

from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row


def has_column(df, col):
    try:
        df[col]
        return True
    except AnalysisException:
        return False

df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF()

has_column(df, "foobar")
## False

has_column(df, "foo")
## True

has_column(df, "foo.bar")
## True

has_column(df, "foo.bar.foobar")
## True

has_column(df, "foo.bar.foobaz")
## False

Answered by Jai Prakash

Another option which I normally use is

df.columns.contains("column-name-to-check")

This returns a boolean.
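
Note that df.columns only lists top-level column names, so nested paths such as "a.b" will not match. A small usage sketch (the column name is just a placeholder):

// Only project the column if it is actually present; otherwise keep the DataFrame as is.
val result =
  if (df.columns.contains("column-name-to-check")) df.select("column-name-to-check")
  else df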

Answered by Daniel B.

Actually, you don't even need to call select in order to use columns; you can just call it on the DataFrame itself:

// define test data
case class Test(a: Int, b: Int)
val testList = List(Test(1,2), Test(3,4))
val testDF = sqlContext.createDataFrame(testList)

// define the hasColumn function
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName)

// then you can just use it on the DF with a given column name
hasColumn(testDF, "a")  // <-- true
hasColumn(testDF, "c")  // <-- false

Alternatively, you can define an implicit class using the "pimp my library" pattern so that the hasColumn method is available on your DataFrames directly:

implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) {
    def hasColumn(colName: String) = df.columns.contains(colName)
}

Then you can use it as:

testDF.hasColumn("a") // <-- true
testDF.hasColumn("c") // <-- false

Answered by Nitin Mathur

Try is not optimal, as it will evaluate the expression inside Try before it makes the decision.

For large data sets, use the following in Scala:

df.schema.fieldNames.contains("column_name")
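
Note that df.schema.fieldNames only covers top-level fields. If you need a schema-only check for nested columns as well (still without resolving anything), a recursive sketch could look like this (the helper name is illustrative):

import org.apache.spark.sql.types.StructType

// Walk the schema instead of resolving the column, so nothing is evaluated.
def hasNestedColumn(schema: StructType, path: String): Boolean = {
  def loop(s: StructType, parts: List[String]): Boolean = parts match {
    case Nil          => false
    case name :: Nil  => s.fieldNames.contains(name)
    case name :: rest =>
      s.fields.find(_.name == name).map(_.dataType) match {
        case Some(nested: StructType) => loop(nested, rest)
        case _                        => false
      }
  }
  loop(schema, path.split('.').toList)
}

hasNestedColumn(df.schema, "column_name")   // same result as the check above
hasNestedColumn(df.schema, "a.b")           // nested paths are handled too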

Answered by Michael Lloyd Lee mlk

Your other option for this would be to do some array manipulation (in this case an intersect) on df.columns and your potential_columns.

// Loading some data (so you can just copy & paste right into spark-shell)
case class Document( a: String, b: String, c: String)
val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF

// The columns we want to extract
val potential_columns = Seq("b", "c", "d")

// Get the intersect of the potential columns and the actual columns, 
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show

Alas, this will not work for your inner-object scenario above. You will need to look at the schema for that.

I'm going to change your potential_columns to fully qualified column names:

val potential_columns = Seq("a.b", "a.c", "a.d")

// Our object model
case class Document( a: String, b: String, c: String)
case class Document2( a: Document, b: String, c: String)

// And some data...
val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "b2", "c2")), 2).toDF

// We go through each of the fields in the schema.
// For StructTypes we return an array of parentName.fieldName
// For everything else we return an array containing just the field name
// We then flatten the complete list of field names
// Then we intersect that with our potential_columns leaving us just a list of column we want
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(df.schema
  .map(a => a.dataType match {
    case s: org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x)
    case _ => Array(a.name)
  })
  .flatMap(x => x)
  .intersect(potential_columns)
  .map(df(_)): _*).show

This only goes one level deep, so to make it generic you would have to do more work.
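
A recursive sketch of that extra work (not part of the original answer) could collect fully qualified field names at any depth and intersect those; note that it also keeps the intermediate struct names themselves:

import org.apache.spark.sql.types.{StructField, StructType}

// Recursively collect fully qualified field names at any depth.
def allFieldNames(schema: StructType, prefix: String = ""): Seq[String] =
  schema.fields.flatMap {
    case StructField(name, nested: StructType, _, _) =>
      (prefix + name) +: allFieldNames(nested, prefix + name + ".")
    case StructField(name, _, _, _) =>
      Seq(prefix + name)
  }

df.select(allFieldNames(df.schema).intersect(potential_columns).map(df(_)): _*).show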

Answered by mfryar

For those who stumble across this looking for a Python solution, I use:

if 'column_name_to_check' in df.columns:
    # do something

When I tried @Jai Prakash's answer of df.columns.contains('column-name-to-check') using Python, I got AttributeError: 'list' object has no attribute 'contains'.

Answered by Shaun Ryan

If you shred your JSON using a schema definition when you load it, then you don't need to check for the column. If it's not in the JSON source, it will appear as a null column.

import org.apache.spark.sql.types.{DataType, StructType}

val schemaJson = """
  {
      "type": "struct",
      "fields": [
          {
            "name": "field1",
            "type": "string",
            "nullable": true,
            "metadata": {}
          },
          {
            "name": "field2",
            "type": "string",
            "nullable": true,
            "metadata": {}
          }
      ]
  }
"""

val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]

val djson = sqlContext.read
  .schema(schema)
  .option("badRecordsPath", readExceptionPath)
  .json(dataPath)
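
If you would rather not hand-write the schema JSON, an equivalent schema can be built programmatically; a sketch using StructType and StructField directly, reusing the paths from above:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// The same two-field schema, built in code instead of parsed from a JSON string.
val schema2 = StructType(Seq(
  StructField("field1", StringType, nullable = true),
  StructField("field2", StringType, nullable = true)
))

val djson2 = sqlContext.read
  .schema(schema2)
  .option("badRecordsPath", readExceptionPath)
  .json(dataPath)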

Answered by Jie

In PySpark, df.columns gives you a list of the columns in the DataFrame, so "colName" in df.columns returns True or False. Give it a try. Good luck!

Answered by user11349757

def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) =
  Try(df.select(colName)).isSuccess

Use the above function to check for the existence of a column, including nested column names.