Writing to HDFS in Spark/Scala while reading zip files
Disclaimer: this page is a translation of a popular StackOverflow question and answer thread, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must license it the same way and attribute it to the original authors (not me) on StackOverflow.
Original question: http://stackoverflow.com/questions/42294899/
Writing to HDFS in Spark/Scala reading the zip files
Asked by user2699504
I am writing a Spark/Scala program to read in ZIP files, unzip them and write the contents to a set of new files. I can get this to work for writing to the local file system, but I wondered whether there is a way to write the output files to a distributed file system such as HDFS. The code is shown below:
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._

var i = 1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file: (String, PortableDataStream)) => {
   val zipStream = new ZipInputStream(file._2.open)                // open the zip archive
   val entry = zipStream.getNextEntry                              // position on the first entry
   val iter = scala.io.Source.fromInputStream(zipStream).getLines  // lines of that entry
   val fname = f"/d/tmp/myfile$i.txt"                              // local output path
   i = i + 1
   val xx = iter.mkString
   val writer = new PrintWriter(new File(fname))                   // writes to the local file system only
   writer.write(xx)
   writer.close()
   iter
}).collect()
Answered by dumitru
You can easily write data to HDFS using the hadoop-common library (if you are using sbt as your dependency-management tool, add that library to your dependencies). With it you can create a FileSystem object:
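For reference, a minimal sbt dependency line could look like the following sketch (the version number is only an example; match it to the Hadoop release your cluster runs):

// build.sbt -- the version shown here is an example, not a recommendation
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.6"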
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

private val fs = {
  val conf = new Configuration()   // picks up core-site.xml / hdfs-site.xml from the classpath
  FileSystem.get(conf)
}
Be sure to configure the FileSystem with your Hadoop cluster information (core-site.xml, etc.).
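If those configuration files are not on the classpath, one possible sketch is to point the Configuration at the namenode directly (the host and port below are placeholders, not values from the original answer):

private val fs = {
  val conf = new Configuration()
  conf.set("fs.defaultFS", "hdfs://namenode-host:8020")   // placeholder namenode address
  FileSystem.get(conf)
}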
Then you can write, for example, a String to a path on HDFS (in your case you should deal with streams) as follows:
import java.io.{BufferedWriter, IOException, OutputStreamWriter}
import org.apache.hadoop.fs.{FSDataOutputStream, Path}

@throws[IOException]
def writeAsString(hdfsPath: String, content: String): Unit = {
  val path: Path = new Path(hdfsPath)
  if (fs.exists(path)) {
    fs.delete(path, true)   // remove any existing file so the write effectively overwrites it
  }
  val dataOutputStream: FSDataOutputStream = fs.create(path)
  val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8"))
  bw.write(content)
  bw.close()
}
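Tying this back to the question, a rough usage sketch could build the FileSystem inside each task and write every unzipped file through it. This is only a sketch: it assumes the HDFS configuration is visible on the executors, and the output path pattern under /user/example/unzipped/ is an invented example.

// Sketch only: the FileSystem is created per task so nothing non-serializable is captured by the closure
sc.binaryFiles("file:///d/tmp/zips/").foreach { case (name, stream) =>
  val fs = FileSystem.get(new Configuration())
  val zipStream = new ZipInputStream(stream.open)
  zipStream.getNextEntry                                              // position on the first entry
  val content = scala.io.Source.fromInputStream(zipStream).getLines.mkString("\n")
  val out = fs.create(new Path("/user/example/unzipped/" + new java.io.File(name).getName + ".txt"))
  out.write(content.getBytes("UTF-8"))
  out.close()
  zipStream.close()
}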
Answered by loneStar
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import scala.io.Source

sc.binaryFiles("/user/example/zip_dir", 10)                   //make an RDD from *.zip files in HDFS
    .flatMap((file: (String, PortableDataStream)) => {        //flatmap to unzip each file
        val zipStream = new ZipInputStream(file._2.open)      //open a java.util.zip.ZipInputStream
        val entry = zipStream.getNextEntry                    //get the first entry in the stream
        val iter = Source.fromInputStream(zipStream).getLines //place entry lines into an iterator
        iter.next                                             //pop off the iterator's first line
        iter                                                  //return the iterator
    })
    .saveAsTextFile("/user/example/quoteTable_csv/result.csv")
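Note that getNextEntry only positions the stream on the first entry of each archive. If an archive can hold several files, a hedged variation of the same idea reads every entry; only the loop over entries is new, and the output directory name is an invented example.

// Sketch only: reads every entry of each zip instead of just the first one
sc.binaryFiles("/user/example/zip_dir", 10)
    .flatMap { case (_, stream) =>
        val zipStream = new ZipInputStream(stream.open)
        Iterator.continually(zipStream.getNextEntry)
            .takeWhile(_ != null)
            .flatMap { _ =>
                // materialize this entry's lines before getNextEntry moves the stream on
                scala.io.Source.fromInputStream(zipStream).getLines.toList
            }
    }
    .saveAsTextFile("/user/example/zip_dir_all_entries")      // example output directory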
Answered by chateaur
You should have a look at the saveAsTextFile method in the official documentation: http://spark.apache.org/docs/latest/programming-guide.html
It will allow you to save to HDFS:
iter.saveAsTextFile("hdfs://...")
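Strictly speaking, saveAsTextFile is defined on the RDD rather than on the line iterator inside the closure, so in the question's code it would be called on the result of the flatMap, roughly like the sketch below (the hdfs://... destination is left as a placeholder):

// Sketch: save the RDD produced by the flatMap instead of collecting it to the driver
val lines = sc.binaryFiles("file:///d/tmp/zips/").flatMap { case (_, stream) =>
  val zipStream = new ZipInputStream(stream.open)
  zipStream.getNextEntry                                   // first entry of the archive
  scala.io.Source.fromInputStream(zipStream).getLines.toList
}
lines.saveAsTextFile("hdfs://...")                         // placeholder for the real HDFS path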
Answered by NetanelRabinowitz
You can try the saveAsTextFile method.
Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
It will save each partition as a separate file. The number of partitions you end up with will be the same as the number of your input files, unless you repartition or coalesce.
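For example, a minimal sketch that collapses everything into a single output file (all data is funnelled through one task, so this only makes sense for small outputs; lines stands for the RDD of unzipped lines from the sketches above and the path is an example):

lines.coalesce(1)                                          // one partition -> one part file
     .saveAsTextFile("/user/example/single_file_output")   // example output directory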

