How to handle exceptions in Spark and Scala

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you reuse it you must do so under the same CC BY-SA terms, cite the original source, and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/45894688/

How to handle exceptions in Spark and Scala

Tags: scala, apache-spark, exception-handling

Asked by LucieCBurgess

I'm trying to handle common exceptions in Spark, like a .map operation not working correctly on all elements of the data or a FileNotFound exception. I have read all the existing questions and the following two posts:

https://rcardin.github.io/big-data/apache-spark/scala/programming/2016/09/25/try-again-apache-spark.html

https://www.nicolaferraro.me/2016/02/18/exception-handling-in-apache-spark

I have tried a Try statement within the line attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble
so it reads attributes => Try(mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble)

But that won't compile; the compiler won't recognise the .toDF() statement later. I have also tried a Java-like try { } catch { } block but can't get the scope right; df is then not returned. Does anyone know how to do this properly? Do I even need to handle these exceptions, given that the Spark framework already seems to deal with a FileNotFound exception without me adding one? But I would like to generate an error that reports the number of fields in the schema if the input file has the wrong number of columns, for example.

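What I have in mind is roughly this kind of check (just a sketch; checkedSplit and expectedFieldCount are names I have made up, and 24 matches the number of fields in my mHealthUser case class):

    // Sketch only: fail fast with a clear message when a line has the wrong number of fields
    val expectedFieldCount = 24  // made-up name; matches the mHealthUser schema

    def checkedSplit(line: String): Array[String] = {
      val fields = line.split("\t")
      require(fields.length == expectedFieldCount,
        s"Expected $expectedFieldCount fields but found ${fields.length} in line: $line")
      fields
    }

I could then call .map(checkedSplit) instead of .map(_.split("\t")) in the code below.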

Here's the code:

object DataLoadTest extends SparkSessionWrapper {
/** Helper function to create a DataFrame from a textfile, re-used in subsequent tests */
def createDataFrame(fileName: String): DataFrame = {

import spark.implicits._

//try {
val df = spark.sparkContext
  .textFile("/path/to/file" + fileName)
  .map(_.split("\t"))
//mHealth user is the case class which defines the data schema
  .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble,
        attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
        attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
        attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
        attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
        attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
        attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
        attributes(23).toInt))
  .toDF()
  .cache()
df
} catch {
    case ex: FileNotFoundException => println(s"File $fileName not found")
    case unknown: Exception => println(s"Unknown exception: $unknown")

}
}

All suggestions appreciated. Thanks!

Answered by Neeraj Malhotra

Another option would be to use the Try type in Scala.

For example:

import java.io.FileNotFoundException
import scala.util.{Try, Success, Failure}

def createDataFrame(fileName: String): Try[DataFrame] = {
  try {
    // create dataframe df here, as in the question
    Success(df)
  } catch {
    case ex: FileNotFoundException =>
      println(s"File $fileName not found")
      Failure(ex)
    case unknown: Exception =>
      println(s"Unknown exception: $unknown")
      Failure(unknown)
  }
}

Now, on the caller side, handle it like this:

createDataFrame("file1.csv") match {
  case Success(df) => {
    // proceed with your pipeline
  }
  case Failure(ex) => //handle exception
}

This is slightly better than using Option, as the caller will know the reason for the failure and can handle it accordingly.

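A small side note on this pattern: the explicit try/catch with Success/Failure can also be written more compactly with the Try constructor, which wraps any non-fatal exception thrown inside the block in a Failure automatically. A minimal sketch, assuming the same spark session from SparkSessionWrapper is in scope; the spark.read line is only a placeholder, not the asker's exact code:

    import scala.util.Try
    import org.apache.spark.sql.DataFrame

    def createDataFrame(fileName: String): Try[DataFrame] = Try {
      // Any non-fatal exception thrown here is captured as a Failure;
      // a normal result is wrapped in Success.
      spark.read.option("sep", "\t").csv("/path/to/file" + fileName)  // placeholder
    }

The caller-side match shown above works unchanged.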

Answered by Raphael Roth

Either you let the exception be thrown out of the createDataFrame method (and handle it outside), or change the signature to return Option[DataFrame]:

  import java.io.FileNotFoundException

  def createDataFrame(fileName: String): Option[DataFrame] = {

    import spark.implicits._

    try {
      val df = spark.sparkContext
        .textFile("/path/to/file" + fileName)
        .map(_.split("\t"))
        //mHealth user is the case class which defines the data schema
        .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble,
        attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
        attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
        attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
        attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
        attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
        attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
        attributes(23).toInt))
        .toDF()
        .cache()

      Some(df)
    } catch {
      case ex: FileNotFoundException => {
        println(s"File $fileName not found")
        None
      }
      case unknown: Exception => {
        println(s"Unknown exception: $unknown")
        None
      }
    }
  }

EDIT: on the caller side of createDataFrame there are several patterns. If you are processing several filenames, you can, for example, do:

 val dfs: Seq[DataFrame] = Seq("file1", "file2", "file3").map(createDataFrame).flatten
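
Since Option can be treated as a collection, the map followed by flatten can equivalently be collapsed into a single flatMap (same assumption that createDataFrame returns Option[DataFrame]):

    val dfs: Seq[DataFrame] = Seq("file1", "file2", "file3").flatMap(createDataFrame)

Files that fail to load simply drop out of the resulting sequence.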

If you are working on a single filename, you can do:

createDataFrame("file1.csv") match {
  case Some(df) => {
    // proceed with your pipeline
    val df2 = df.filter($"activityLabel" > 0).withColumn("binaryLabel", when($"activityLabel".between(1, 3), 0).otherwise(1))
  }
  case None => println("could not create dataframe")
}
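
For the first alternative mentioned at the top of this answer (letting the exception propagate out of createDataFrame), the caller-side handling could look roughly like this. A sketch only: here createDataFrame is assumed to return a plain DataFrame and to let exceptions escape, unlike the Option version above:

    // The caller wraps the call instead of the method handling the exception itself.
    val df = try {
      createDataFrame("file1.csv")
    } catch {
      case ex: java.io.FileNotFoundException =>
        sys.error(s"Input file not found: ${ex.getMessage}")
    }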

Answered by Rajiv Singh

Apply a try/catch block on DataFrame columns:

(try { $"credit.amount" } catch { case e: Exception => lit(0) }).as("credit_amount")