scala S3 目录上的 Spark Streaming
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/30994401/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Spark Streaming on a S3 Directory
提问by Brandon
So I have thousands of events being streamed through Amazon Kinesis into SQS then dumped into a S3 directory. About every 10 minutes, a new text file is created to dump the data from Kinesis into S3. I would like to set up Spark Streaming so that it streams the new files being dumped into S3. Right now I have
因此,我有数千个事件通过 Amazon Kinesis 流式传输到 SQS,然后转储到 S3 目录中。大约每 10 分钟,就会创建一个新的文本文件,将 Kinesis 中的数据转储到 S3 中。我想设置 Spark Streaming,以便它将转储到 S3 的新文件流式传输。现在我有
import org.apache.spark.streaming._
val currentFileStream = ssc.textFileStream("s3://bucket/directory/event_name=accepted/")
currentFileStream.print
ssc.start()
However, Spark Streaming is not picking up the new files being dumped into S3. I think it has something to do with the file write requirements:
但是,Spark Streaming 不会接收转储到 S3 中的新文件。我认为这与文件写入要求有关:
The files must have the same data format.
The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
Why is Spark streaming not picking up the new files? Is it because AWS is creating the files in the directory and not moving them? How can I make sure Spark picks up the files being dumped into S3?
为什么 Spark 流不能接收新文件?是因为 AWS 在目录中创建文件而不是移动它们吗?我如何确保 Spark 接收到正在转储到 S3 中的文件?
回答by Hafiz Mujadid
In order to stream an S3 bucket. you need to provide the path to S3 bucket. And it will stream all data from all the files in this bucket. Then whenever w new file is created in this bucket, it will be streamed. If you are appending data to existing file which are read before, these new updates will not be read.
为了流式传输 S3 存储桶。您需要提供 S3 存储桶的路径。它将从该存储桶中的所有文件中流式传输所有数据。然后每当在此存储桶中创建 w 新文件时,它将被流式传输。如果您将数据附加到之前读取的现有文件,则不会读取这些新更新。
here is small piece of code that works
这是一小段有效的代码
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
val sc = new SparkContext(conf)
val hadoopConf=sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId",myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey",mySecretKey)
//ones above this may be deprecated?
hadoopConf.set("fs.s3n.awsAccessKeyId",myAccessKey)
hadoopConf.set("fs.s3n.awsSecretAccessKey",mySecretKey)
val ssc = new org.apache.spark.streaming.StreamingContext(
sc,Seconds(60))
val lines = ssc.textFileStream("s3n://path to bucket")
lines.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
hope it will help.
希望它会有所帮助。

