在 Scala/Spark 中的 HDFS 上将文件从一个文件夹移动到另一个文件夹
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/48226882/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me):
StackOverFlow
Move file from one folder to another on HDFS in Scala / Spark
提问by osk
I have two paths, one for a file and one for a folder. I would like to move the file into that folder on HDFS. How can I do that in Scala? I'm using Spark, too
我有两个路径,一个用于文件,一个用于文件夹。我想将文件移动到 HDFS 上的该文件夹中。我怎样才能在 Scala 中做到这一点?我也在用 Spark
Bonus if the same code will work for Windows paths too, just like reading/writing files on HDFS, but not required.
如果相同的代码也能适用于 Windows 路径（就像在 HDFS 上读/写文件那样）就更好了，但这不是必需的。
I have tried the following:
我尝试了以下方法:
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.moveFromLocalFile(something, something2)
And I get the following error:
我收到以下错误:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs:/user/o/datasets/data.txt, expected: file:///
线程“main”中的异常 java.lang.IllegalArgumentException: 错误的 FS: hdfs:/user/o/datasets/data.txt, 预期: file:///
Same goes for moveToLocalFile() because they are meant to transfer files between filesystems, not within a filesystem. I have also tried fs.rename(), but that did not do anything at all (no error or anything either).
moveToLocalFile() 也是如此，因为它们是用来在不同文件系统之间传输文件的，而不是在同一个文件系统内移动。我也试过 fs.rename()，但它什么都没做（也没有报任何错误）。
I basically create files in one directory (writing to them with a stream), and once they are done they need to be moved into a different directory. This different directory is monitored by Spark Streaming, and I have had some issues when Spark Streaming tries to work with unfinished files.
我基本上是在一个目录中创建文件（通过流写入），文件写完后需要移动到另一个目录。那个目录由 Spark Streaming 监控，当 Spark Streaming 尝试处理尚未写完的文件时，我遇到过一些问题。
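For reference, fs.rename() does move a file within a single filesystem; the "Wrong FS" error above usually means the FileSystem instance was obtained from the default (local) configuration rather than from the HDFS path itself. A minimal sketch, assuming sc is the SparkContext from the snippet above and using placeholder paths:
作为参考，fs.rename() 本身就能在同一个文件系统内移动文件；上面的 "Wrong FS" 错误通常说明取到的 FileSystem 对应的是默认（本地）配置，而不是 HDFS 路径所属的文件系统。下面是一个最简示意，假设 sc 就是上面代码片段中的 SparkContext，路径仅为占位符：
import org.apache.hadoop.fs.Path

// Placeholder paths -- replace the authority with your own NameNode address
val src = new Path("hdfs://<namenode>:<port>/user/o/datasets/data.txt")
val dstDir = new Path("hdfs://<namenode>:<port>/user/o/finished")

// Resolve the FileSystem that owns this path (HDFS here) instead of the default local one
val fs = src.getFileSystem(sc.hadoopConfiguration)
if (!fs.exists(dstDir)) fs.mkdirs(dstDir)

// rename moves the file within the same filesystem; it returns false on failure instead of throwing
val moved = fs.rename(src, new Path(dstDir, src.getName))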
回答by Sahil Desai
Try the following Scala code.
尝试以下 Scala 代码。
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
val hadoopConf = new Configuration()
val hdfs = FileSystem.get(hadoopConf)
val srcPath = new Path(srcFilePath)
val destPath = new Path(destFilePath)
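// srcFilePath / destFilePath are placeholder path strings; copyFromLocalFile copies the local source into HDFS and keeps the local copy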
hdfs.copyFromLocalFile(srcPath, destPath)
You should also check whether Spark has the HADOOP_CONF_DIR variable set in the conf/spark-env.sh file. This makes sure that Spark can find the Hadoop configuration settings.
您还应该检查 Spark 是否在 conf/spark-env.sh 文件中设置了 HADOOP_CONF_DIR 变量。这样可以确保 Spark 能找到 Hadoop 的配置。
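For example, a spark-env.sh entry might look like the following (the directory shown is only an assumed location; point it at your cluster's actual Hadoop configuration directory):
例如，spark-env.sh 中的配置可能类似下面这样（其中的目录只是假设的位置，请改成你集群实际的 Hadoop 配置目录）：
# conf/spark-env.sh: the path below is an assumption, use your cluster's Hadoop conf dir
export HADOOP_CONF_DIR=/etc/hadoop/conf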
The dependencies for the build.sbt file:
build.sbt 文件的依赖项:
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.6.0"
libraryDependencies += "org.apache.commons" % "commons-io" % "1.3.2"
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"
OR
或者
You can use IOUtils from Apache Commons to copy data from an InputStream to an OutputStream.
您可以使用 Apache Commons 中的 IOUtils 将数据从 InputStream 复制到 OutputStream。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.commons.io.IOUtils;
val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);
//Create output stream to HDFS file
val outFileStream = fs.create(new Path("hdfs://<namenode>:<port>/output_path"))
//Create input stream from the HDFS source file
val inStream = fs.open(new Path("hdfs://<namenode>:<port>/input_path"))
IOUtils.copy(inStream, outFileStream)
//Close both files
inStream.close()
outFileStream.close()
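Note that copying the stream does not remove the source; if a move is required, the original still has to be deleted after a successful copy, for example (a sketch reusing the placeholder path above):
注意，复制流并不会删除源文件；如果需要的是移动，复制成功后还要再删除原文件，例如（沿用上面占位路径的示意）：
//Delete the source after a successful copy to turn the copy into a move (false = non-recursive)
fs.delete(new Path("hdfs://<namenode>:<port>/input_path"), false)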
回答by Yordan Georgiev
import java.net.URI

import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, FileUtil, Path}

// FileSystemUtil is the answerer's own helper; it presumably resolves the FileSystem
// that owns the given file URI from the Hadoop configuration
val srcFileSystem: FileSystem = FileSystemUtil
  .apply(spark.sparkContext.hadoopConfiguration)
  .getFileSystem(sourceFile)
val dstFileSystem: FileSystem = FileSystemUtil
  .apply(spark.sparkContext.hadoopConfiguration)
  .getFileSystem(targetFile)

// deleteSource = true removes sourceFile after a successful copy, so this behaves as a move
FileUtil.copy(
  srcFileSystem,
  new Path(new URI(sourceFile)),
  dstFileSystem,
  new Path(new URI(targetFile)),
  true,
  spark.sparkContext.hadoopConfiguration)

