
Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/32952121/


Can I write a plain text HDFS (or local) file from a Spark program, not from an RDD?

Tags: scala, hadoop, apache-spark

Asked by Joe

I have a Spark program (in Scala) and a SparkContext. I am writing some files with RDD's saveAsTextFile. On my local machine I can use a local file path and it works with the local file system. On my cluster it works with HDFS.

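For reference, the call in question looks like this (a minimal sketch; the path is a placeholder):

val rdd = sparkContext.parallelize(Seq("line 1", "line 2"))
// saveAsTextFile writes one part file per partition under the given directory.
rdd.saveAsTextFile("hdfs://namenode:8020/output/results")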

I also want to write other arbitrary files as the result of processing. I'm writing them as regular files on my local machine, but want them to go into HDFS on the cluster.


SparkContext seems to have a few file-related methods, but they all seem to be inputs, not outputs.

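For example, the file-related methods I can find on SparkContext are all readers (an illustration; paths are placeholders):

// These read data in; none of them write files out.
val lines = sparkContext.textFile("hdfs:///input/data.txt")   // RDD[String], one element per line
val files = sparkContext.wholeTextFiles("hdfs:///input/dir")  // RDD[(path, content)]
sparkContext.addFile("hdfs:///input/lookup.csv")              // ships a file to each executor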

How do I do this?

我该怎么做呢?

Answered by Joe

Thanks to marios and kostya, but there are a few steps to writing a text file into HDFS from Spark.


import java.io.BufferedOutputStream
import org.apache.hadoop.fs.{FileSystem, Path}

// The Hadoop Configuration is accessible from the SparkContext.
val fs = FileSystem.get(sparkContext.hadoopConfiguration)

// An output file can be created from the file system.
val output = fs.create(new Path(filename))

// But a BufferedOutputStream must be used to output an actual text file.
val os = new BufferedOutputStream(output)

os.write("Hello World".getBytes("UTF-8"))

os.close()

Note that FSDataOutputStream, which has been suggested, is a Java binary data output stream, not a text output stream. Its writeUTF method appears to write plain text, but it is actually a binary serialization format that includes extra length-prefix bytes.

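To make that concrete, a sketch (where out is any FSDataOutputStream, such as the result of fs.create(...) above; the byte layout follows the DataOutput.writeUTF contract):

// writeUTF prepends a 2-byte length and uses modified UTF-8, so it is not plain text:
out.writeUTF("Hello")                  // bytes: 0x00 0x05 'H' 'e' 'l' 'l' 'o'
// write(...) emits exactly the bytes passed in, which is what a text file needs:
out.write("Hello\n".getBytes("UTF-8")) // bytes: 'H' 'e' 'l' 'l' 'o' 0x0A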

Answered by Martin Tapp

Here's what worked best for me (using Spark 2.0):


import java.io.BufferedOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val path = new Path("hdfs://namenode:8020/some/folder/myfile.txt")
val conf = new Configuration(spark.sparkContext.hadoopConfiguration)
conf.setInt("dfs.blocksize", 16 * 1024 * 1024) // 16 MB HDFS block size
val fs = path.getFileSystem(conf)
if (fs.exists(path))
    fs.delete(path, true)
val out = new BufferedOutputStream(fs.create(path))
val txt = "Some text to output"
out.write(txt.getBytes("UTF-8"))
out.flush()
out.close()
fs.close()
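One caveat: path.getFileSystem usually returns a shared, cached FileSystem instance, so fs.close() also closes it for any other code in the same JVM. If that matters in your application, drop the fs.close() call (or disable the cache via the fs.hdfs.impl.disable.cache setting).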

Answered by kostya

Using the HDFS API (hadoop-hdfs.jar), you can create an InputStream/OutputStream for an HDFS path and read from/write to the file using regular java.io classes. For example:


import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

URI uri = URI.create("hdfs://host:port/file/path");
Configuration conf = new Configuration();
FileSystem file = FileSystem.get(uri, conf);
FSDataInputStream in = file.open(new Path(uri));

This code will work with local files as well (change hdfs:// to file://).

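A matching write-side sketch in Scala (same APIs; the host and path are placeholders):

import java.io.PrintWriter
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val uri = URI.create("hdfs://host:port/file/path")
val fs  = FileSystem.get(uri, new Configuration())
// fs.create returns an OutputStream, so any java.io writer can wrap it.
val out = new PrintWriter(fs.create(new Path(uri)))
out.println("plain text line")
out.close()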

Answered by marios

One simple way to write files to HDFS is to use SequenceFiles. Here you use the native Hadoop APIs and not the ones provided by Spark.


Here is a simple snippet (in Scala):


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.io._

val conf = new Configuration() // Hadoop configuration
val sfwriter = SequenceFile.createWriter(conf,
              SequenceFile.Writer.file(new Path("hdfs://nn1.example.com/file1")),
              SequenceFile.Writer.keyClass(classOf[LongWritable]),
              SequenceFile.Writer.valueClass(classOf[Text]))
val lw = new LongWritable()
val txt = new Text()
lw.set(12)
txt.set("hello")
sfwriter.append(lw, txt)
sfwriter.close()
...

In case you don't have a key, you can use classOf[NullWritable] in its place:


SequenceFile.Writer.keyClass(classOf[NullWritable])
sfwriter.append(NullWritable.get(), new Text("12345"))
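If you later want to read such a file back into Spark (a sketch, assuming sc is your SparkContext and the NullWritable/Text layout above):

// sequenceFile returns an RDD of (key, value) Writable pairs.
val values = sc.sequenceFile("hdfs://nn1.example.com/file1",
                             classOf[NullWritable], classOf[Text])
               .map { case (_, v) => v.toString }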