Python: How to read an Avro file in PySpark
Disclaimer: this page is based on a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/29759893/
How to read Avro file in PySpark
Asked by B.Mr.W.
I am writing a Spark job using Python. However, I need to read in a whole bunch of Avro files.
This is the closest solution that I have found, in Spark's example folder. However, you need to submit this Python script using spark-submit. On the spark-submit command line you can specify the driver class, in which case all of your AvroKey and AvroValue classes will be located.
avro_rdd = sc.newAPIHadoopFile(
    path,
    "org.apache.avro.mapreduce.AvroKeyInputFormat",
    "org.apache.avro.mapred.AvroKey",
    "org.apache.hadoop.io.NullWritable",
    keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
    conf=conf)
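For reference, Spark's avro_inputformat.py example consumes the resulting RDD as (record, None) pairs; a sketch of pulling the records back out, assuming the converter class is actually on the classpath:

# each element is a (record-dict, None) pair; keep only the record part
output = avro_rdd.map(lambda x: x[0]).collect()
for record in output:
    print(record)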
In my case, I need to run everything from within the Python script. I have tried creating an environment variable to include the jar file, fingers crossed that Python would add the jar to the path, but clearly it does not: it gives me an unexpected class error.
os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar"
Can anyone help me figure out how to read an Avro file inside one Python script?
Answered by zero323
Spark >= 2.4.0
You can use the built-in Avro support. The API is backwards compatible with the spark-avro package, with a few additions (most notably the from_avro / to_avro functions).
Please note that the module is not bundled with standard Spark binaries and has to be included using spark.jars.packages or an equivalent mechanism.
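A minimal sketch of the Spark >= 2.4 path, assuming a Spark 2.4.x build on Scala 2.11 (adjust the artifact version to match your installation):

# either launch with the module on the command line, e.g.
#   spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 script.py
# or request it when the session is built:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.jars.packages", "org.apache.spark:spark-avro_2.11:2.4.0")
         .getOrCreate())

df = spark.read.format("avro").load("kv.avro")
df.show()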
See also Pyspark 2.4.0, read avro from kafka with read stream - Python
Spark < 2.4.0
You can use the spark-avro library. First, let's create an example dataset:
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

schema_string = '''{"namespace": "example.avro",
  "type": "record",
  "name": "KeyValue",
  "fields": [
    {"name": "key", "type": "string"},
    {"name": "value", "type": ["int", "null"]}
  ]
}'''

schema = avro.schema.parse(schema_string)

# the file has to be opened in binary mode for the Avro writer
with open("kv.avro", "wb") as f, DataFileWriter(f, DatumWriter(), schema) as wrt:
    wrt.append({"key": "foo", "value": -1})
    wrt.append({"key": "bar", "value": 1})
Reading it using spark-avro is as simple as this:
df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro")
df.show()

## +---+-----+
## |key|value|
## +---+-----+
## |foo|   -1|
## |bar|    1|
## +---+-----+
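Writing goes through the same format string; a brief sketch (the output path is made up for illustration):

df.write.format("com.databricks.spark.avro").save("kv-out.avro")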
Answered by Régis B.
The former solution requires installing a third-party Java dependency, which is not something most Python devs are happy with. But you don't really need an external library if all you want to do is parse your Avro files with a given schema: you can just read the binary files and parse them with your favorite Python Avro package.
For instance, this is how you can load Avro files using fastavro:
from io import BytesIO
import fastavro

schema = {
    ...
}

# binaryFiles yields (path, bytes) pairs; parse each file's bytes with fastavro
rdd = sc.binaryFiles("/path/to/dataset/*.avro")\
    .flatMap(lambda args: fastavro.reader(BytesIO(args[1]), reader_schema=schema))
print(rdd.collect())
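If a DataFrame is ultimately needed rather than an RDD of dicts, the parsed records can be converted afterwards. A hedged follow-up sketch (it assumes an active SparkSession and that every record shares the same fields):

from pyspark.sql import Row

# turn each parsed dict into a Row and let Spark infer the schema
df = rdd.map(lambda record: Row(**record)).toDF()
df.show()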
Answered by Vignesh Sundar
For Spark < 2.4.0, PySpark can create the DataFrame by reading an Avro file together with its schema (.avsc), without any external Python module, by using the "com.databricks.spark.avro" JAR and Python's "subprocess" module.
Below is the solution:
avsc_location = "hdfs://user/test/test.avsc"
avro_location = "hdfs://user/test/test.avro"

# use the subprocess module to read the schema file straight out of HDFS
import subprocess as SP
load_avsc_file = SP.Popen(["hdfs", "dfs", "-cat", avsc_location], stdout=SP.PIPE, stderr=SP.PIPE)
(avsc_file_output, avsc_file_error) = load_avsc_file.communicate()

# communicate() returns bytes on Python 3, so decode before handing the schema to Spark
avro_df = spark.read.format("com.databricks.spark.avro") \
    .option("avroSchema", avsc_file_output.decode("utf-8")) \
    .load(avro_location)
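A quick sanity check on the result (not part of the original answer):

avro_df.printSchema()
avro_df.show(5)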