scala - How to read a zip containing multiple files in Apache Spark

Disclaimer: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must follow the same license and attribute it to the original authors (not this site). Original: http://stackoverflow.com/questions/32080475/

How to read a zip containing multiple files in Apache Spark

Tags: scala, apache-spark, pyspark

Asked by Abhishek Choudhary

I have a zipped file containing multiple text files. I want to read each file and build a list of RDDs containing the contents of each file.

val test = sc.textFile("/Volumes/work/data/kaggle/dato/test/5.zip")

will just read entire files, but how do I iterate through each entry of the zip and then save its contents in an RDD using Spark?

I am fine with Scala or Python.

A possible (partial) solution in Python, using Spark:

import zipfile

# Enumerate the archive's entries on the driver.
archive = zipfile.ZipFile(archive_path, 'r')
file_paths = archive.namelist()
for file_path in file_paths:
    urls = file_path.split("/")
    urlId = urls[-1].split('_')[0]  # prefix of the file name before the first '_'

Answered by Atais

Apache Spark default compression support

I have written all the necessary theory in another answer, which you might want to refer to: https://stackoverflow.com/a/45958182/1549135

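In short, sc.textFile reads Hadoop-supported codecs such as gzip transparently, but there is no built-in codec for zip archives, which is why the custom reader below is needed. A minimal illustration (the .gz path is a hypothetical placeholder):

// Gzipped text is decompressed transparently by Spark via Hadoop's codecs...
val gzLines = sc.textFile("/data/logs.txt.gz")
// ...but a .zip archive gets no such treatment and needs the custom handling below.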

Read zip containing multiple files

I have followed the advice given by @Herman and used ZipInputStream. This gave me the following solution, which returns an RDD[String] of the zip content.

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

  def readFile(path: String,
               minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

    if (path.endsWith(".zip")) {
      sc.binaryFiles(path, minPartitions)
        .flatMap { case (name: String, content: PortableDataStream) =>
          val zis = new ZipInputStream(content.open)
          // Iterate over the zip entries; close the stream once the last one is reached.
          Stream.continually(zis.getNextEntry)
            .takeWhile {
              case null => zis.close(); false
              case _    => true
            }
            .flatMap { _ =>
              // ZipInputStream reports end-of-stream at each entry boundary,
              // so this reads exactly the lines of the current entry.
              val br = new BufferedReader(new InputStreamReader(zis))
              Stream.continually(br.readLine()).takeWhile(_ != null)
            }
        }
    } else {
      sc.textFile(path, minPartitions)
    }
  }
}

Simply use it by importing the implicit class and calling the readFile method on the SparkContext:

import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)
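
The returned RDD[String] then behaves like any other text RDD; for instance (the path is a hypothetical placeholder):

val lines = sc.readFile("/data/archive.zip")
println(lines.count())  // total number of lines across all entries in the zip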

Answered by Herman

If you are reading binary files, use sc.binaryFiles. This will return an RDD of tuples containing the file name and a PortableDataStream. You can feed the latter into a ZipInputStream.

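A minimal sketch of that idea, which the answer above fleshes out (assumptions: an existing SparkContext named sc, a zip of plain-text entries, and a placeholder path; stream closing is omitted for brevity):

import java.util.zip.ZipInputStream
import scala.io.Source

val perEntryText = sc.binaryFiles("/data/archive.zip").flatMap {
  case (zipPath, stream) =>  // (file name, PortableDataStream)
    val zis = new ZipInputStream(stream.open())
    // ZipInputStream reports end-of-stream at each entry boundary,
    // so each map step reads exactly one entry.
    Stream.continually(zis.getNextEntry)
      .takeWhile(_ != null)
      .map(_ => Source.fromInputStream(zis, "UTF-8").getLines().mkString("\n"))
}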

Answered by mahmoud mehdi

Here's a working version of @Atais' solution (enhanced by closing the streams):

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

  def readFile(path: String,
               minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

    if (path.toLowerCase.contains("zip")) {
      sc.binaryFiles(path, minPartitions)
        .flatMap {
          case (zipFilePath, zipContent) =>
            val zipInputStream = new ZipInputStream(zipContent.open())
            Stream.continually(zipInputStream.getNextEntry)
              .takeWhile(_ != null)
              .map { _ =>
                // Read one whole zip entry as a single String.
                scala.io.Source.fromInputStream(zipInputStream, "UTF-8")
                  .getLines().mkString("\n")
              } #::: { zipInputStream.close(); Stream.empty[String] }  // close after the last entry
        }
    } else {
      sc.textFile(path, minPartitions)
    }
  }
}

Then all you have to do to read a zip file is:

sc.readFile(path)

Answered by Anand

This reads only the first line of each entry; can anyone share insights? I am trying to read a zipped CSV file and create a JavaRDD for further processing.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.input.PortableDataStream;

import com.opencsv.CSVParser;  // assuming opencsv's CSVParser
import scala.Tuple2;

// Record, TimeBuilder and getDefaultValue are the poster's own helpers.
JavaPairRDD<String, PortableDataStream> zipData =
    sc.binaryFiles("hdfs://temp.zip");

JavaRDD<Record> newRDDRecord = zipData.flatMap(
    new FlatMapFunction<Tuple2<String, PortableDataStream>, Record>() {
      public Iterator<Record> call(Tuple2<String, PortableDataStream> content) throws Exception {
        List<Record> records = new ArrayList<Record>();
        ZipInputStream zin = new ZipInputStream(content._2.open());
        ZipEntry zipEntry;
        int count = 0;
        while ((zipEntry = zin.getNextEntry()) != null) {
          count++;
          if (!zipEntry.isDirectory()) {
            Record sd;
            String line;
            InputStreamReader streamReader = new InputStreamReader(zin);
            BufferedReader bufferedReader = new BufferedReader(streamReader);
            // readLine() is called only once per entry, which is why
            // only the first line of each file is processed.
            line = bufferedReader.readLine();
            String[] fields = new CSVParser().parseLineMulti(line);
            sd = new Record(TimeBuilder.convertStringToTimestamp(fields[0]),
                getDefaultValue(fields[1]),
                getDefaultValue(fields[22]));
            records.add(sd);
          }
        }
        return records.iterator();
      }
    });

Answered by sri hari kali charan Tummala

Here is another working solution, which also emits the file name; the name can later be split off and used to create separate schemas.

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

  def readFile(path: String,
               minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

    if (path.toLowerCase.contains("zip")) {
      sc.binaryFiles(path, minPartitions)
        .flatMap {
          case (zipFilePath, zipContent) =>
            val zipInputStream = new ZipInputStream(zipContent.open())
            Stream.continually(zipInputStream.getNextEntry)
              .takeWhile(_ != null)
              .map { x =>
                // Tag each line (and the end of the entry) with "~<entry name>"
                // so the file name can be recovered downstream.
                val filename1 = x.getName
                scala.io.Source.fromInputStream(zipInputStream, "UTF-8")
                  .getLines().mkString(s"~${filename1}\n") + s"~${filename1}"
              } #::: { zipInputStream.close(); Stream.empty[String] }  // close after the last entry
        }
    } else {
      sc.textFile(path, minPartitions)
    }
  }
}

The full code is here:

https://github.com/kali786516/Spark2StructuredStreaming/blob/master/src/main/scala/com/dataframe/extraDFExamples/SparkReadZipFiles.scala
