Pyspark: Parse en kolonne af json-strenge

Jeg har en pyspark dataframe, der består af én kolonne, kaldes json, hvor hver række er en unicode-strenge af json. Jeg vil gerne fortolke hver række og returnerer en ny dataframe, hvor hver række er parset json.

# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])

Jeg har prøvet kortlægning over hver række med json.loads:

(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()

Men denne returnerer en TypeError: expected string or buffer

Jeg har mistanke om, at en del af problemet er, at ved konvertering fra en dataframe til en rdd, skemaet oplysninger er gået tabt, så jeg har også prøvet manuelt ind i skemaet info:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

Men jeg får det samme TypeError.

Kigger på dette svar, det ser ud til at udjævne rækker med flatMap kan være nyttigt her, men jeg er ikke at have succes med at enten:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .flatMap(lambda x: x)
  .flatMap(lambda x: json.loads(x))
  .map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

Jeg får denne fejl: AttributeError: 'unicode' object has no attribute 'get'.

OriginalForfatteren Steve | 2016-12-12

3 svar

  1. 21

    Konvertering af en dataframe med json-strenge for en struktureret dataframe er ‘ faktisk ganske enkel i spark, hvis du konverterer dataframe til FUD af strenge (se: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets)

    For eksempel:

    >>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
    >>> new_df.printSchema()
    root
     |-- body: struct (nullable = true)
     |    |-- id: long (nullable = true)
     |    |-- name: string (nullable = true)
     |    |-- sub_json: struct (nullable = true)
     |    |    |-- id: long (nullable = true)
     |    |    |-- sub_sub_json: struct (nullable = true)
     |    |    |    |-- col1: long (nullable = true)
     |    |    |    |-- col2: string (nullable = true)
     |-- header: struct (nullable = true)
     |    |-- foo: string (nullable = true)
     |    |-- id: long (nullable = true)
    Det er dejligt – Tak! Er der en måde at konvertere structtypes at maptypes? Senere i min kode, jeg er parsing ud hver maptype af explodeing kolonner.
    Ah jeg tror, jeg har regnet det ud: jeg kan undgå at bruge maptypes ved at gøre noget som dette: body = new_df.select('body').rdd.map(lambda r: r.body).toDF()
    Actally det er meget enklere: bare skriv new_df.select('body'), og du vil have dataframe med kroppen objekter.

    OriginalForfatteren Mariusz

  2. 19

    For Gnist 2.1+, kan du bruge from_json, som giver mulighed for bevarelse af andre ikke-json kolonner i dataframe som følger:

    from pyspark.sql.functions import from_json
    json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
    df.withColumn('json', from_json(col('json'), json_schema))

    Du lader Spark udlede skema af json string kolonne. Så df.json kolonne er ikke længere en StringType, men korrekt afkodet json struktur, dvs, indlejrede StrucType og alle de andre kolonner i df er bevaret, som det er.

    Du kan få adgang til json indhold som følger:

    df.select(col('json.header').alias('header'))
    Når jeg prøver det med streaming data frame (struktureret streaming), får jeg en fejl om, at Forespørgsler med streaming-kilder skal være udført med writeStream.start();;\nkafka. Kan du hjælpe mig med, hvordan jeg kan bruge JSON data fra kafka-streaming.
    Bare bruge en almindelig dataframe/fud til at udtrække json skema fra en batch – /stikprøve af data. Brug derefter den udpakkede skema i din streaming-app.

    OriginalForfatteren Martin Tapp

  3. 5

    Eksisterende svar ikke på arbejde, hvis din JSON er alt andet end perfekt/traditionelt formateret. For eksempel, den FUD-baseret skema inferens forventer JSON i curly-seler {} og vil give et forkert skema (hvilket resulterer i null værdier) hvis, for eksempel, er dine data ser sådan ud:

    [
      {
        "a": 1.0,
        "b": 1
      },
      {
        "a": 0.0,
        "b": 2
      }
    ]

    Jeg skrev en funktion til at omgå dette problem ved desinfektion JSON sådan at det lever i en anden JSON objekt:

    def parseJSONCols(df, *cols, sanitize=True):
        """Auto infer the schema of a json column and parse into a struct.
    
        rdd-based schema inference works if you have well-formatted JSON,
        like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
        string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
        can fix everything by wrapping the data in another JSON object
        (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
        automatically performs the wrapping and unwrapping.
    
        The schema inference is based on this
        `SO Post <https://stackoverflow.com/a/45880574)/>`_.
    
        Parameters
        ----------
        df : pyspark dataframe
            Dataframe containing the JSON cols.
        *cols : string(s)
            Names of the columns containing JSON.
        sanitize : boolean
            Flag indicating whether you'd like to sanitize your records
            by wrapping and unwrapping them in another JSON object layer.
    
        Returns
        -------
        pyspark dataframe
            A dataframe with the decoded columns.
        """
        res = df
        for i in cols:
    
            # sanitize if requested.
            if sanitize:
                res = (
                    res.withColumn(
                        i,
                        psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
                    )
                )
            # infer schema and apply it
            schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
            res = res.withColumn(i, psf.from_json(psf.col(i), schema))
    
            # unpack the wrapped object if needed
            if sanitize:
                res = res.withColumn(i, psf.col(i).data)
        return res

    Bemærk: psf = pyspark.sql.functions.

    Du er en helt!!
    > For eksempel, FUD-baseret skema inferens forventer JSON i curly-seler, hvor har du læst dette? awesome finde!
    “hvor har du læst dette? “. Jeg kan ikke sige, at jeg læse det nogen steder, jeg har blot konstateret, at pyspark ikke parse min JSON, medmindre dette var sandt.

    OriginalForfatteren Nolan Conaway

Skriv et svar

Din e-mailadresse vil ikke blive publiceret. Krævede felter er markeret med *