
I'm trying to create a JSON structure from a PySpark dataframe. My dataframe has the following columns: batch_id, batch_run_id, table_name, column_name, column_datatype, last_refresh_time, refresh_frequency, owner.

I want the output in the following JSON structure:

{
  "GeneralInfo": {
    "DataSetID": "xxxx1234Abcsd",
    "Owner": ["[email protected]", "[email protected]", "[email protected]"],
    "Description": "",
    "BuisnessFunction": "",
    "Source": "",
    "RefreshRate": "Weekly",
    "LastUpdate": "2020/10/15",
    "InfoSource": "TemplateInfo"
  },
  "Tables": [
    {
      "TableName": "Employee",
      "Columns": [
        { "ColumnName": "EmployeeID",
          "ColumnDataType": "int"
        },
        { "ColumnName": "EmployeeName",
          "ColumnDataType": "string"
        }
      ]
    }
  ]
}

I'm trying to fill in the JSON values by indexing the dataframe columns, but I get the error "Object of type Column is not JSON serializable". This is what I tried:

{
"GeneralInfo": {
    "DataSetID": df["batch_id"], 
    "Owner" : list(df["owner"])
    "Description": "", 
    "BuisnessFunction": "", 
    "Source": "", 
    "RefreshRate": df["refresh_frequency"],
    "LastUpdate": df["last_update_time"], 
    "InfoSource": "TemplateInfo"
  },
  "Tables": [
    {
      "TableName": df["table_name"],
      "Columns" : [
               { "ColumnName" : df["table_name"]["column_name"],
                  "ColumnDataType": df["table_name"]["column_datatype"]
               } 
     
            ]
     }
  }
}

Sample data: (image)

Please help me with this; I have only recently started coding in PySpark.

  • Could you please paste sample dataframe? Commented Aug 10, 2021 at 7:52
  • @MohanaBC I've updated my post, please check. Thanks Commented Aug 10, 2021 at 8:11
  • @MohanaBC - Did you check? Commented Aug 10, 2021 at 13:31
  • Yes, but I'm not getting a single JSON element as you expected. Commented Aug 10, 2021 at 14:03

1 Answer

I tried to produce the JSON from the sample data you provided. The output does not exactly match the format you expected, but you can build on the code below.

We can use the toJSON function to convert the dataframe to JSON. Before calling toJSON, shape the required columns with functions such as array() and struct() so the nesting matches the required JSON format (the code below uses struct() together with collect_set() and collect_list()).

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, collect_set, struct

spark = SparkSession.builder.master('local[*]').getOrCreate()

in_values = [
    (123, '123abc', 'Employee', 'Employee_id', 'int', '21/05/15', 'Weekly',
     ['[email protected]', '[email protected]', '[email protected]']),
    (123, '123abc', 'Employee', 'Employee_name', 'string', '21/05/15', 'Weekly',
     ['[email protected]', '[email protected]', '[email protected]'])
]

cols = ["batch_id", "batch_run_id", "table_name", "column_name", "column_datatype",
        "last_update_time", "refresh_frequency", "Owner"]


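# Build the dataframe and add the constant fields that the JSON needs.
df = spark.createDataFrame(in_values).toDF(*cols)\
    .selectExpr("*","'' Description", "'' BusinessFunction", "'TemplateInfo' InfoSource", "'' Source")

# Columns for the "GeneralInfo" struct, renamed to the JSON field names.
list1 = [df["batch_id"].alias("DataSetID"), df["Owner"], df["refresh_frequency"].alias("RefreshRate"),
         df["last_update_time"].alias("LastUpdate"), "Description", "BusinessFunction","InfoSource", "Source"]

# Columns for each entry of the "Tables" array.
list2 = [df["table_name"].alias("TableName"),df["column_name"].alias("ColumnName"),
         df["column_datatype"].alias("ColumnDataType")]

# One JSON document per batch_id: GeneralInfo is the same on every row of a batch,
# so take the first distinct struct; Tables collects one struct per input row.
df.groupBy("batch_id") \
    .agg(collect_set(struct(*list1))[0].alias("GeneralInfo"),
         collect_list(struct(*list2)).alias("Tables")).drop("batch_id") \
    .toJSON().foreach(print)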

# Output (pretty-printed here; toJSON emits each record on a single line):
'''
{
   "GeneralInfo":{
      "DataSetID":123,
      "Owner":[
         "[email protected]",
         "[email protected]",
         "[email protected]"
      ],
      "RefreshRate":"Weekly",
      "LastUpdate":"21/05/15",
      "Description":"",
      "BusinessFunction":"",
      "InfoSource":"TemplateInfo",
      "Source":""
   },
   "Tables":[
      {
         "TableName":"Employee",
         "ColumnName":"Employee_id",
         "ColumnDataType":"int"
      },
      {
         "TableName":"Employee",
         "ColumnName":"Employee_name",
         "ColumnDataType":"string"
      }
   ]
}
'''
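
To get closer to the structure in the question, where each table carries its own "Columns" array, one option is to collect the rows to the driver and do the nesting in plain Python. The following is only a minimal sketch, assuming the metadata is small enough to collect and that the batch-level fields (owner, refresh frequency, last update time) are identical on every row of a batch. The function name `build_dataset_json`, the inline `rows` list, and the example.com addresses are illustrative placeholders; with a real dataframe the rows could come from `[r.asDict() for r in df.collect()]`.

```python
import json


def build_dataset_json(rows):
    """Nest flat metadata rows (one row per column) into GeneralInfo + Tables."""
    first = rows[0]  # assumption: batch-level fields are the same on every row
    general_info = {
        "DataSetID": first["batch_id"],
        "Owner": list(first["Owner"]),
        "Description": "",
        "BusinessFunction": "",
        "Source": "",
        "RefreshRate": first["refresh_frequency"],
        "LastUpdate": first["last_update_time"],
        "InfoSource": "TemplateInfo",
    }

    # Group column entries under their table; dicts keep first-seen order.
    tables = {}
    for row in rows:
        columns = tables.setdefault(row["table_name"], [])
        columns.append({
            "ColumnName": row["column_name"],
            "ColumnDataType": row["column_datatype"],
        })

    return {
        "GeneralInfo": general_info,
        "Tables": [
            {"TableName": name, "Columns": cols} for name, cols in tables.items()
        ],
    }


# Placeholder rows standing in for [r.asDict() for r in df.collect()].
owners = ["owner1@example.com", "owner2@example.com"]
rows = [
    {"batch_id": 123, "table_name": "Employee", "column_name": "Employee_id",
     "column_datatype": "int", "last_update_time": "21/05/15",
     "refresh_frequency": "Weekly", "Owner": owners},
    {"batch_id": 123, "table_name": "Employee", "column_name": "Employee_name",
     "column_datatype": "string", "last_update_time": "21/05/15",
     "refresh_frequency": "Weekly", "Owner": owners},
]

result = build_dataset_json(rows)
print(json.dumps(result, indent=2))
```

Because every value here is an ordinary Python object rather than a Column expression, `json.dumps` can serialize it directly. For data too large to collect, the same two-level grouping would have to be expressed with Spark aggregations instead.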
  

8 Comments

Thanks for sharing this. I see that you've hardcoded the column names to create the lists. Any dynamic way to handle this rather than hardcoding?
Also, { "GeneralInfo": { "DataSetID": "xxxx1234Abcsd", "Owner" : ["[email protected]", "[email protected]", "[email protected]"] "Description": "", "BuisnessFunction": "", "Source": "", "RefreshRate": "Weekly", "LastUpdate": "2020/10/15", "InfoSource": "TemplateInfo" } would be under {} not []
We can generate the lists dynamically using generators; we also need to handle column renaming here, since your JSON format uses different names.
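
One way to avoid hardcoding the two lists is to keep the source-to-JSON name mapping in a dictionary and generate the expressions from it. The following is a minimal sketch, not the answer's own code: the names `GENERAL_INFO_MAP`, `TABLE_MAP`, `alias_exprs`, and `named_struct_expr` are illustrative assumptions. It only builds Spark SQL expression strings, so it runs without a Spark session.

```python
# Hypothetical mappings from dataframe column name to JSON field name.
GENERAL_INFO_MAP = {
    "batch_id": "DataSetID",
    "Owner": "Owner",
    "refresh_frequency": "RefreshRate",
    "last_update_time": "LastUpdate",
}
TABLE_MAP = {
    "table_name": "TableName",
    "column_name": "ColumnName",
    "column_datatype": "ColumnDataType",
}


def alias_exprs(mapping):
    """Return 'source AS Target' strings, one per mapped column."""
    return [f"{src} AS {dst}" for src, dst in mapping.items()]


def named_struct_expr(mapping):
    """Return a named_struct(...) SQL expression with the renamed fields."""
    pairs = ", ".join(f"'{dst}', {src}" for src, dst in mapping.items())
    return f"named_struct({pairs})"


print(alias_exprs(GENERAL_INFO_MAP))
print(named_struct_expr(TABLE_MAP))
```

The alias strings could be passed to `df.selectExpr(*alias_exprs(...))`, and the `named_struct` string could be wrapped in `pyspark.sql.functions.expr(...)` wherever the answer uses `struct(*list2)`. Treat both as suggestions to try rather than a verified drop-in replacement.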
We can change that from [] (an array) to {} (a struct). Are you okay with the second part, "Tables":[....]?
You can try using when and otherwise function.
