Scala Spark: Read file only if the path exists

Notice: this page is a translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must follow the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/45193825/


Spark: Read file only if the path exists

Tags: scala, apache-spark, parquet

Asked by Darshan Mehta

I am trying to read the files present at a Sequence of paths in Scala. Below is the sample (pseudo) code:

val paths: Seq[String] = ??? // Seq of paths, some of which may not exist
val dataframe = spark.read.parquet(paths: _*)

Now, in the above sequence, some paths exist whereas some don't. Is there any way to ignore the missing paths while reading parquet files (to avoid org.apache.spark.sql.AnalysisException: Path does not exist)?

I have tried the below and it seems to work, but then I end up reading the same path twice, which is something I would like to avoid:

import scala.util.Try

val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess)
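
A minimal sketch of one way to avoid that second read, assuming all the parquet files share a compatible schema so the per-path results can be unioned:

import scala.util.Try

// Keep only the DataFrames that could actually be loaded, then union them,
// so each existing path is touched once instead of being checked and re-read.
val loaded = paths.flatMap(p => Try(spark.read.parquet(p)).toOption)
val combined = loaded.reduce(_ union _) // throws if no path could be read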

I checked the options method of DataFrameReader, but it does not seem to have any option similar to ignore_if_missing.

Also, these paths can be hdfs or s3 (this Seq is passed as a method argument), and while reading I don't know whether a path is s3 or hdfs, so I can't use s3- or hdfs-specific APIs to check for existence.

Answer by Assaf Mendelson

You can filter out the irrelevant files as in @Psidom's answer. In Spark, the best way to do so is to use the internal Spark Hadoop configuration. Given that the Spark session variable is called "spark", you can do:

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

// Default FileSystem taken from the active Hadoop configuration
val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// A path counts as existing only if it is present and is a directory
def testDirExist(path: String): Boolean = {
  val p = new Path(path)
  hadoopfs.exists(p) && hadoopfs.getFileStatus(p).isDirectory
}

val filteredPaths = paths.filter(p => testDirExist(p))
val dataframe = spark.read.parquet(filteredPaths: _*)
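
Since the paths may mix hdfs and s3 schemes, a sketch of a variation (not from the original answer) that resolves the FileSystem from each path instead of from the default configuration:

import org.apache.hadoop.fs.Path

// getFileSystem picks the FileSystem implementation matching the path's scheme
// (hdfs://, s3a://, file:// ...), so mixed path types can be checked uniformly.
def pathExists(path: String): Boolean = {
  val p = new Path(path)
  val fs = p.getFileSystem(spark.sparkContext.hadoopConfiguration)
  fs.exists(p)
}

val existingPaths = paths.filter(pathExists)
val dataframe = spark.read.parquet(existingPaths: _*)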

Answer by Psidom

How about filtering the paths first:

paths.filter(f => new java.io.File(f).exists)

For instance:

Seq("/tmp", "xx").filter(f => new java.io.File(f).exists)
// res18: List[String] = List(/tmp)
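
Note that java.io.File only checks the local filesystem, so this filter works for local paths; for hdfs or s3 paths, an approach like the Hadoop FileSystem check in the other answer is needed.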