Scala: load a local file into Spark using sc.textFile()

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/32064280/

Date: 2020-10-22 07:28:04  Source: igfitidea

load a local file to spark using sc.textFile()

Tags: scala, api, hadoop, apache-spark, rdd

Asked by spark_dream

Question


How do I load a file from the local file system into Spark using sc.textFile? Do I need to change any env variables? When I tried the same thing on my Windows machine, where Hadoop is not installed, I got the same error.


Code


scala> val inputFile = sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")
/17 22:28:18 INFO MemoryStore: ensureFreeSpace(63280) called with curMem=0, maxMem=278019440
/17 22:28:18 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.8 KB, free 265.1 MB)
/17 22:28:18 INFO MemoryStore: ensureFreeSpace(19750) called with curMem=63280, maxMem=278019440
/17 22:28:18 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.3 KB, free 265.1 MB)
/17 22:28:18 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:53659 (size: 19.3 KB, free: 265.1 MB)
/17 22:28:18 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
inputFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

scala> val words = input.flatMap(line => line.split(" "))
<console>:19: error: not found: value input
  val words = input.flatMap(line => line.split(" "))
              ^

scala> val words = inputFile.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:23

scala> val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}

Error


org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/c:/spark-1.4.1-bin-hadoop2.6/bin/file/C:/Users/swaapnika/Desktop/to do list
   at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
   at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
   at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
   at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
   at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
   at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey.apply(PairRDDFunctions.scala:290)
   at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey.apply(PairRDDFunctions.scala:290)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
   at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:289)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
   at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
   at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
   at $iwC$$iwC$$iwC.<init>(<console>:38)
   at $iwC$$iwC.<init>(<console>:40)
   at $iwC.<init>(<console>:42)
   at <init>(<console>:44)
   at .<init>(<console>:48)
   at .<clinit>(<console>)
   at .<init>(<console>:7)
   at .<clinit>(<console>)
   at $print(<console>)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:497)
   at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
   at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
   at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
   at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:857)
   at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
   at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
   at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:657)
   at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:665)
   at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:997)
   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
   at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
   at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
   at org.apache.spark.repl.Main$.main(Main.scala:31)
   at org.apache.spark.repl.Main.main(Main.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:497)
   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
   at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:170)
   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


scala>

Answered by spark_dream

I checked all the dependencies and environment variables again. A path like "file:///home/..../.. .txt" fetches the data from the local file system, while Hadoop's configuration sets the default file system through fs.defaultFS. If spark-env.sh is left at its defaults, Spark uses the local file system when it sees "file://..." and HDFS when the path is "hdfs://...". If you specifically need another file system, export HADOOP_CONF_DIR in spark-env.sh, and Spark will then support any file system that Hadoop supports. This was my observation; any corrections or suggestions are welcome. Thank you.

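As a rough illustration of the point above, here is a small spark-shell sketch (sc is the shell's SparkContext; the paths are placeholders, not real files) showing how the URI scheme picks the file system:

// Explicit local-file scheme: the file must exist on the machine(s) running the tasks.
val localLines = sc.textFile("file:///tmp/sample.txt")

// Explicit HDFS scheme: resolved against the configured namenode.
val hdfsLines = sc.textFile("hdfs://namenode:8020/data/sample.txt")

// No scheme: Hadoop falls back to whatever fs.defaultFS is set to.
val defaultLines = sc.textFile("/data/sample.txt")

// textFile is lazy, so run an action to confirm the path actually resolves.
println(localLines.count())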

Answered by Ton Torres

Try changing


val inputFile = sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")

to this:


val inputFile = sc.textFile("file:///Users/swaapnika/Desktop/to do list")

I'm also fairly new to Hadoop and Spark, but from what I gather, when running Spark locally on Windows, the string file:/// passed to sc.textFile already refers to C:\.

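For completeness, a minimal word-count sketch built on that corrected path (assuming the spark-shell and that the file really lives at C:\Users\swaapnika\Desktop\to do list; adjust the path for your own machine):

val inputFile = sc.textFile("file:///Users/swaapnika/Desktop/to do list")

val counts = inputFile
  .flatMap(line => line.split(" "))   // split each line into words
  .map(word => (word, 1))             // pair each word with a count of 1
  .reduceByKey(_ + _)                 // sum the counts per word

// collect() triggers the job; a wrong path fails here, not at textFile().
counts.collect().foreach(println)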

Answered by Jem Tucker

The file path you have defined is incorrect.


Try changing


sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")

to

sc.textFile("file://C:/Users/swaapnika/Desktop/to do list")

or


sc.textFile("C:/Users/swaapnika/Desktop/to do list") 
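
Whichever form you try, remember that sc.textFile is lazy: a bad path only surfaces once an action runs, which is why the question's error appeared at reduceByKey. A quick way to test a candidate path in the shell (just a sketch, using the file:///C:/ form from the other answers) is:

// Force an immediate read so a wrong path fails right away.
val rdd = sc.textFile("file:///C:/Users/swaapnika/Desktop/to do list")
println("first line: " + rdd.first())   // throws if the path does not resolve
println("line count: " + rdd.count())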

Answered by Sathiyan S

This error can happen when you run Spark on a cluster. When you submit a job to a Spark cluster, the cluster manager (YARN, Mesos, or whichever you use) hands it to a worker node. When a worker node tries to resolve the path of the file we want to load into Spark, it fails because that worker does not have the file. So try running spark-shell in local mode and try again:


\bin\spark-shell --master local


sc.textFile("file:///C:/Users/swaapnika/Desktop/to do list")


Let me know if this helps.

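For reference, the same idea outside the shell: a minimal standalone sketch that forces local mode programmatically (the object name and path are placeholders; with Spark 1.x this would be packaged and run via spark-submit):

import org.apache.spark.{SparkConf, SparkContext}

object LocalFileWordCount {
  def main(args: Array[String]): Unit = {
    // "local[*]" runs Spark inside this JVM, so a local path is always visible to the tasks.
    val conf = new SparkConf().setAppName("local-textfile-example").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // file:/// keeps Hadoop from resolving the path against fs.defaultFS.
    val lines = sc.textFile("file:///C:/Users/swaapnika/Desktop/to do list")
    println("number of lines: " + lines.count())

    sc.stop()
  }
}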