Spark read file from S3 using sc.textFile("s3n://...")
Disclaimer: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/30851244/
Asked by Polymerase
Trying to read a file located in S3 using spark-shell:
scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log")
myRdd: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at <console>:12
scala> myRdd.count
java.io.IOException: No FileSystem for scheme: s3n
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:91)
... etc ...
The IOException: No FileSystem for scheme: s3n error occurred with:
- Spark 1.3.1 or 1.4.0 on a dev machine (no Hadoop libs)
- Running from the Hortonworks Sandbox HDP v2.2.4 (Hadoop 2.6.0), which integrates Spark 1.2.1 out of the box
- Using the s3:// or s3n:// scheme
What is the cause of this error? A missing dependency, missing configuration, or misuse of sc.textFile()?
Or maybe this is due to a bug that affects the Spark build specific to Hadoop 2.6.0, as this post seems to suggest. I am going to try the Spark build for Hadoop 2.4.0 to see if this solves the issue.
Accepted answer by Polymerase
Confirmed that this is related to the Spark build against Hadoop 2.6.0. I just installed Spark 1.4.0 "Pre-built for Hadoop 2.4 and later" (instead of the Hadoop 2.6 build), and the code now works OK.
sc.textFile("s3n://bucketname/Filename")
now raises another error:
sc.textFile("s3n://bucketname/Filename")
现在引发了另一个错误:
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
The code below uses the S3N URL format to show that Spark can read an S3 file, running on a dev machine (no Hadoop libs).
scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:MySecretKey@zpub01/SafeAndSound_Lyrics.txt")
lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
scala> lyrics.count
res1: Long = 9
Even better: the code above, with AWS credentials inline in the S3N URI, will break if the AWS secret key contains a forward slash ("/"). Configuring the AWS credentials on the SparkContext fixes it. The code works whether the S3 file is public or private.
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/"
val myRDD = sc.textFile("s3n://myBucket/MyFilePattern")
myRDD.count
Answer by Dan Ciborowski - MSFT
S3N is not a default filesystem scheme. You need to build your version of Spark against a Hadoop version that includes the additional libraries used for AWS compatibility. Additional info I found here: https://www.hakkalabs.co/articles/making-your-local-hadoop-more-like-aws-elastic-mapreduce
Answer by kbt
Here is sample Spark code that can read files present on S3:
val hadoopConf = sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", s3Key)
hadoopConf.set("fs.s3.awsSecretAccessKey", s3Secret)
var jobInput = sparkContext.textFile("s3://" + s3_location)
Answer by pkozlov
You probably have to use the s3a:// scheme instead of s3:// or s3n://. However, it does not work out of the box (for me) in the spark-shell. I see the following stack trace:
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC.<init>(<console>:39)
at $iwC.<init>(<console>:41)
at <init>(<console>:43)
at .<init>(<console>:47)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072)
... 68 more
What I think: you have to manually add the hadoop-aws dependency (http://search.maven.org/#artifactdetails|org.apache.hadoop|hadoop-aws|2.7.1|jar), but I have no idea how to add it to spark-shell properly.
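One common way to do that (a sketch based on my own assumptions, not part of the original answer; the version shown must match the Hadoop version your Spark distribution was built against) is to let spark-shell resolve the dependency at launch time:
bin/spark-shell --packages org.apache.hadoop:hadoop-aws:2.7.1
The answer by Andrew K below uses the same --packages mechanism with spark-submit.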
Answer by Uster
For Spark 1.4.x "Pre-built for Hadoop 2.6 and later":
I just copied the needed S3 and S3native packages from hadoop-aws-2.6.0.jar into spark-assembly-1.4.1-hadoop2.6.0.jar.
After that I restarted the Spark cluster and it works. Do not forget to check the owner and mode of the assembly jar.
Answer by Andrew K
You can add the --packages parameter with the appropriate jar coordinates to your submission:
bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 code.py
Answer by Sergey Bahchissaraitsev
Even though this question already has an accepted answer, I think the exact details of why this is happening are still missing, so there might be room for one more answer.
If you add the required hadoop-aws dependency, your code should work.
Starting with Hadoop 2.6.0, the S3 FS connector has been moved to a separate library called hadoop-aws. There is also a Jira for that: Move s3-related FS connector code to hadoop-aws.
This means that any version of Spark that has been built against Hadoop 2.6.0 or newer will have to use an additional external dependency to be able to connect to the S3 filesystem.
Here is an sbt example that I have tried and that works as expected using Apache Spark 1.6.2 built against Hadoop 2.6.0:
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0"
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0"
In my case, I encountered some dependency issues, which I resolved by adding exclusions:
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0" exclude("tomcat", "jasper-compiler") excludeAll ExclusionRule(organization = "javax.servlet")
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0" exclude("tomcat", "jasper-compiler") excludeAll ExclusionRule(organization = "javax.servlet")
On another related note, I have yet to try it, but it is recommended to use the "s3a" and not the "s3n" filesystem starting with Hadoop 2.6.0:
The third generation, s3a: filesystem. Designed to be a switch in replacement for s3n:, this filesystem binding supports larger files and promises higher performance.
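To illustrate the s3a recommendation, here is a minimal sketch (my own addition, not from the original answer; the bucket name and key values are placeholders) of reading a file through the s3a connector once hadoop-aws and a matching AWS SDK are on the classpath:
// Configure s3a credentials on the SparkContext's Hadoop configuration
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
// Read using the s3a:// scheme instead of s3n://
val myRdd = sc.textFile("s3a://myBucket/myFile1.log")
myRdd.count()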
Answer by Steve Loughran
There is a Spark JIRA, SPARK-7481, open as of today, October 20, 2016, to add a spark-cloud module which includes transitive dependencies on everything s3a: and azure wasb: need, along with tests.
And a Spark PR to match. This is how I get s3a support into my Spark builds.
If you do it by hand, you must get the hadoop-aws JAR of the exact version the rest of your Hadoop JARs have, and a version of the AWS JARs 100% in sync with what hadoop-aws was compiled against. For Hadoop 2.7.{1, 2, 3, ...}:
hadoop-aws-2.7.x.jar
aws-java-sdk-1.7.4.jar
joda-time-2.9.3.jar
+ jackson-*-2.6.5.jar
Stick all of these into SPARK_HOME/jars. Run Spark with your credentials set up in env vars or in spark-defaults.conf.
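For example (a sketch of the two options mentioned above, with placeholder values; not part of the original answer):
# Option 1: environment variables picked up for the s3a connector
export AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY
export AWS_SECRET_ACCESS_KEY=YOUR_SECRET_KEY
# Option 2: entries in conf/spark-defaults.conf (the spark.hadoop.* prefix passes them to the Hadoop configuration)
spark.hadoop.fs.s3a.access.key YOUR_ACCESS_KEY
spark.hadoop.fs.s3a.secret.key YOUR_SECRET_KEY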
The simplest test is: can you do a line count of a CSV file?
val landsatCSV = "s3a://landsat-pds/scene_list.gz"
val lines = sc.textFile(landsatCSV)
val lineCount = lines.count()
Get a number: all is well. Get a stack trace. Bad news.
Answer by user592894
Ran into the same problem in Spark 2.0.2. Resolved it by feeding it the jars. Here's what I ran:
$ spark-shell --jars aws-java-sdk-1.7.4.jar,hadoop-aws-2.7.3.jar,jackson-annotations-2.7.0.jar,jackson-core-2.7.0.jar,jackson-databind-2.7.0.jar,joda-time-2.9.6.jar
scala> val hadoopConf = sc.hadoopConfiguration
scala> hadoopConf.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
scala> hadoopConf.set("fs.s3.awsAccessKeyId",awsAccessKeyId)
scala> hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey)
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> sqlContext.read.parquet("s3://your-s3-bucket/")
Obviously, you need to have the jars in the path you are running spark-shell from.
Answer by Gaj
Use s3a instead of s3n. I had a similar issue on a Hadoop job. After switching from s3n to s3a, it worked.
e.g.
s3a://myBucket/myFile1.log