Python Spark Context Textfile: Load Multiple Files

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/23397907/

Spark Context Textfile: load multiple files

python apache-spark

Asked by Raj

I need to process multiple files scattered across various directories. I would like to load all these up in a single RDD and then perform map/reduce on it. I see that SparkContext is able to load multiple files from a single directory using wildcards. I am not sure how to load up files from multiple folders.

The following code snippet fails:

retval = None
for fileEntry in files:
    fileName = basePath + "/" + fileEntry
    lines = sc.textFile(fileName)
    if retval == None:
        retval = lines
    else:
        retval = sc.union(retval, lines)

This fails on the third loop with the following error message:

retval = sc.union(retval, lines)
TypeError: union() takes exactly 2 arguments (3 given)

Which is bizarre given I am providing only 2 arguments. Any pointers appreciated.

Accepted answer by Daniel Darabos

How about this phrasing instead?

sc.union([sc.textFile(basepath + "/" + f) for f in files])

In Scala, SparkContext.union() has two variants: one that takes vararg arguments, and one that takes a list. Only the second one exists in Python (since Python does not have polymorphism).

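As an illustration of the difference (a hedged sketch; the paths and RDD names below are placeholders), passing RDDs as separate arguments reproduces the error from the question, while passing them as one list works:

rdd_a = sc.textFile("dirA/part-*")   # hypothetical path
rdd_b = sc.textFile("dirB/part-*")   # hypothetical path

# sc.union(rdd_a, rdd_b)             # TypeError: union() takes exactly 2 arguments (3 given)
combined = sc.union([rdd_a, rdd_b])  # correct: pass the RDDs as a single list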

UPDATE

You can use a single textFile call to read multiple files.

sc.textFile(','.join(files))
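
As a minimal sketch of the same idea applied to the original question (basePath and the folder names here are hypothetical), you can build the paths from several folders and hand textFile one comma-separated string:

# Hypothetical folder layout; replace basePath/files with your own paths.
basePath = "/data/input"
files = ["logs/2014/april", "logs/2014/may", "archive/misc"]

paths = [basePath + "/" + f for f in files]
lines = sc.textFile(",".join(paths))   # a single RDD over all the files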

Answer by Murtaza Kanchwala

You can use this

First, you can get a Buffer/List of S3 paths:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

Now pass this List object to the following piece of code (note: sc here is the SparkContext):

import org.apache.spark.rdd.RDD

var df: RDD[String] = null
for (file <- files) {
  val fileRdd = sc.textFile(file)
  if (df != null) {
    df = df.union(fileRdd)
  } else {
    df = fileRdd
  }
}

Now you have a final unified RDD, i.e. df.

Optionally, you can also repartition it into a single big RDD:

val bigRdd = df.repartition(1)

Repartitioning always works :D

Answer by fibonacci

I solved similar problems by using wildcards.

For example, the files I want to load into Spark share a common path pattern:

dir

subdir1/folder1/x.txt

subdir2/folder2/y.txt

you can use the following statement

sc.textFile("dir/*/*/*.txt")

to load all the relevant files.

The wildcard '*' only matches a single directory level; it is not recursive.

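One possible workaround when files sit at different depths (a sketch with made-up paths, relying on textFile accepting a comma-separated list of glob patterns) is to combine several patterns in one call:

# Covers files one and two levels below "dir"; the patterns are illustrative.
lines = sc.textFile("dir/*/*.txt,dir/*/*/*.txt")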

Answer by Neil

You can use the following function of SparkContext:

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

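A short PySpark sketch of how this might be used (the directory names are hypothetical; a comma-separated list of directories is assumed to work as it does for textFile):

# Each element is a (file_path, file_contents) pair.
pairs = sc.wholeTextFiles("/data/folder1,/data/folder2")
for path, content in pairs.take(2):
    print(path, len(content))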

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext
