java - recursively fetch file contents from subdirectories using sc.textFile

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/28817940/

Recursively fetch file contents from subdirectories using sc.textFile

java apache-spark

Asked by javadba

It seems that SparkContext textFile expects only files to be present in the given directory location - it does not either:

  • (a) recurse or
  • (b) even support directories (it tries to read directories as files)

Any suggestion on how to structure the recursion - potentially simpler than creating the recursive file list / descent logic manually?

Here is the use case: files under

/data/tables/my_table

I want to be able to read via an hdfs call all the files at all directory levels under that parent directory.

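For reference, here is a minimal sketch of the manual descent the question hopes to avoid: walk the tree with the Hadoop FileSystem API, collect the leaf files, and hand sc.textFile a comma-separated list of their paths. This assumes a spark-shell session (sc in scope); the input path and the helper name are illustrative only.

    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

    // Recursively collect every plain file underneath `dir`.
    def listFilesRecursively(fs: FileSystem, dir: Path): Seq[Path] =
      fs.listStatus(dir).toSeq.flatMap { status =>
        if (status.isDirectory) listFilesRecursively(fs, status.getPath) // descend into subdirectory
        else Seq(status.getPath)                                         // keep leaf file
      }

    val fs    = FileSystem.get(sc.hadoopConfiguration)
    val files = listFilesRecursively(fs, new Path("/data/tables/my_table"))
    // sc.textFile accepts a comma-separated list of paths
    val lines = sc.textFile(files.map(_.toString).mkString(","))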

UPDATE

sc.textFile() invokes the Hadoop FileInputFormat via the (subclass) TextInputFormat. The logic to do the recursive directory reading does exist inside - i.e. it first detects whether an entry is a directory and, if so, descends into it:

<!-- language: java -->

    for (FileStatus globStat: matches) {
      if (globStat.isDir()) {
        for (FileStatus stat: fs.listStatus(globStat.getPath(), inputFilter)) {
          result.add(stat);
        }
      } else {
        result.add(globStat);
      }
    }

However, when invoking sc.textFile there are errors on directory entries: "not a file". This behavior is confusing, given that the proper support appears to be in place for handling directories.

Answered by javadba

I was looking at an old version of FileInputFormat..

BEFORE setting the recursive config mapreduce.input.fileinputformat.input.dir.recursive:

scala> sc.textFile("dev/*").count
     java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build

The default is null/not set which is evaluated as "false":

scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
res1: String = null

AFTER:

Now set the value:

sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")

Now retry the recursive operation:

scala> sc.textFile("dev/*/*").count
..
res5: Long = 3481

So it works.

Update: added /* for full recursion, per the comment by @Ben.

Answered by Paul

I have found that these parameters must be set in the following way:

.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")
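
These settings can also be baked into the SparkConf before the context is created; the spark.hadoop. prefix is how Spark routes the second entry into the underlying Hadoop Configuration. A minimal sketch of where the .set(...) calls would live, assuming a standalone app; the app name, master and input path are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("recursive-read")   // placeholder
      .setMaster("local[*]")          // placeholder
      .set("spark.hive.mapred.supports.subdirectories", "true")
      .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")

    val sc = new SparkContext(conf)
    // With the recursive flag in place, nested subdirectories under the glob are read too.
    sc.textFile("/data/tables/my_table/*").count()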