Reading multiple files from S3 in parallel (Spark, Java)
Disclaimer: this page is a Chinese-English parallel translation of a popular StackOverflow question, published under the CC BY-SA 4.0 license. If you reuse it, you must likewise follow the CC BY-SA license, cite the original URL and author information, and attribute the work to its original author (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/41062705/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me): StackOverflow
Reading multiple files from S3 in parallel (Spark, Java)
Asked by Nira
I saw a few discussions on this but couldn't quite understand the right solution: I want to load a couple hundred files from S3 into an RDD. Here is how I'm doing it now:
ObjectListing objectListing = s3.listObjects(new ListObjectsRequest().
        withBucketName(...).
        withPrefix(...));
List<String> keys = new LinkedList<>();
objectListing.getObjectSummaries().forEach(summary -> keys.add(summary.getKey())); // repeat while objectListing.isTruncated()
JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));
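(For reference, a minimal sketch of the truncation loop that the comment above alludes to, assuming the AWS SDK v1 AmazonS3 client; the bucket name and prefix are placeholders.)

// Sketch: keep listing until S3 reports the result is no longer truncated.
ObjectListing listing = s3.listObjects(new ListObjectsRequest()
        .withBucketName("my-bucket")   // placeholder
        .withPrefix("events/"));       // placeholder
List<String> keys = new LinkedList<>();
while (true) {
    listing.getObjectSummaries().forEach(summary -> keys.add(summary.getKey()));
    if (!listing.isTruncated()) {
        break;
    }
    listing = s3.listNextBatchOfObjects(listing);
}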
The ReadFromS3Function does the actual reading using the AmazonS3 client:
public Iterator<String> call(String s) throws Exception {
    AmazonS3 s3Client = getAmazonS3Client(properties);
    S3Object object = s3Client.getObject(new GetObjectRequest(...));
    InputStream is = object.getObjectContent();
    List<String> lines = new LinkedList<>();
    String str;
    try {
        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
        if (is != null) {
            while ((str = reader.readLine()) != null) {
                lines.add(str);
            }
        } else {
            ...
        }
    } finally {
        ...
    }
    return lines.iterator();
}
I kind of "translated" this from answers I saw for the same question in Scala. I think it's also possible to pass the entire list of paths to sc.textFile(...), but I'm not sure which is the best-practice way.
Accepted answer by Steve Loughran
The underlying problem is that listing objects in S3 is really slow, and the way it is made to look like a directory tree kills performance whenever something does a treewalk (as wildcard pattern matching of paths does).
The code in the post is doing the all-children listing, which delivers way better performance; it's essentially what ships with Hadoop 2.8 and s3a listFiles(path, recursive). See HADOOP-13208.
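(For illustration, a minimal sketch of that recursive listing through the Hadoop FileSystem API, assuming Hadoop 2.8+ with the s3a connector available; the bucket and prefix are placeholders.)

// Sketch: listFiles(path, true) does a flat, paged listing on s3a instead of a treewalk.
Path root = new Path("s3a://my-bucket/events/");               // placeholder
FileSystem fs = root.getFileSystem(sc.hadoopConfiguration());
List<String> paths = new ArrayList<>();
RemoteIterator<LocatedFileStatus> it = fs.listFiles(root, true);
while (it.hasNext()) {
    paths.add(it.next().getPath().toString());
}
JavaRDD<String> lines = sc.textFile(String.join(",", paths));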
After getting that listing, you've got strings to object paths which you can then map to s3a/s3n paths for Spark to handle as text file inputs, and to which you can then apply work:
val files = keys.map(key => s"s3a://$bucket/$key").mkString(",")
sc.textFile(files).map(...)
And as requested, here's the Java code used:
String prefix = "s3a://" + properties.get("s3.source.bucket") + "/";
objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix+summary.getKey()));
// repeat while objectListing truncated
JavaRDD<String> events = sc.textFile(String.join(",", keys));
Note that I switched s3n to s3a, because, provided you have the hadoop-aws and amazon-sdk JARs on your CP, the s3a connector is the one you should be using. It's better, and it's the one which gets maintained and tested against Spark workloads by people (me). See The history of Hadoop's S3 connectors.
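(If it helps, a minimal sketch of pointing the s3a connector at credentials from the Spark side; the fs.s3a.* property names are standard, while the key values and path are placeholders. With instance profiles or the default credential provider chain this step may be unnecessary.)

// Sketch: supply s3a credentials through the Hadoop configuration (placeholders).
sc.hadoopConfiguration().set("fs.s3a.access.key", "MY_ACCESS_KEY");   // placeholder
sc.hadoopConfiguration().set("fs.s3a.secret.key", "MY_SECRET_KEY");   // placeholder
JavaRDD<String> events = sc.textFile("s3a://my-bucket/events/*");     // placeholder path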
Answered by bob
You may use sc.textFile to read multiple files.
You can pass multiple file URLs as its argument.
You can specify whole directories, use wildcards and even a CSV of directories and wildcards.
Ex:
sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")
Answered by sun007
I guess if you try to parallelize the reads from AWS, it will utilize the executors and definitely improve the performance.
val bucketName = xxx
val keyname = xxx
val df = sc.parallelize(
    new AmazonS3Client(new BasicAWSCredentials("awsAccessKeyId", "secretKey"))
      .listObjects(request).getObjectSummaries.map(_.getKey).toList)
  .flatMap { key => Source.fromInputStream(s3.getObject(bucketName, key).getObjectContent: InputStream).getLines }