Python Spark Context Textfile: Load Multiple Files
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license, cite the original URL, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/23397907/
Spark Context Textfile: load multiple files
Asked by Raj
I need to process multiple files scattered across various directories. I would like to load all these up in a single RDD and then perform map/reduce on it. I see that SparkContext is able to load multiple files from a single directory using wildcards. I am not sure how to load up files from multiple folders.
The following code snippet fails:
for fileEntry in files:
    fileName = basePath + "/" + fileEntry
    lines = sc.textFile(fileName)
    if retval == None:
        retval = lines
    else:
        retval = sc.union(retval, lines)
This fails on the third loop with the following error message:
retval = sc.union(retval, lines)
TypeError: union() takes exactly 2 arguments (3 given)
Which is bizarre given I am providing only 2 arguments. Any pointers appreciated.
Accepted answer by Daniel Darabos
How about this phrasing instead?
sc.union([sc.textFile(basepath + "/" + f) for f in files])
In Scala, SparkContext.union() has two variants: one that takes vararg arguments, and one that takes a list. Only the second one exists in Python (since Python does not have polymorphism).
UPDATE
You can use a single textFile call to read multiple files:
sc.textFile(','.join(files))
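For example (a minimal sketch; the paths below are hypothetical), the file list is joined into one comma-separated string and handed to a single call:

# Hypothetical list of input files scattered across directories
files = ["hdfs:///data/2014/events_a.txt",
         "hdfs:///archive/old/events_b.txt"]

# textFile accepts a comma-separated list of paths, so one call
# yields a single RDD covering every file in the list
lines = sc.textFile(",".join(files))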
Answer by Murtaza Kanchwala
You can use this
First, you can get a Buffer/List of S3 paths:
import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket: String, base_prefix: String) = {
  var files = new ArrayList[String]

  // S3 client and list-objects request
  var s3Client = new AmazonS3Client()
  var objectListing: ObjectListing = null
  var listObjectsRequest = new ListObjectsRequest()

  // Your S3 bucket
  listObjectsRequest.setBucketName(s3_bucket)

  // Your folder path or prefix
  listObjectsRequest.setPrefix(base_prefix)

  // Adding s3:// to the paths and adding them to the list
  do {
    objectListing = s3Client.listObjects(listObjectsRequest)
    for (objectSummary <- objectListing.getObjectSummaries().asScala) {
      files.add("s3://" + s3_bucket + "/" + objectSummary.getKey())
    }
    listObjectsRequest.setMarker(objectListing.getNextMarker())
  } while (objectListing.isTruncated())

  // Removing the base directory name itself
  files.remove(0)

  // Creating a Scala collection from the Java list
  files.asScala
}
Now pass this list to the following piece of code. Note: sc is a SparkContext (textFile is a SparkContext method), and the result is an RDD.
import org.apache.spark.rdd.RDD

var df: RDD[String] = null
for (file <- files) {
  // textFile returns an RDD[String] for each path
  val fileDf = sc.textFile(file)
  if (df != null) {
    df = df.union(fileDf)
  } else {
    df = fileDf
  }
}
Now you have a final unified RDD, i.e. df.
Optionally, you can also repartition it into a single big RDD:
val files = sc.textFile(filename, 1).repartition(1)
Repartitioning always works :D
Answer by fibonacci
I solve similar problems by using wildcards.
For example, I found a common pattern in the paths of the files I want to load into Spark:
dir
    subdir1/folder1/x.txt
    subdir2/folder2/y.txt
you can use the following statement
sc.textFile("dir/*/*/*.txt")
to load all the relevant files.
Note that the wildcard '*' only matches a single directory level; it is not recursive.
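As a workaround sketch (the directory layout below is hypothetical, and it relies on textFile accepting a comma-separated list of paths, as in the accepted answer), you can list one pattern per directory depth and join them:

# '*' matches exactly one directory level, so enumerate a pattern per depth
patterns = ["dir/*/*.txt",    # files two levels down
            "dir/*/*/*.txt"]  # files three levels down

# textFile takes the patterns as a single comma-separated string
lines = sc.textFile(",".join(patterns))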
Answer by Neil
You can use the following function of SparkContext:
wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext
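The same method exists in PySpark; a minimal sketch of how it might be used (the directory path is hypothetical):

# Each element of the resulting pair RDD is (file_path, file_content)
pairs = sc.wholeTextFiles("hdfs:///data/input_dir")

# For example, keep only the contents and split them into individual lines
lines = pairs.flatMap(lambda path_content: path_content[1].splitlines())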