Scala: Reading multiple files from S3 in Spark by date period
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original source: http://stackoverflow.com/questions/38657624/
Reading multiple files from S3 in Spark by date period
Asked by V. Samma
Description
I have an application, which sends data to AWS Kinesis Firehose and this writes the data into my S3 bucket. Firehose uses "yyyy/MM/dd/HH" format to write the files.
Like in this sample S3 path:
s3://mybucket/2016/07/29/12
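For illustration, a small helper (not from the original question) that maps an event time to this Firehose-style prefix; the bucket name and the use of java.time are assumptions:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Illustrative only: builds the "yyyy/MM/dd/HH" prefix described above, under an assumed bucket.
val hourFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH")
def prefixFor(t: LocalDateTime): String = s"s3://mybucket/${t.format(hourFormat)}"

// prefixFor(LocalDateTime.of(2016, 7, 29, 12, 0)) == "s3://mybucket/2016/07/29/12"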
Now I have a Spark application written in Scala, where I need to read data from a specific time period. I have start and end dates. The data is in JSON format and that's why I use sqlContext.read.json() and not sc.textFile().
How can I read the data quickly and efficiently?
What have I tried?
1. Wildcards - I can select the data from all hours of a specific date or all dates of a specific month, for example:
val df = sqlContext.read.json("s3://mybucket/2016/07/29/*")
val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")
But if I have to read data from a date period of a few days, for example 2016-07-29 - 2016-07-30, I cannot use the wildcard approach in the same way.
Which brings me to my next point...
2. Using multiple paths or a CSV of directories, as presented by samthebest in this solution. It seems that separating directories with commas only works with sc.textFile() and not sqlContext.read.json().
3. Union - A second solution from the previous link, by cloud, suggests reading each directory separately and then unioning them together. Although he suggests unioning RDDs, there's an option to union DataFrames as well. If I generate the date strings from a given date period manually, then I may create a path that does not exist, and instead of ignoring it, the whole read fails. Instead I could use the AWS SDK and the listObjects function from AmazonS3Client to get all the keys, as in iMKanchwala's solution from the previous link. (A minimal sketch of this union approach follows the list.)
The only problem is that my data is constantly changing. If the read.json() function gets all the data as a single parameter, it reads all the necessary data and is smart enough to infer the JSON schema from the data. If I read two directories separately and their schemas don't match, then I think unioning these two DataFrames becomes a problem.
4. Glob(?) syntax - This solution by nhahtdh is a little better than options 1 and 2 because it provides the option to specify dates and directories in more detail, and as a single "path", so it also works with read.json().
But again, the familiar problem of missing directories occurs. Let's say I want all the data from 20.07 to 30.07; I can declare it like this:
val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*")
But if I am missing data from, let's say, the 25th of July, then the path ..16/07/25/ does not exist and the whole function fails.
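For reference, a minimal sketch of the union approach from option 3 above. It assumes a Spark 1.x-style sqlContext is in scope (as in the question), uses java.time to enumerate the days, and takes the bucket and dates from the question as placeholders; the caveats about missing paths and mismatched schemas still apply:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

val start = LocalDate.of(2016, 7, 29)
val end   = LocalDate.of(2016, 7, 30)
val dayFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd")

// One glob per day in the requested period; the trailing "/*" picks up the hour subdirectories.
val dayGlobs: Seq[String] =
  Iterator.iterate(start)(_.plusDays(1))
    .takeWhile(!_.isAfter(end))
    .map(d => s"s3://mybucket/${d.format(dayFormat)}/*")
    .toSeq

// Read each day separately and union the DataFrames (unionAll on Spark 1.x, union on 2.x).
// As noted above, a missing directory still fails its read, and per-read schema
// inference may yield incompatible schemas.
val df = dayGlobs
  .map(glob => sqlContext.read.json(glob))
  .reduce(_ unionAll _)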
And obviously it gets more difficult when the requested period is for example 25.11.2015-12.02.2016, then I would need to programmatically (in my Scala script) create a string path something like this:
"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*"
And by creating it, I would need to somehow be sure that these 25-30 and 01-12 intervals all have corresponding paths; if one is missing, it fails again. (The asterisk fortunately deals with missing directories, as it reads everything that exists.)
How can I read all the necessary data from a single directory path all at once without the possibility of failing because of a missing directory between some date interval?
Accepted answer by Sim
There is a much simpler solution. If you look at the DataFrameReader API you'll notice that there is a .json(paths: String*) method. Just build a collection of the paths you want, with globs or not, as you prefer, and then call the method, e.g.,
val paths: Seq[String] = ...
val df = sqlContext.read.json(paths: _*)
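For example, a sketch of filling in the paths placeholder for the 25.11.2015-12.02.2016 period from the question. It assumes a SparkContext sc and sqlContext are in scope, that the s3:// scheme used in the question is usable through the Hadoop FileSystem API, and it uses an existence check as one possible way to skip days that were never written:

import java.net.URI
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.hadoop.fs.{FileSystem, Path}

val start = LocalDate.of(2015, 11, 25)
val end   = LocalDate.of(2016, 2, 12)
val dayFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd")

// Candidate day directories for the whole period.
val dayDirs: Seq[String] =
  Iterator.iterate(start)(_.plusDays(1))
    .takeWhile(!_.isAfter(end))
    .map(d => s"s3://mybucket/${d.format(dayFormat)}")
    .toSeq

// Keep only the days that actually exist, then glob their hour subdirectories.
val fs = FileSystem.get(new URI("s3://mybucket"), sc.hadoopConfiguration)
val paths: Seq[String] = dayDirs.filter(dir => fs.exists(new Path(dir))).map(dir => s"$dir/*")

val df = sqlContext.read.json(paths: _*)

Because every existing path is passed in a single call, Spark infers one JSON schema across the whole period, which avoids the per-directory union and schema-mismatch concerns described in the question.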

