Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/20612571/

Spark: Writing to Avro file

Tags: scala, serialization, avro, apache-spark

Asked by user1013725

In Spark, I have an RDD loaded from an Avro file. I now want to do some transformations on that RDD and save it back as an Avro file:

val job = new Job(new Configuration())
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema))

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2))
   .saveAsNewAPIHadoopFile(outputPath,
     classOf[AvroKey[GenericRecord]],
     classOf[org.apache.hadoop.io.NullWritable],
     classOf[AvroKeyOutputFormat[GenericRecord]],
     job.getConfiguration)

When running this, Spark complains that Schema$recordSchema is not serializable.

If I comment out the .map call (and just have rdd.saveAsNewAPIHadoopFile), the call succeeds.

What am I doing wrong here?

Any idea?

Answered by Nicola Ferraro

The issue here is related to the non-serializability of the avro.Schema class used in the Job. The exception is thrown when you try to reference the schema object from the code inside the map function.

For instance, if you try to do the following, you will get the "Task not serializable" exception:

val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
  // reference to the schema object declared outside
  val record = new GenericData.Record(schema)
})

You can make everything work by simply creating a new instance of the schema inside the function block:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.map(t => {
  // create a new Schema object
  val innerSchema = new Schema.Parser().parse(new File(jsonSchema))
  val record = new GenericData.Record(innerSchema)
  ...
})

Since you would rather not parse the Avro schema for every record you handle, a better solution is to parse the schema at the partition level. The following also works:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.mapPartitions(tuples => {
  // create a new Schema object
  val innerSchema = new Schema.Parser().parse(new File(jsonSchema))

  tuples.map(t => {
    val record = new GenericData.Record(innerSchema)
    ...
    // this closure will be bundled together with the outer one 
    // (no serialization issues)
  })
})

The code above works as long as you provide a portable reference to the jsonSchema file, since the map function is going to be executed by multiple remote executors. It can be a reference to a file in HDFS, or it can be packaged along with the application in the JAR (in the latter case you will use the class-loader functions to get its contents).

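As a rough sketch of the classpath variant (the resource name "schema.avsc" is only an example, and the record-building logic is elided), each executor can re-parse the schema from a file bundled in the application JAR:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

rdd.mapPartitions { tuples =>
  // parse the schema once per partition from a resource shipped inside the application JAR;
  // using the context class loader avoids capturing the enclosing object in the closure
  val in = Thread.currentThread().getContextClassLoader.getResourceAsStream("schema.avsc")
  val partitionSchema = new Schema.Parser().parse(in)

  tuples.map { t =>
    val record = new GenericData.Record(partitionSchema)
    // ... populate the record from t ...
    record
  }
}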

For those who are trying to use Avro with Spark, notice that there are still some unresolved compilation problems and you have to use the following dependency in your Maven POM:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
</dependency>

Note the "hadoop2" classifier. You can track the issue at https://issues.apache.org/jira/browse/SPARK-3039.

Answered by Gwen Shapira

The default serializer used by Spark is Java serialization, so for all Java types it will try to serialize them using Java serialization. AvroKey is not serializable, so you are getting errors.

You can use the KryoSerializer, or plug in your own custom serialization (like Avro). You can read more about serialization here: http://spark-project.org/docs/latest/tuning.html

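For reference, a minimal sketch of switching a job to Kryo (the application name is a placeholder, and registering your record classes is optional; registerKryoClasses assumes Spark 1.2+):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("avro-job")   // placeholder name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // optional: register the classes you ship across the cluster for smaller, faster serialization
  .registerKryoClasses(Array(classOf[org.apache.avro.generic.GenericData.Record]))

val sc = new SparkContext(conf)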

You can also wrap your object in something that is externalizable. Check out, for example, the SparkFlumeEvent that wraps AvroFlumeEvent here: https://github.com/apache/spark/blob/master/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

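A rough sketch of that idea, in the same spirit as SparkFlumeEvent (the class below is hypothetical, not part of any library): it makes a GenericRecord Java-serializable by writing the schema JSON followed by the Avro binary encoding of the record.

import java.io.{ByteArrayOutputStream, Externalizable, ObjectInput, ObjectOutput}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

class SerializableGenericRecord() extends Externalizable {
  var record: GenericRecord = null

  def this(r: GenericRecord) = { this(); record = r }

  // write the schema as JSON, then the Avro binary encoding of the record
  override def writeExternal(out: ObjectOutput): Unit = {
    val schemaBytes = record.getSchema.toString.getBytes("UTF-8")
    out.writeInt(schemaBytes.length)
    out.write(schemaBytes)

    val buffer = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(buffer, null)
    new GenericDatumWriter[GenericRecord](record.getSchema).write(record, encoder)
    encoder.flush()
    val payload = buffer.toByteArray
    out.writeInt(payload.length)
    out.write(payload)
  }

  // rebuild the schema and decode the record on the receiving side
  override def readExternal(in: ObjectInput): Unit = {
    val schemaBytes = new Array[Byte](in.readInt())
    in.readFully(schemaBytes)
    val schema = new Schema.Parser().parse(new String(schemaBytes, "UTF-8"))

    val payload = new Array[Byte](in.readInt())
    in.readFully(payload)
    val decoder = DecoderFactory.get().binaryDecoder(payload, null)
    record = new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }
}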

Answered by Sagar balai

With a DataFrame it is very simple to write Avro using the Databricks spark-avro library.

dataframe.write.format("com.databricks.spark.avro").save($hdfs_path)

In your case the input is Avro, so it will already have a schema associated with it. You can read the Avro directly into a DataFrame, and after your transformation you can write it back to Avro using the code above.

To read Avro into a DataFrame:

Spark 1.6

val dataframe = sqlContext.read.avro($hdfs_path)
// or
val dataframe = sqlContext.read.format("com.databricks.spark.avro").load($hdfs_path)

Spark 2.1

val dataframe = sparkSession.read.avro($hdfs_path)
// or
val dataframe = sparkSession.read.format("com.databricks.spark.avro").load($hdfs_path)

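Putting it together, a minimal Spark 2.x sketch of the read-transform-write round trip with the Databricks spark-avro package (the paths and the column name "value" are placeholders):

import com.databricks.spark.avro._           // adds the .avro(...) shortcuts on read/write
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.upper

val spark = SparkSession.builder().appName("avro-roundtrip").getOrCreate()
import spark.implicits._

val input = spark.read.avro("/path/to/input")                   // schema is taken from the Avro files
val transformed = input.withColumn("value", upper($"value"))    // placeholder transformation
transformed.write.avro("/path/to/output")                       // written back as Avro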