Python: How to convert Spark Streaming data into Spark DataFrame

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/35245648/

How to convert Spark Streaming data into Spark DataFrame

python, pyspark, spark-streaming

Asked by Cherry Wu

So far, Spark doesn't provide a DataFrame for streaming data, but when I am doing anomaly detection, it is more convenient and faster to use DataFrames for data analysis. I have done that part, but when I try to do real-time anomaly detection on streaming data, problems appear. I tried several ways and still could not convert the DStream into a DataFrame, nor could I convert the RDDs inside the DStream into DataFrames.

Here's part of my latest version of the code:

import sys
import re

from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator


sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    streaming_df = features_rdd.flatMap(streamrdd_to_df)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

As you can see in the main() function, when I read the input streaming data using the ssc.socketTextStream() method, it generates a DStream; I then tried to convert each individual element in the DStream into a Row, hoping I could convert the data into a DataFrame later.

If I use pprint() to print out features_rdd here, it works, which makes me think each element of features_rdd is a batch RDD, while the whole features_rdd is a DStream.

Then I created the streamrdd_to_df() method, hoping to convert each batch RDD into a DataFrame, but it gives me this error:

ERROR StreamingContext: Error starting the context, marking it as stopped java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

Does anyone have any thoughts on how I can do DataFrame operations on Spark Streaming data?

Accepted answer by Cherry Wu

After a year, I started exploring Spark 2.0's streaming methods and finally solved my anomaly detection problem. Here's my code in IPython; you can also see what my raw data input looks like.

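A minimal Structured Streaming sketch of this approach (illustrative only: the host, port, and the comma-separated parsing step are placeholder assumptions, not the original notebook code):

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

spark = SparkSession.builder.appName("streaming-anomaly-detection").getOrCreate()

# readStream returns a streaming DataFrame directly -- no DStream/RDD conversion needed
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Placeholder parsing: assume each line is a comma-separated feature vector
features = lines.select(split(col("value"), ",").alias("rawFeatures"))

# Every streaming query needs a sink (an output operation) before start()
query = features.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()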

Answered by Sumit

Read the error carefully: it says there are no output operations registered. Spark is lazy and executes the job/code only when it has something to produce as a result. In your program there is no "output operation", and that is exactly what Spark is complaining about.

Define a foreach() or a raw SQL query over the DataFrame and then print the results. It will work fine.

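As an illustration (not the answerer's exact code), a minimal sketch that reuses sqlContext and features_rdd from the question and registers an output operation via foreachRDD():

# Convert each micro-batch RDD to a DataFrame inside an output operation
def process_batch(time, rdd):
    if not rdd.isEmpty():
        df = sqlContext.createDataFrame(rdd)  # rows built with Row('rawFeatures')
        df.show(n=2, truncate=False)

# foreachRDD registers an output operation, so the StreamingContext can start
features_rdd.foreachRDD(process_batch)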

Answered by user3698581

Spark has provided us with structured streaming, which can solve such problems. It can generate a streaming DataFrame, i.e. a DataFrame that is appended to continuously. Please check the link below:

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

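To illustrate the point (not part of the original answer): once readStream gives you a streaming DataFrame, ordinary DataFrame operations apply to it directly. A small sketch, assuming a streaming DataFrame named lines with a string column value:

from pyspark.sql.functions import explode, split

# Ordinary DataFrame operations on a streaming DataFrame
words = lines.select(explode(split(lines.value, " ")).alias("word"))
word_counts = words.groupBy("word").count()  # continuously updated aggregation

query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()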

Answered by Ahmed Abdelrahman

Why don't you use something like this:

def socket_streamer(session):  # returns a streaming DataFrame
    streamer = session.readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()
    return streamer

The output of the function above (or of readStream in general) is a DataFrame. You don't need to worry about the df; it is already created automatically by Spark. See the Spark Structured Streaming Programming Guide.

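A hypothetical usage of the function above, assuming session is an existing SparkSession and using a console sink just to inspect the stream:

streaming_df = socket_streamer(session)
query = streaming_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()
query.awaitTermination()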

Answered by Mor Shemesh

With Spark 2.3 / Python 3 / Scala 2.11 (using Databricks) I was able to use temporary tables and a code snippet in Scala (using the %scala magic in notebooks):

Python Part:

ddf.createOrReplaceTempView("TempItems")

Then, in a new cell:

%scala
import java.sql.DriverManager
import org.apache.spark.sql.{ForeachWriter, Row}

// Create the query to be persisted...
val tempItemsDF = spark.sql("SELECT field1, field2, field3 FROM TempItems")

val itemsQuery = tempItemsDF.writeStream.foreach(new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = {
    // Initializing DB connection / etc...
    true  // return true so this partition gets processed
  }

  def process(value: Row): Unit = {
    val field1 = value(0)
    val field2 = value(1)
    val field3 = value(2)

    // Processing values ...
  }

  def close(errorOrNull: Throwable): Unit = {
    // Closing connections etc...
  }
})

val streamingQuery = itemsQuery.start()

Answered by Thilak

There is no need to convert the DStream into RDDs yourself. By definition, a DStream is a sequence of RDDs. Just use the DStream's foreachRDD() method to loop over each RDD and take action.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf().setAppName("Sample")
val spark = SparkSession.builder.config(conf).getOrCreate()

// sampleStream is assumed to be a DStream[String] defined elsewhere
sampleStream.foreachRDD(rdd => {
  val sampleDataFrame = spark.read.json(rdd)  // one DataFrame per micro-batch
})