I'm trying to get nested JSON values into a PySpark DataFrame. I have solved this easily with pandas, but now I'm trying to do it using only PySpark functions.

print(response)
{'ResponseMetadata': {'RequestId': 'PGMCTZNAPV677CWE', 'HostId': '/8qweqweEfpdegFSNU/hfqweqweqweSHtM=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': '/8yacqweqwe/hfjuSwKXDv3qweqweqweHtM=', 'x-amz-request-id': 'PqweqweqweE', 'date': 'Fri, 09 Sep 2022 09:25:04 GMT', 'x-amz-bucket-region': 'eu-central-1', 'content-type': 'application/xml', 'transfer-encoding': 'chunked', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'IsTruncated': False, 'Contents': [{'Key': 'qweqweIntraday.csv', 'LastModified': datetime.datetime(2022, 7, 12, 8, 32, 10, tzinfo=tzutc()), 'ETag': '"qweqweqwe4"', 'Size': 1165, 'StorageClass': 'STANDARD'}], 'Name': 'test-bucket', 'Prefix': '', 'MaxKeys': 1000, 'EncodingType': 'url', 'KeyCount': 1}

With pandas I can parse this input into a dataframe with the following code:

import pandas as pd

object_df = pd.DataFrame()
for elem in response:
    if 'Contents' in elem:
        # json_normalize flattens the list of dicts under 'Contents' into columns
        object_df = pd.json_normalize(response['Contents'])


print(object_df)
                               Key              LastModified  \
0  202207110000_qweIntraday.csv 2022-07-12 08:32:10+00:00   

                                 ETag  Size StorageClass  
0  "fqweqweqwee0cb4"  1165     STANDARD

(There are sometimes multiple entries in "Contents", so I have to iterate over them.)

This was my attempt to replicate this with a Spark DataFrame and sc.parallelize:

sc = spark.sparkContext
object_df = sc.emptyRDD()
for elem in response:
    if 'Contents' in elem:
        rddjson = spark.read.json(sc.parallelize([response['Contents']]))

Also tried:

sqlc = SQLContext(sc)
rddjson = spark.read.json(sc.parallelize([response['Contents']]))
df = sqlc.read.option("multiline", "true").json(rddjson)

df.show()
+--------------------+
|     _corrupt_record|
+--------------------+
|[{'Key': '2/3c6a6...|
+--------------------+

This is not working. I have already seen related posts saying that I can use explode (instead of json_normalize) like in this example (stackoverflow answer), but I'm having trouble replicating it.
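As I understand it, the explode pattern would look roughly like this (my sketch; df stands for a DataFrame that already has the Contents array parsed as a column, which is the part I can't get working):

from pyspark.sql import functions as F

# explode turns each element of the Contents array into its own row,
# then "c.*" expands the struct fields (Key, LastModified, ...) into columns
contents_df = df.select(F.explode("Contents").alias("c")).select("c.*")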

Any suggestion on how I can solve this with pyspark or pyspark.sql (without adding additional libraries) is very welcome.

1 Answer

It looks like the issue is that the data contains a Python datetime object (in the LastModified field), which the JSON reader cannot handle.
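You can reproduce the failure directly (the TypeError below is what Python 3's json module raises for a datetime):

import json

json.dumps(response['Contents'])
# TypeError: Object of type datetime is not JSON serializable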

One way around this might be (assuming you're OK with using the Python standard library):

import json

sc = spark.sparkContext
for elem in response:
    if 'Contents' in elem:
        # default=str converts the datetime objects to strings, so the
        # result is valid JSON that spark.read.json can parse
        json_str = json.dumps(response['Contents'], default=str)
        object_df = spark.read.json(sc.parallelize([json_str]))
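If you need LastModified back as a real timestamp rather than a string, something like this should work afterwards (a sketch; it relies on Spark's default parsing of the str(datetime) format):

from pyspark.sql import functions as F

# str(datetime) yields e.g. "2022-07-12 08:32:10+00:00", which Spark's
# timestamp cast understands
object_df = object_df.withColumn("LastModified", F.to_timestamp("LastModified"))
object_df.show(truncate=False)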

1 Comment

Yes, this is a good workaround. Thank you
