scala - spark streaming fileStream
Disclaimer: this page is based on a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/16560833/
spark streaming fileStream
Asked by user2384993
I'm programming with Spark Streaming but am having some trouble with Scala. I'm trying to use the function StreamingContext.fileStream.
The definition of this function is as follows:
def fileStream[K, V, F <: InputFormat[K, V]](directory: String)(implicit arg0: ClassManifest[K], arg1: ClassManifest[V], arg2: ClassManifest[F]): DStream[(K, V)]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. File names starting with "." are ignored.

K: key type for reading the HDFS file
V: value type for reading the HDFS file
F: input format for reading the HDFS file
directory: HDFS directory to monitor for new files
I don't know how to pass the Key and Value types. My Spark Streaming code:
val ssc = new StreamingContext(args(0), "StreamingReceiver", Seconds(1),
  System.getenv("SPARK_HOME"), Seq("/home/mesos/StreamingReceiver.jar"))

// Create an input stream that monitors the directory for new files
val lines = ssc.fileStream("/home/sequenceFile")
Java code that writes the Hadoop SequenceFile:
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class MyDriver {

    private static final String[] DATA = { "One, two, buckle my shoe",
            "Three, four, shut the door", "Five, six, pick up sticks",
            "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };

    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
                    value.getClass());
            // Write 100 (IntWritable, Text) records with descending keys
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}
Accepted answer by cmbaxter
If you want to use fileStream, you're going to have to supply all 3 type params to it when calling it. You need to know what your Key, Value and InputFormat types are before calling it. If your types were LongWritable, Text and TextInputFormat, you would call fileStream like so:
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/sequenceFile")
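Note that the InputFormat bound in fileStream's signature refers to the "new" Hadoop API (org.apache.hadoop.mapreduce), so the imports for the call above would be something like:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat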
If those 3 types do happen to be your types, then you might want to use textFileStream instead, as it does not require any type params and delegates to fileStream using those 3 types I mentioned. Using that would look like this:
val lines = ssc.textFileStream("/home/sequenceFile")
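In fact, textFileStream in Spark is defined roughly as the fileStream call above followed by extracting the value from each (key, value) pair:

def textFileStream(directory: String): DStream[String] =
  fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)

Also note that the Java driver in the question writes a SequenceFile with IntWritable keys and Text values, so a sketch of the matching call for that file (assuming the new-API SequenceFileInputFormat) would be:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

val lines = ssc.fileStream[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]]("/home/sequenceFile")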
Answered by Vijay Krishna
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Only accept files whose name ends in "_<timestamp>" with a timestamp in the past
val filterF = (path: Path) =>
  path.toString.split("/").last.split("_").last.toLong < System.currentTimeMillis

val streamed_rdd = ssc
  .fileStream[LongWritable, Text, TextInputFormat](
    "/user/hdpprod/temp/spark_streaming_input", filterF, newFilesOnly = false)
  .map(_._2.toString)
  .map(u => u.split('\t'))
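A minimal sketch of wiring this into a running job (assuming ssc is an already-constructed StreamingContext):

// Print a few parsed records from each batch (illustrative only)
streamed_rdd.foreachRDD(rdd => rdd.take(10).foreach(fields => println(fields.mkString("\t"))))

ssc.start()
ssc.awaitTermination()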

