How to handle exceptions in Spark and Scala
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license, cite the original URL, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/45894688/
How to handle exceptions in Spark and Scala
Asked by LucieCBurgess
I'm trying to handle common exceptions in Spark, like a .map operation not working correctly on all elements of the data or a FileNotFound exception. I have read all the existing questions and the following two posts:
https://www.nicolaferraro.me/2016/02/18/exception-handling-in-apache-spark
I have tried a Try statement within the line attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble, ...)
so it reads attributes => Try(mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble, ...))
But that won't compile; the compiler won't recognise the .toDF() statement later. I have also tried a Java-like try { ... } catch { ... } block but can't get the scope right; df is then not returned. Does anyone know how to do this properly? Do I even need to handle these exceptions, as the Spark framework already seems to deal with a FileNotFound exception without me adding one? But I would like, for example, to generate an error reporting the number of fields in the schema if the input file has the wrong number of columns.
Here's the code:
object DataLoadTest extends SparkSessionWrapper {

  /** Helper function to create a DataFrame from a textfile, re-used in subsequent tests */
  def createDataFrame(fileName: String): DataFrame = {

    import spark.implicits._

    //try {
    val df = spark.sparkContext
      .textFile("/path/to/file" + fileName)
      .map(_.split("\t"))
      //mHealth user is the case class which defines the data schema
      .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble,
        attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
        attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
        attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
        attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
        attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
        attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
        attributes(23).toInt))
      .toDF()
      .cache()
    df
  } catch {
    case ex: FileNotFoundException => println(s"File $fileName not found")
    case unknown: Exception => println(s"Unknown exception: $unknown")
  }
}
All suggestions appreciated. Thanks!
Answer by Neeraj Malhotra
Another option would be to use the Try type in Scala.
For example:
import java.io.FileNotFoundException
import scala.util.{Try, Success, Failure}

def createDataFrame(fileName: String): Try[DataFrame] = {
  try {
    //create dataframe df
    Success(df)
  } catch {
    case ex: FileNotFoundException =>
      println(s"File $fileName not found")
      Failure(ex)
    case unknown: Exception =>
      println(s"Unknown exception: $unknown")
      Failure(unknown)
  }
}
Now, on the caller side, handle it like this:
createDataFrame("file1.csv") match {
  case Success(df) => {
    // proceed with your pipeline
  }
  case Failure(ex) => // handle exception
}
This is slightly better than using Option, as the caller knows the reason for the failure and can handle it better.
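For instance, because the Failure carries the original exception, the caller can react differently to different kinds of failure. A minimal sketch (the fallback behaviour shown here is illustrative, not part of the original answer):

import java.io.FileNotFoundException
import scala.util.{Success, Failure}

createDataFrame("file1.csv") match {
  case Success(df) =>
    println(s"Loaded a DataFrame with ${df.count} rows")   // proceed with the pipeline
  case Failure(ex: FileNotFoundException) =>
    println(s"Input file missing: ${ex.getMessage}")       // recoverable: fall back to another path or skip
  case Failure(other) =>
    throw other                                            // unexpected: fail fast
}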
Answer by Raphael Roth
Either you let the exception be thrown out of the createDataFrame method (and handle it outside), or change the signature to return Option[DataFrame]:
def createDataFrame(fileName: String): Option[DataFrame] = {
  import spark.implicits._
  import java.io.FileNotFoundException

  try {
    val df = spark.sparkContext
      .textFile("/path/to/file" + fileName)
      .map(_.split("\t"))
      //mHealth user is the case class which defines the data schema
      .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble,
        attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
        attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
        attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
        attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
        attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
        attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
        attributes(23).toInt))
      .toDF()
      .cache()
    Some(df)
  } catch {
    case ex: FileNotFoundException =>
      println(s"File $fileName not found")
      None
    case unknown: Exception =>
      println(s"Unknown exception: $unknown")
      None
  }
}
EDIT: on the caller side of createDataFrame there are several patterns. If you are processing several filenames, you can, for example, do:
val dfs: Seq[DataFrame] = Seq("file1", "file2", "file3").map(createDataFrame).flatten
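If you also need to know which inputs were skipped, a small variation on this (a sketch; the file names are placeholders) keeps the name next to each result:

val results = Seq("file1", "file2", "file3").map(name => name -> createDataFrame(name))
val dfs: Seq[DataFrame] = results.collect { case (_, Some(df)) => df }   // the DataFrames that loaded
val skipped: Seq[String] = results.collect { case (name, None) => name } // the inputs that failed
skipped.foreach(name => println(s"could not create dataframe for $name"))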
If you are working on a single filename, you can do:
createDataFrame("file1.csv") match {
  case Some(df) => {
    // proceed with your pipeline
    val df2 = df.filter($"activityLabel" > 0)
      .withColumn("binaryLabel", when($"activityLabel".between(1, 3), 0).otherwise(1))
  }
  case None => println("could not create dataframe")
}
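The question also asks for a clear error when a line has the wrong number of columns. One hedged sketch, assuming the same spark session and fileName as above and the 24-field mHealthUser layout, is to validate each split line before building the case class:

val expectedFields = 24  // assumption: mHealthUser has 24 fields, as in the mapping above

val checkedLines = spark.sparkContext
  .textFile("/path/to/file" + fileName)
  .map(_.split("\t"))
  .map { attributes =>
    // require throws IllegalArgumentException, which fails the task with a readable message
    require(attributes.length == expectedFields,
      s"Expected $expectedFields columns but found ${attributes.length} in $fileName")
    attributes
  }
// then continue as before: .map(attributes => mHealthUser(...)).toDF().cache()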
Answer by Rajiv Singh
Apply a try/catch block on DataFrame columns:
(try { $"credit.amount" } catch { case e: Exception => lit(0) }).as("credit_amount")
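In context this expression would typically sit inside a select. A sketch, assuming a SparkSession named spark and a DataFrame df that may or may not have a credit.amount column (both assumed, not from the original answer):

import org.apache.spark.sql.functions.lit
import spark.implicits._

// Select the credit.amount column, falling back to lit(0) if building the expression throws.
val withCredit = df.select(
  (try { $"credit.amount" } catch { case e: Exception => lit(0) }).as("credit_amount")
)

Note that this try/catch can only catch exceptions thrown while the Column expression is built on the driver; an error for a genuinely missing column surfaces later, at analysis time, outside the catch.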

