Write an RDD as a text file using Apache Spark in Java

Disclaimer: This page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/30993655/

Write RDD as textfile using Apache Spark

Tags: java, apache-spark, apache-spark-sql

Asked by Shankar

I am exploring Spark for batch processing. I am running Spark on my local machine in standalone mode.

I am trying to write the Spark RDD as a single file [final output] using the saveAsTextFile() method, but it's not working.

For example, if I have more than one partition, how can we get a single file as the final output?

Update:

I tried the approaches below, but I am getting a NullPointerException.

person.coalesce(1).toJavaRDD().saveAsTextFile("C://Java_All//output");
person.repartition(1).toJavaRDD().saveAsTextFile("C://Java_All//output");

The exception is:

15/06/23 18:25:27 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/06/23 18:25:27 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
15/06/23 18:25:27 INFO deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
15/06/23 18:25:27 INFO deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
15/06/23 18:25:27 INFO deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
15/06/23 18:25:27 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
    at org.apache.hadoop.util.Shell.run(Shell.java:379)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1104)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/06/23 18:25:27 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
    at org.apache.hadoop.util.Shell.run(Shell.java:379)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1104)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

15/06/23 18:25:27 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
15/06/23 18:25:27 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/06/23 18:25:27 INFO TaskSchedulerImpl: Cancelling stage 1
15/06/23 18:25:27 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at TestSpark.java:40) failed in 0.249 s
15/06/23 18:25:28 INFO DAGScheduler: Job 0 failed: saveAsTextFile at TestSpark.java:40, took 0.952286 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
    at org.apache.hadoop.util.Shell.run(Shell.java:379)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1104)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
15/06/23 18:25:28 INFO SparkContext: Invoking stop() from shutdown hook
15/06/23 18:25:28 INFO SparkUI: Stopped Spark web UI at http://10.37.145.179:4040
15/06/23 18:25:28 INFO DAGScheduler: Stopping DAGScheduler
15/06/23 18:25:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/06/23 18:25:28 INFO Utils: path = C:\Users\crh537\AppData\Local\Temp\spark-a52371d8-ae6a-4567-b759-0a6c66c1908c\blockmgr-4d17a5b4-c8f8-4408-af07-0e88239794e8, already present as root for deletion.
15/06/23 18:25:28 INFO MemoryStore: MemoryStore cleared
15/06/23 18:25:28 INFO BlockManager: BlockManager stopped
15/06/23 18:25:28 INFO BlockManagerMaster: BlockManagerMaster stopped
15/06/23 18:25:28 INFO SparkContext: Successfully stopped SparkContext
15/06/23 18:25:28 INFO Utils: Shutdown hook called

Regards, Shankar

Accepted answer by Maksud

You can use the coalesce method to save into a single file. This way your code will look like this:

val myFile = sc.textFile("file.txt")             // read the input as an RDD of lines
val finalRdd = doStuff(myFile)                   // your transformations
finalRdd.coalesce(1).saveAsTextFile("newfile")   // one partition -> one part file under "newfile"

There is also another method, repartition, that does the same thing; however, it will cause a shuffle, which may be very expensive, while coalesce will try to avoid one.

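Since the question itself uses the Java API, a minimal Java sketch of the same idea might look like the following (the class name, input path, and the trim step are illustrative placeholders, not part of the original answer):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SingleFileExample {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("SingleFileExample").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> lines = sc.textFile("input.txt");   // placeholder input path
            JavaRDD<String> result = lines.map(String::trim);   // stand-in for your real transformations

            // coalesce(1) merges everything into one partition, so saveAsTextFile
            // writes a single part-00000 file inside the "output" directory.
            result.coalesce(1).saveAsTextFile("output");

            sc.stop();
        }
    }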

Answered by Zia Kayani

You can use the repartition method on the RDD. It creates exactly as many partitions as the integer you pass to it. In your case it will be:

rdd.repartition(1).saveAsTextFile("path to save rdd")

Answered by Harvinder Singh

Are you running this on Windows? If yes, then you need to add the following line:

System.setProperty("hadoop.home.dir", "C:\\winutil\\");

You can download winutils from the following link:

http://public-repo-1.hortonworks.com/hdp-win-alpha/winutils.exe

Answered by ramprakash

  1. Download winutils.exe
  2. Place winutils.exe under a bin folder on any drive (e.g. D:/Winutils/bin/)
  3. Set the path in your code as below:

    System.setProperty("hadoop.home.dir", "D:\\Winutils\\");

Now run your code; it should work.

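Putting these steps together, a complete sketch might look like this (the class name and sample data are made up for illustration; the D:\Winutils path comes from the steps above, the output path from the question, and winutils.exe is assumed to already be in D:\Winutils\bin):

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class WinutilsExample {
        public static void main(String[] args) {
            // Must point at the folder that contains bin\winutils.exe,
            // and must be set before the SparkContext is created.
            System.setProperty("hadoop.home.dir", "D:\\Winutils\\");

            SparkConf conf = new SparkConf().setAppName("WinutilsExample").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> data = sc.parallelize(Arrays.asList("a", "b", "c"));

            // With winutils available, the setPermission call that previously
            // threw the NullPointerException can shell out to winutils.exe.
            data.coalesce(1).saveAsTextFile("C:/Java_All/output");

            sc.stop();
        }
    }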

Answered by Arjun gangineni

Spark internally uses the Hadoop file system APIs, so when you try to read from or write to the local filesystem on Windows, it first looks for the HADOOP_HOME configuration folder that contains bin\winutils.exe. You probably haven't set this, which is why it is throwing the NullPointerException.
