Java Spark: reading a file from S3 with sc.textFile("s3n://...")

Note: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/30851244/


Spark read file from S3 using sc.textFile ("s3n://...)

Tags: java, scala, apache-spark, rdd, hortonworks-data-platform

Asked by Polymerase

Trying to read a file located in S3 using spark-shell:


scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log")
myRdd: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at <console>:12

scala> myRdd.count
java.io.IOException: No FileSystem for scheme: s3n
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:91)
    ... etc ...

The IOException: No FileSystem for scheme: s3n error occurred with:


  • Spark 1.3.1 or 1.4.0 on a dev machine (no Hadoop libs)
  • Running from the Hortonworks Sandbox HDP v2.2.4 (Hadoop 2.6.0), which integrates Spark 1.2.1 out of the box
  • Using the s3:// or s3n:// scheme

What is the cause of this error? A missing dependency, missing configuration, or misuse of sc.textFile()?


Or maybe this is due to a bug that affects the Spark build specific to Hadoop 2.6.0, as this post seems to suggest. I am going to try the Spark build for Hadoop 2.4.0 to see if this solves the issue.


Accepted answer by Polymerase

Confirmed that this is related to the Spark build against Hadoop 2.6.0. I just installed Spark 1.4.0 "Pre-built for Hadoop 2.4 and later" (instead of Hadoop 2.6), and the code now works OK.


sc.textFile("s3n://bucketname/Filename")now raises another error:

sc.textFile("s3n://bucketname/Filename")现在引发了另一个错误:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).

The code below uses the s3n:// URL format to show that Spark can read an S3 file, running from the dev machine (no Hadoop libs).


scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:MySecretKey@zpub01/SafeAndSound_Lyrics.txt")
lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21

scala> lyrics.count
res1: Long = 9

Even better: the code above, with the AWS credentials inline in the s3n:// URI, will break if the AWS secret key contains a forward slash "/". Configuring the AWS credentials on the SparkContext fixes that. The code works whether the S3 file is public or private.


sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/"
val myRDD = sc.textFile("s3n://myBucket/MyFilePattern")
myRDD.count

Answered by Dan Ciborowski - MSFT

S3N is not a default filesystem binding. You need to build your version of Spark against a version of Hadoop that has the additional libraries used for AWS compatibility. Additional info I found here: https://www.hakkalabs.co/articles/making-your-local-hadoop-more-like-aws-elastic-mapreduce

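For reference, a minimal sketch of one way to do such a build, assuming the standard Spark 1.x Maven profiles from the build documentation of that era (adjust the profile and Hadoop version to your target):

# Hedged sketch: build Spark from source against Hadoop 2.4, whose s3n support
# still ships with the core Hadoop artifacts, so no separate hadoop-aws jar is needed.
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package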

Answered by kbt

This is sample Spark code that can read files present on S3:


val hadoopConf = sparkContext.hadoopConfiguration
// Map the s3:// scheme to the native S3 filesystem implementation
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
// Supply the AWS credentials for that scheme
hadoopConf.set("fs.s3.awsAccessKeyId", s3Key)
hadoopConf.set("fs.s3.awsSecretAccessKey", s3Secret)
val jobInput = sparkContext.textFile("s3://" + s3_location)

Answered by pkozlov

You probably have to use the s3a:// scheme instead of s3:// or s3n://. However, it does not work out of the box (for me) in the spark-shell. I see the following stack trace:


java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
        at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:91)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
        at $iwC$$iwC$$iwC.<init>(<console>:37)
        at $iwC$$iwC.<init>(<console>:39)
        at $iwC.<init>(<console>:41)
        at <init>(<console>:43)
        at .<init>(<console>:47)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072)
        ... 68 more

What I think: you have to manually add the hadoop-aws dependency (http://search.maven.org/#artifactdetails|org.apache.hadoop|hadoop-aws|2.7.1|jar), but I have no idea how to add it to spark-shell properly.

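As a hedged sketch (not part of the original answer): spark-shell accepts the same --packages flag as spark-submit, which also resolves hadoop-aws's transitive dependencies such as the matching aws-java-sdk; the version here is an assumption matching a Hadoop 2.7.1 build:

# Assumes a Spark distribution built against Hadoop 2.7.x; pick the hadoop-aws version that matches it.
bin/spark-shell --packages org.apache.hadoop:hadoop-aws:2.7.1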

Answered by Uster

For Spark 1.4.x "Pre-built for Hadoop 2.6 and later":


I just copied the needed s3 and s3native packages from hadoop-aws-2.6.0.jar into spark-assembly-1.4.1-hadoop2.6.0.jar.


After that I restarted the Spark cluster and it works. Do not forget to check the owner and mode of the assembly jar.

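For illustration only, a hedged sketch of what that copy step could look like with the jar tool; the package paths are assumptions about where the s3 and s3native classes live inside hadoop-aws-2.6.0.jar, so verify them against your jars first:

# Extract the s3/s3native filesystem classes from hadoop-aws (assumed paths)
jar xf hadoop-aws-2.6.0.jar org/apache/hadoop/fs/s3 org/apache/hadoop/fs/s3native
# Fold them into the Spark assembly jar, then restart the cluster
jar uf spark-assembly-1.4.1-hadoop2.6.0.jar org/apache/hadoop/fs/s3 org/apache/hadoop/fs/s3native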

Answered by Andrew K

You can add the --packages parameter with the appropriate jar coordinates to your submission:


bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 code.py

Answered by Sergey Bahchissaraitsev

Although this question already has an accepted answer, I think the exact details of why this is happening are still missing, so there might be a place for one more answer.


If you add the required hadoop-aws dependency, your code should work.


Starting with Hadoop 2.6.0, the s3 FS connector has been moved to a separate library called hadoop-aws. There is also a JIRA for that: Move s3-related FS connector code to hadoop-aws.


This means that any version of Spark that has been built against Hadoop 2.6.0 or newer will have to use an additional external dependency to be able to connect to the S3 filesystem.
Here is an sbt example that I have tried and that works as expected with Apache Spark 1.6.2 built against Hadoop 2.6.0:


libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0"


In my case, I encountered some dependency issues, so I resolved them by adding exclusions:


libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0" exclude("tomcat", "jasper-compiler") excludeAll ExclusionRule(organization = "javax.servlet")


On another related note: I have yet to try it, but it is recommended to use the "s3a" rather than the "s3n" filesystem starting with Hadoop 2.6.0.


The third generation, s3a: filesystem. Designed to be a switch in replacement for s3n:, this filesystem binding supports larger files and promises higher performance.

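A minimal spark-shell sketch of the s3a variant, assuming hadoop-aws and its matching aws-java-sdk are already on the classpath (the bucket, file, and key values are placeholders):

// s3a reads its credentials from fs.s3a.* properties rather than fs.s3n.*
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
val myData = sc.textFile("s3a://myBucket/myFile1.log")
myData.count()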

Answered by Steve Loughran

There is a Spark JIRA, SPARK-7481, open as of today, Oct 20, 2016, to add a spark-cloud module which includes transitive dependencies on everything the s3a and azure wasb: connectors need, along with tests.


And a Spark PR to match. This is how I get s3a support into my Spark builds.


If you do it by hand, you must get the hadoop-aws JAR of the exact version the rest of your Hadoop JARs have, and a version of the AWS JARs 100% in sync with what hadoop-aws was compiled against. For Hadoop 2.7.{1, 2, 3, ...}:


hadoop-aws-2.7.x.jar 
aws-java-sdk-1.7.4.jar
joda-time-2.9.3.jar
+ jackson-*-2.6.5.jar

Stick all of these into SPARK_HOME/jars. Run Spark with your credentials set up in env vars or in spark-defaults.conf.

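For example, a hedged sketch of the spark-defaults.conf route; spark.hadoop.* entries are copied into the Hadoop configuration at startup, and the key values here are placeholders:

spark.hadoop.fs.s3a.access.key  YOUR_ACCESS_KEY
spark.hadoop.fs.s3a.secret.key  YOUR_SECRET_KEY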

The simplest test is whether you can do a line count of a CSV file:


val landsatCSV = "s3a://landsat-pds/scene_list.gz"
val lines = sc.textFile(landsatCSV)
val lineCount = lines.count()

Get a number: all is well. Get a stack trace. Bad news.


Answered by user592894

Ran into the same problem in Spark 2.0.2. Resolved it by feeding it the jars. Here's what I ran:


$ spark-shell --jars aws-java-sdk-1.7.4.jar,hadoop-aws-2.7.3.jar,jackson-annotations-2.7.0.jar,jackson-core-2.7.0.jar,jackson-databind-2.7.0.jar,joda-time-2.9.6.jar

scala> val hadoopConf = sc.hadoopConfiguration
scala> hadoopConf.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
scala> hadoopConf.set("fs.s3.awsAccessKeyId",awsAccessKeyId)
scala> hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey)
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> sqlContext.read.parquet("s3://your-s3-bucket/")

Obviously, you need to have the jars in the path from which you're running spark-shell.


Answered by Gaj

Use s3a instead of s3n. I had a similar issue in a Hadoop job; after switching from s3n to s3a it worked.


e.g.


s3a://myBucket/myFile1.log
