Python Pyspark:解析一列json字符串

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/41107835/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-20 00:26:40  来源:igfitidea点击:

Pyspark: Parse a column of json strings

pythonjsonapache-sparkpyspark

提问by Steve

I have a pyspark dataframe consisting of one column, called json, where each row is a unicode string of json. I'd like to parse each row and return a new dataframe where each row is the parsed json.

我有一个 pyspark 数据框,由一列组成,称为json,其中每一行都是 json 的 unicode 字符串。我想解析每一行并返回一个新的数据帧,其中每一行都是解析后的 json。

# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])

I've tried mapping over each row with json.loads:

我试过在每一行上映射json.loads

(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()

But this returns a TypeError: expected string or buffer

但这会返回一个 TypeError: expected string or buffer

I suspect that part of the problem is that when converting from a dataframeto an rdd, the schema information is lost, so I've also tried manually entering in the schema info:

我怀疑问题的一部分是从 a 转换dataframe为 an 时rdd,架构信息丢失了,所以我也尝试手动输入架构信息:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

But I get the same TypeError.

但我得到同样的TypeError

Looking at this answer, it looks like flattening out the rows with flatMapmight be useful here, but I'm not having success with that either:

看看这个答案,看起来扁平化行在flatMap这里可能很有用,但我也没有成功:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .flatMap(lambda x: x)
  .flatMap(lambda x: json.loads(x))
  .map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

I get this error: AttributeError: 'unicode' object has no attribute 'get'.

我收到此错误:AttributeError: 'unicode' object has no attribute 'get'

回答by Martin Tapp

For Spark 2.1+, you can use from_jsonwhich allows the preservation of the other non-json columns within the dataframe as follows:

对于Spark 2.1+,您可以使用from_json它允许在数据帧中保留其他非 json 列,如下所示:

from pyspark.sql.functions import from_json, col
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))

You let Spark derive the schema of the json string column. Then the df.jsoncolumn is no longer a StringType, but the correctly decoded json structure, i.e., nested StrucTypeand all the other columns of dfare preserved as-is.

您让 Spark 派生 json 字符串列的架构。然后该df.json列不再是 StringType,而是正确解码的 json 结构,即嵌套StrucType且所有其他列df均按原样保留。

You can access the json content as follows:

您可以按如下方式访问json内容:

df.select(col('json.header').alias('header'))

回答by Mariusz

Converting a dataframe with json strings to structured dataframe is'a actually quite simple in spark if you convert the dataframe to RDD of strings before (see: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets)

如果您之前将数据帧转换为字符串的 RDD(请参阅:http://spark.apache.org/docs/latest/sql-programming-guide 。 html#json-数据集)

For example:

例如:

>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
 |-- body: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- sub_json: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sub_sub_json: struct (nullable = true)
 |    |    |    |-- col1: long (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- foo: string (nullable = true)
 |    |-- id: long (nullable = true)

回答by Nolan Conaway

Existing answers do not work if your JSON is anything but perfectly/traditionally formatted. For example, the RDD-based schema inference expects JSON in curly-braces {}and will provide an incorrect schema (resulting in nullvalues) if, for example, your data looks like:

如果您的 JSON 不是完美/传统格式,则现有答案不起作用。例如,基于 RDD 的模式推断需要花括号中的 JSON{}并且会提供不正确的模式(导致null值),例如,如果您的数据如下所示:

[
  {
    "a": 1.0,
    "b": 1
  },
  {
    "a": 0.0,
    "b": 2
  }
]

I wrote a function to work around this issue by sanitizing JSON such that it lives in another JSON object:

我编写了一个函数来解决这个问题,通过清理 JSON 使其存在于另一个 JSON 对象中:

def parseJSONCols(df, *cols, sanitize=True):
    """Auto infer the schema of a json column and parse into a struct.

    rdd-based schema inference works if you have well-formatted JSON,
    like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
    string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
    can fix everything by wrapping the data in another JSON object
    (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
    automatically performs the wrapping and unwrapping.

    The schema inference is based on this
    `SO Post <https://stackoverflow.com/a/45880574)/>`_.

    Parameters
    ----------
    df : pyspark dataframe
        Dataframe containing the JSON cols.
    *cols : string(s)
        Names of the columns containing JSON.
    sanitize : boolean
        Flag indicating whether you'd like to sanitize your records
        by wrapping and unwrapping them in another JSON object layer.

    Returns
    -------
    pyspark dataframe
        A dataframe with the decoded columns.
    """
    res = df
    for i in cols:

        # sanitize if requested.
        if sanitize:
            res = (
                res.withColumn(
                    i,
                    psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
                )
            )
        # infer schema and apply it
        schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
        res = res.withColumn(i, psf.from_json(psf.col(i), schema))

        # unpack the wrapped object if needed
        if sanitize:
            res = res.withColumn(i, psf.col(i).data)
    return res

Note: psf= pyspark.sql.functions.

注:psf= pyspark.sql.functions

回答by Buthetleon

Here's a concise (spark SQL) version of @nolan-conaway's parseJSONColsfunction.

这是@nolan-conawayparseJSONCols函数的简洁(spark SQL)版本。

SELECT 
explode(
    from_json(
        concat('{"data":', 
               '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]', 
               '}'), 
        'data array<struct<a:DOUBLE, b:INT>>'
    ).data) as data;

PS. I've added the explode function as well :P

附注。我还添加了爆炸功能:P

You'll need to know some HIVE SQL types

您需要了解一些HIVE SQL 类型