java - Read whole text files from a compression in Spark

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/36604145/


Read whole text files from a compression in Spark

java, hadoop, apache-spark, hdfs, compression

Asked by Belphegor

I have the following problem: suppose that I have a directory containing compressed directories which contain multiple files, stored on HDFS. I want to create an RDD consisting of some objects of type T, i.e.:


context = new JavaSparkContext(conf);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);

JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String content = fileNameContent._2();

    // Class T has a constructor that takes the filename and the content of each
    // processed file (as two strings)
    T t = new T(content, fileName);

    return t;
});

Now, when inputDataPath is a directory containing files, this works perfectly fine, i.e. when it's something like:


String inputDataPath =  "hdfs://some_path/*/*/"; // because it contains subfolders

But when there's a tgz containing multiple files, the file content (fileNameContent._2()) gives me a useless binary string (quite expected). I found a similar question on SO, but it's not the same case, because there the solution only covers compressed files that contain a single file each, whereas in my case there are many files which I want to read individually as whole files. I also found a question about wholeTextFiles, but this doesn't work in my case.


Any ideas how to do this?


EDIT:


I tried the reader from here (trying to test the reader from here, like in the function testTarballWithFolders()), but whenever I call


TarballReader tarballReader = new TarballReader(fileName);

I get a NullPointerException:


java.lang.NullPointerException
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
    at utils.TarballReader.<init>(TarballReader.java:61)
    at main.SparkMain.lambda(SparkMain.java:105)
    at main.SparkMain$$Lambda/1667100242.call(Unknown Source)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction.apply(JavaPairRDD.scala:1015)
    at scala.collection.Iterator$$anon.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The line 105 in MainSpark is the one I showed above in my edit of the post, and line 61 from TarballReader is


GZIPInputStream gzip = new GZIPInputStream(in);

which gets a null value for the input stream in from the line above it:

InputStream in = this.getClass().getResourceAsStream(tarball);


Am I on the right path here? If so, how do I continue? Why do I get this null value and how can I fix it?


Answered by zero323

One possible solution is to read data with binaryFiles and extract content manually.


Scala:


libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._

def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    // Read until the next entry is null
    .takeWhile(_.isDefined)
    // flatten
    .flatMap(x => x)
    // Drop directories
    .filter(!_.isDirectory)
    .map(e => {
      Stream.continually {
        // Read n bytes
        val buffer = Array.fill[Byte](n)(-1)
        val i = tar.read(buffer, 0, n)
        (i, buffer.take(i))}
      // Take as long as we've read something
      .takeWhile(_._1 > 0)
      .map(_._2)
      .flatten
      .toArray})
    .toArray
}

def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
  new String(bytes, charset)

sc.binaryFiles("somePath").flatMapValues(x =>
  extractFiles(x).toOption).mapValues(_.map(decode()))

Full usage example with Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src

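For readers working in Java, here is a minimal, self-contained sketch of the same idea (binaryFiles plus commons-compress), assuming Java 8+, a Spark version that provides binaryFiles, and commons-compress on the classpath. It is an illustration only, not the code from the linked repository: the class name, the placeholder input path, and the fixed 1024-byte buffer are assumptions.

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.input.PortableDataStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TarGzExtractExample {

    // Decompress one .tar.gz stream and return the content of every regular
    // file inside it, decoded as UTF-8.
    static List<String> extractFiles(PortableDataStream ps) throws IOException {
        List<String> contents = new ArrayList<>();
        try (TarArchiveInputStream tar =
                 new TarArchiveInputStream(new GzipCompressorInputStream(ps.open()))) {
            TarArchiveEntry entry;
            while ((entry = tar.getNextTarEntry()) != null) {
                if (entry.isDirectory()) {
                    continue; // skip directory entries, keep only regular files
                }
                // Copy the current entry into a byte buffer, 1024 bytes at a time
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024];
                int read;
                while ((read = tar.read(buffer, 0, buffer.length)) > 0) {
                    bos.write(buffer, 0, read);
                }
                contents.add(new String(bos.toByteArray(), StandardCharsets.UTF_8));
            }
        }
        return contents;
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "targz-example");

        // One record per archive: (path, binary stream of the whole .tar.gz)
        JavaPairRDD<String, PortableDataStream> archives =
            sc.binaryFiles("hdfs://some_path/*.tgz"); // placeholder path

        // One record per archive: (path, decoded contents of the files inside it)
        JavaPairRDD<String, List<String>> files =
            archives.mapValues(TarGzExtractExample::extractFiles);

        System.out.println(files.take(1));
        sc.stop();
    }
}

As in the Scala version above, each record of the resulting pair RDD maps one archive path to the decoded contents of all regular files inside that archive.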

Python:


import tarfile
from io import BytesIO

def extractFiles(bytes):
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar if x.isfile()]

(sc.binaryFiles("somePath")
    .mapValues(extractFiles)
    .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))

Answered by Neil Best

A slight improvement on the accepted answer is to change


Option(tar.getNextTarEntry)


to

Try(tar.getNextTarEntry).toOption.filter( _ != null)


to contend with malformed / truncated .tar.gz files in a robust way.

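For a Java version built on commons-compress (such as the sketch in the previous answer), a rough analogue of the same robustness idea is to catch the IOException that a corrupt or truncated archive raises and keep whatever entries were read so far. The method below is an illustration under that assumption, with assumed names, not code from either answer.

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LenientTarReader {

    // Reads every regular file from an already opened tar stream; a corrupt or
    // truncated archive ends the loop instead of failing the whole Spark task.
    static List<String> readEntriesLeniently(TarArchiveInputStream tar) {
        List<String> contents = new ArrayList<>();
        while (true) {
            TarArchiveEntry entry;
            try {
                entry = tar.getNextTarEntry();
            } catch (IOException e) {
                break; // malformed / truncated archive header: keep what we have
            }
            if (entry == null) {
                break; // normal end of the archive
            }
            if (entry.isDirectory()) {
                continue; // skip directory entries
            }
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024];
                int read;
                while ((read = tar.read(buffer, 0, buffer.length)) > 0) {
                    bos.write(buffer, 0, read);
                }
                contents.add(new String(bos.toByteArray(), StandardCharsets.UTF_8));
            } catch (IOException e) {
                break; // entry body was truncated: stop here as well
            }
        }
        return contents;
    }
}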

BTW, is there anything special about the size of the buffer array? Would it be faster on average if it were closer to the average file size, maybe 500k in my case? Or is the slowdown I am seeing more likely the overhead of Stream relative to a while loop that would be more Java-ish, I guess.
