Scala: Use schema to convert AVRO messages with Spark to DataFrame

Disclaimer: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, cite the original source, and attribute it to the original authors (not me). Original StackOverflow URL: http://stackoverflow.com/questions/39049648/

Date: 2020-10-22 08:34:11  Source: igfitidea

Use schema to convert AVRO messages with Spark to DataFrame

Tags: scala, apache-spark, apache-kafka, spark-streaming, avro

Asked by Sascha Vetter

Is there a way to use a schema to convert Avro messages from Kafka with Spark to a DataFrame? The schema file for user records:

{
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" }
  ],
  "name": "user",
  "type": "record"
}

And code snippets from the SqlNetworkWordCount example and from Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages, to read in the messages.

object Injection {
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}

...

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

  df.show()
})

case class User(firstName: String, lastName: String)

Somehow I can't find another way than using a case class to convert AVRO messages to DataFrame. Is there a possibility to use the schema instead? I'm using Spark 1.6.2 and Kafka 0.10.

The complete code, in case you're interested.

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

object ReadMessagesFromKafka {
  object Injection {
    val parser = new Schema.Parser()
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]) {
    val brokers = "127.0.0.1:9092"
    val topics = "test"

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      val df = rdd.map(message => Injection.injection.invert(message._2).get)
        .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

      df.show()
    })

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient  private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

Accepted answer by Tal Joffe

The OP has probably resolved the issue, but for future reference I solved this problem quite generally, so I thought it might be helpful to post it here.

Generally speaking, you should convert the Avro schema to a Spark StructType, convert the object you have in your RDD to Row[Any], and then use:

spark.createDataFrame(<RDD[obj] mapped to RDD[Row]>, <schema as StructType>)

In order to convert the Avro schema I used spark-avro, like so:

SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
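
Applied to the user schema from the question, a minimal sketch of that conversion could look like the code below (assuming a spark-avro version in which SchemaConverters is publicly accessible):

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

// parse the Avro schema shipped with the application (the same /user_schema.json as above)
val avroSchema: Schema = new Schema.Parser()
  .parse(getClass.getResourceAsStream("/user_schema.json"))

// convert it to a Spark SQL schema
val sparkSchema: StructType =
  SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
// for the user record above this is roughly:
// StructType(StructField(firstName,StringType,...), StructField(lastName,StringType,...))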

The conversion of the RDD was trickier. If your schema is simple, you can probably just do a simple map, something like this:

rdd.map(obj => {
  val seq = Seq(obj.getName(), obj.getAge())
  Row.fromSeq(seq)
})

In this example the object has two fields, name and age.

The important thing is to make sure the elements in the Row will match the order and types of the fields in the StructType from before.
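
Putting those steps together for the user records in the question, a rough sketch could look like the following; the helper name toUserDataFrame is illustrative, the Injection object is the one from the question, and in the streaming job this would run inside foreachRDD:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.StructType

def toUserDataFrame(sqlContext: SQLContext,
                    rdd: RDD[(String, Array[Byte])],
                    sparkSchema: StructType): DataFrame = {
  // decode each Avro payload and build a Row whose values follow the field order in sparkSchema
  val rows: RDD[Row] = rdd.map { case (_, bytes) =>
    val record = Injection.injection.invert(bytes).get
    Row(record.get("firstName").toString, record.get("lastName").toString)
  }
  sqlContext.createDataFrame(rows, sparkSchema)
}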

In my particular case I had a much more complex object which I wanted to handle generically to support future schema changes, so my code was much more complex.

The method suggested by the OP should also work in some cases, but it will be hard to apply to complex objects (ones that are not primitives or case classes).

Another tip is that if you have a class within a class, you should convert that inner class to a Row as well, so that the wrapping class will be converted to something like:

Row(Any,Any,Any,Row,...)
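
As a rough illustration of that idea for Avro GenericRecords, a generic converter might recurse into nested records so that an inner record becomes a nested Row (a sketch of the approach, not the exact logic spark-avro uses):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8
import org.apache.spark.sql.Row

import scala.collection.JavaConverters._

def recordToRow(record: GenericRecord): Row = {
  val values = record.getSchema.getFields.asScala.map { field =>
    record.get(field.name) match {
      case nested: GenericRecord => recordToRow(nested) // a class within a class becomes a nested Row
      case s: Utf8               => s.toString          // Avro strings arrive as Utf8; Spark expects String
      case other                 => other
    }
  }
  Row.fromSeq(values)
}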

You can also look at the spark-avro project I mentioned earlier for how to convert objects to Rows; I used some of the logic there myself.

If someone reading this needs further help, ask me in the comments and I'll try to help.

A similar problem is also solved here.

Answered by Sambit Tripathy

Please take a look at this https://github.com/databricks/spark-avro/blob/master/src/test/scala/com/databricks/spark/avro/AvroSuite.scala

So instead of

 val df = rdd.map(message => Injection.injection.invert(message._2).get)
   .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

you can try this

 val df = spark.read.avro(message._2.get)
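
For reference, the spark-avro reader is normally pointed at Avro files on a path rather than at raw Kafka payloads; a minimal sketch, with the path and the existing sqlContext as illustrative assumptions:

// assumes `sqlContext` is an existing SQLContext and the path points at Avro files on disk
val usersDf = sqlContext.read
  .format("com.databricks.spark.avro")
  .load("/path/to/users.avro")
usersDf.show()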

Answered by RadioLog

I worked on a similar issue, but in Java, so I'm not sure about Scala; take a look at the library com.databricks.spark.avro.

Answered by Ben

For anyone interested in handling this in a way that can handle schema changes without needing to stop and redeploy your Spark application (assuming your app logic can handle this), see this question/answer.
