Output contents of DStream in Scala Apache Spark
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not me), citing the original source: StackOverflow
Original question: http://stackoverflow.com/questions/29009870/
Asked by blue-sky
The Spark code below does not appear to perform any operation on the file example.txt:
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.dstream.DStream

val conf = new org.apache.spark.SparkConf()
  .setMaster("local")
  .setAppName("filter")
  .setSparkHome("C:\\spark\\spark-1.2.1-bin-hadoop2.4")
  .set("spark.executor.memory", "2g");

val ssc = new StreamingContext(conf, Seconds(1))
val dataFile: DStream[String] = ssc.textFileStream("C:\\example.txt")

dataFile.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
I'm attempting to print the first 10 elements of the file using dataFile.print().
Some of the generated output:
15/03/12 12:23:53 INFO JobScheduler: Started JobScheduler
15/03/12 12:23:54 INFO FileInputDStream: Finding new files took 105 ms
15/03/12 12:23:54 INFO FileInputDStream: New files at time 1426163034000 ms:
15/03/12 12:23:54 INFO JobScheduler: Added jobs for time 1426163034000 ms
15/03/12 12:23:54 INFO JobScheduler: Starting job streaming job 1426163034000 ms.0 from job set of time 1426163034000 ms
-------------------------------------------
Time: 1426163034000 ms
-------------------------------------------
15/03/12 12:23:54 INFO JobScheduler: Finished job streaming job 1426163034000 ms.0 from job set of time 1426163034000 ms
15/03/12 12:23:54 INFO JobScheduler: Total delay: 0.157 s for time 1426163034000 ms (execution: 0.006 s)
15/03/12 12:23:54 INFO FileInputDStream: Cleared 0 old files that were older than 1426162974000 ms:
15/03/12 12:23:54 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/03/12 12:23:55 INFO FileInputDStream: Finding new files took 2 ms
15/03/12 12:23:55 INFO FileInputDStream: New files at time 1426163035000 ms:
15/03/12 12:23:55 INFO JobScheduler: Added jobs for time 1426163035000 ms
15/03/12 12:23:55 INFO JobScheduler: Starting job streaming job 1426163035000 ms.0 from job set of time 1426163035000 ms
-------------------------------------------
Time: 1426163035000 ms
-------------------------------------------
15/03/12 12:23:55 INFO JobScheduler: Finished job streaming job 1426163035000 ms.0 from job set of time 1426163035000 ms
15/03/12 12:23:55 INFO JobScheduler: Total delay: 0.011 s for time 1426163035000 ms (execution: 0.001 s)
15/03/12 12:23:55 INFO MappedRDD: Removing RDD 1 from persistence list
15/03/12 12:23:55 INFO BlockManager: Removing RDD 1
15/03/12 12:23:55 INFO FileInputDStream: Cleared 0 old files that were older than 1426162975000 ms:
15/03/12 12:23:55 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/03/12 12:23:56 INFO FileInputDStream: Finding new files took 3 ms
15/03/12 12:23:56 INFO FileInputDStream: New files at time 1426163036000 ms:
example.txt is of the format:
gdaeicjdcg,194,155,98,107
jhbcfbdigg,73,20,122,172
ahdjfgccgd,28,47,40,178
afeidjjcef,105,164,37,53
afeiccfdeg,29,197,128,85
aegddbbcii,58,126,89,28
fjfdbfaeid,80,89,180,82
As the print documentation states:
/**
 * Print the first ten elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
Does this mean that no RDDs have been generated for this stream? In core Spark, if I wanted to see the contents of an RDD I would use its collect function. Is there a similar method for streams? In short, how do I print the contents of a stream to the console?
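For reference, one way to dump a stream's contents yourself is to operate on each RDD the stream generates. This is a sketch I've added, not part of the original question; it reuses the dataFile DStream defined above, and take(10) roughly mirrors what print() does internally:

dataFile.foreachRDD { rdd =>
  // pull at most 10 elements of each batch's RDD back to the driver and print them
  rdd.take(10).foreach(println)
}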
Update:
Updated the code based on @0x0FFF's comment. http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html does not appear to give an example of reading from the local file system. Is this not as common as using Spark core, where there are explicit examples of reading data from a file?
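For comparison, the Spark core style of read that the question alludes to looks roughly like this. This is a sketch I've added, not from the original post; the path and the reuse of the conf object shown earlier are assumptions:

import org.apache.spark.SparkContext

val sc = new SparkContext(conf)                                // hypothetical: reuse the conf defined above
val staticLines = sc.textFile("file:///c:/data/example.txt")   // one-shot batch read of an existing file
staticLines.take(10).foreach(println)                          // print the first 10 lines immediately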
Here is the updated code:
val conf = new org.apache.spark.SparkConf()
  .setMaster("local[2]")
  .setAppName("filter")
  .setSparkHome("C:\\spark\\spark-1.2.1-bin-hadoop2.4")
  .set("spark.executor.memory", "2g");

val ssc = new StreamingContext(conf, Seconds(1))
val dataFile: DStream[String] = ssc.textFileStream("file:///c:/data/")

dataFile.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
But the output is the same. When I add new files to the c:\\data dir (which have the same format as the existing data files) they are not processed. I assume dataFile.print should print the first 10 lines to the console?
Update 2:
Perhaps this is related to the fact that I'm running this code in a Windows environment?
Accepted answer by 0x0FFF
You misunderstood the use of textFileStream. Here is its description from the Spark documentation:
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat).
So first, you should pass it a directory, and second, that directory should be available from the node running the receiver, so it is better to use HDFS for this purpose. Then, when you put a new file into this directory, it will be processed by print() and its first 10 lines will be printed.
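In Scala, the setup described above would look roughly like the following. This is a sketch I've added for illustration, not part of the original answer; the local file:///c:/data/ directory is an assumption, and files must be created or moved into it after the stream has started:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }

val conf = new SparkConf().setMaster("local[2]").setAppName("textFileStreamDemo")
val ssc = new StreamingContext(conf, Seconds(30))

// Monitor a directory, not a single file; each file that appears becomes part of a batch.
val lines = ssc.textFileStream("file:///c:/data/")
lines.print() // prints up to the first 10 elements of each batch's RDD

ssc.start()
ssc.awaitTermination()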
Update:
My code:
[alex@sparkdemo tmp]$ pyspark --master local[2]
Python 2.6.6 (r266:84292, Nov 22 2013, 12:16:22)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/12 06:37:49 WARN Utils: Your hostname, sparkdemo resolves to a loopback address: 127.0.0.1; using 192.168.208.133 instead (on interface eth0)
15/03/12 06:37:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.2.0
/_/
Using Python version 2.6.6 (r266:84292, Nov 22 2013 12:16:22)
SparkContext available as sc.
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc, 30)
>>> dataFile = ssc.textFileStream('file:///tmp')
>>> dataFile.pprint()
>>> ssc.start()
>>> ssc.awaitTermination()
-------------------------------------------
Time: 2015-03-12 06:40:30
-------------------------------------------
-------------------------------------------
Time: 2015-03-12 06:41:00
-------------------------------------------
-------------------------------------------
Time: 2015-03-12 06:41:30
-------------------------------------------
1 2 3
4 5 6
7 8 9
-------------------------------------------
Time: 2015-03-12 06:42:00
-------------------------------------------
Answered by blue-sky
Here is a custom receiver I wrote that listens for data in a specified dir:
package receivers

import java.io.File

import org.apache.spark.{ SparkConf, Logging }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(dir: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("File Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  def recursiveListFiles(f: File): Array[File] = {
    val these = f.listFiles
    these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
  }

  private def receive() {
    for (f <- recursiveListFiles(new File(dir))) {
      val source = scala.io.Source.fromFile(f)
      val lines = source.getLines
      store(lines)
      source.close()
      logInfo("Stopped receiving")
      restart("Trying to connect again")
    }
  }
}
One thing to be aware of is that the files need to be processed in a time that is <= the configured batchDuration. In the example below it is set to 10 seconds, but if the time the receiver takes to process the files exceeds 10 seconds then some data files will not be processed. I'm open to correction on this point.
Here is how the custom receiver is used:
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.dstream.ReceiverInputDStream

import receivers.CustomReceiver

val conf = new org.apache.spark.SparkConf()
  .setMaster("local[2]")
  .setAppName("filter")
  .setSparkHome("C:\\spark\\spark-1.2.1-bin-hadoop2.4")
  .set("spark.executor.memory", "2g");

val ssc = new StreamingContext(conf, Seconds(10))

val customReceiverStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomReceiver("C:\\data\\"))

customReceiverStream.print

customReceiverStream.foreachRDD(m => {
  println("size is " + m.collect.size)
})

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
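To check the concern above about files taking longer than the batch interval, a streaming listener can report how long each batch actually took. This is a sketch I've added, not part of the original answer; it assumes the ssc defined above and should be registered before ssc.start():

import org.apache.spark.streaming.scheduler.{ StreamingListener, StreamingListenerBatchCompleted }

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // processingDelay is how long the batch's jobs ran; compare it to the 10-second batchDuration
    println(s"Batch at ${info.batchTime}: processing took ${info.processingDelay.getOrElse(-1L)} ms")
  }
})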
More info at: http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html and https://spark.apache.org/docs/1.2.0/streaming-custom-receivers.html

