
Here is my goal: I am trying to analyze the JSON files created by Microsoft's Azure Data Factory and to convert them into a set of relational tables.

To explain my problem, I have created a sample with reduced complexity. You can produce two sample records with the Python code below:

sample1 = """{
    "name": "Pipeline1",
    "properties": {
        "parameters": {
            "a": {"type": "string", "default": ""},
            "b": {"type": "string", "default": "chris"},
            "c": {"type": "string", "default": "columbus"},
            "d": {"type": "integer", "default": "0"}
        },
        "annotations": ["Test","Sample"]
    }
}"""

sample2 = """{
    "name": "Pipeline2",
    "properties": {
        "parameters": {
            "x": {"type": "string", "default": "X"},
            "y": {"type": "string", "default": "Y"},
        },
        "annotations": ["another sample"]
    }

My first approach to loading this data is, of course, to read it as JSON:

df = spark.read.json(sc.parallelize([sample1,sample2]))
df.printSchema()
df.show()

but this returns:

root
 |-- _corrupt_record: string (nullable = true)
 |-- name: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- annotations: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- parameters: struct (nullable = true)
 |    |    |-- a: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- b: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- c: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- d: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)

+--------------------+---------+--------------------+
|     _corrupt_record|     name|          properties|
+--------------------+---------+--------------------+
|                null|Pipeline1|{[Test, Sample], ...|
|{
    "name": "Pipel...|Pipeline2|                null|
+--------------------+---------+--------------------+

As you can see, the second sample was not loaded, apparently because the schemas of sample1 and sample2 differ (different parameter names). I do not know why Microsoft decided to make the parameters elements of a struct rather than of an array, but I cannot change that.

Let me come back to my goal: I would like to create two dataframes from these samples.
The first dataframe should contain the annotations (with the columns pipeline_name and annotation); the other should contain the parameters (with the columns pipeline_name, parameter_name, parameter_type and parameter_default).

Does anybody know a simple way to convert the elements of a struct (not an array) into rows of a dataframe? My first idea was a user-defined function that parses the JSON one record at a time and loops over the elements of the "parameters" structure, returning them as elements of an array. But I could not figure out exactly how to achieve that. I have tried:

import json
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# create a dataframe with the json data as strings
df = spark.createDataFrame([Row(json=sample1), Row(json=sample2)])

#define desired schema
new_schema = StructType([
   StructField("pipeline", StructType([
     StructField("name", StringType(), True)
    ,StructField("params", ArrayType(StructType([
       StructField("paramname", StringType(), True)
      ,StructField("type", StringType(), True)
      ,StructField("default", StringType(), True)
      ])), None)
    ,StructField("annotations", ArrayType(StringType()), True)
    ]), True)
  ])

def parse_pipeline(source:str):
  dict = json.loads(source)
  name = dict["name"]
  props = dict["properties"]
  paramlist = [ ( key,  value.get('type'), value.get('default')) for key, value in props.get("parameters",{}).items() ]
  annotations = props.get("annotations")
  return {'pipleine': { 'name':name, 'params':paramlist, 'annotations': annotations}}

parse_pipeline_udf = udf(parse_pipeline, new_schema)
df = df.withColumn("data", parse_pipeline_udf(F.col("json")))

But this returns an error message: Failed to convert the JSON string '{"metadata":{},"name":"params","nullable":null,"type":{"containsNull":true,"elementType":{"fields":[{"metadata":{},"name":"paramname","nullable":true,"type":"string"},{"metadata":{},"name":"type","nullable":true,"type":"string"},{"metadata":{},"name":"default","nullable":true,"type":"string"}],"type":"struct"},"type":"array"}}' to a field.

Maybe the error comes from the return value of my UDF. But if that is the reason, how should I pass the result? Thank you for any help.
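Judging by the "nullable":null fragment in the error message, the failure likely happens while Spark parses the schema itself, before the UDF's return value is even considered: StructField expects a boolean as its third argument (nullable), but the params field passes None. A minimal sketch of that one field with the flag fixed (the rest of the schema unchanged); note that the 'pipleine' typo in the returned dict would additionally leave the column null, since dict keys are matched against schema field names:

# nullable must be a boolean; None serializes to "nullable":null,
# which Spark cannot convert back into a field definition
StructField("params", ArrayType(StructType([
    StructField("paramname", StringType(), True),
    StructField("type", StringType(), True),
    StructField("default", StringType(), True),
])), True)  # was: ...)), None)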

2 Comments

  • you want a map, not an array
  • please check your JSON with a validator such as jsonformatter.curiousconcept.com/#

1 Answer


First, I fixed your data sample: a """ and a } were missing, and there was an extra ,:

sample1 = """{
    "name": "Pipeline1",
    "properties": {
        "parameters": {
            "a": {"type": "string", "default": ""},
            "b": {"type": "string", "default": "chris"},
            "c": {"type": "string", "default": "columbus"},
            "d": {"type": "integer", "default": "0"}
        },
        "annotations": ["Test","Sample"]
    }
}"""

sample2 = """{
    "name": "Pipeline2",
    "properties": {
        "parameters": {
            "x": {"type": "string", "default": "X"},
            "y": {"type": "string", "default": "Y"}
        },
        "annotations": ["another sample"]
    }
}"""

With just this fix, sample2 is included when you run your original code. But where you wrote "array", what you actually need is a map type.

from pyspark.sql import types as T

new_schema = T.StructType([
    T.StructField("name", T.StringType()),
    T.StructField("properties", T.StructType([
        T.StructField("annotations", T.ArrayType(T.StringType())),
        T.StructField("parameters", T.MapType(T.StringType(), T.StructType([
            T.StructField("default", T.StringType()),
            T.StructField("type", T.StringType()),
        ])))
    ]))
])

df = spark.read.json(sc.parallelize([sample1, sample2]), new_schema)

and the result:

df.show(truncate=False)
+---------+-----------------------------------------------------------------------------------------------------+
|name     |properties                                                                                           |
+---------+-----------------------------------------------------------------------------------------------------+
|Pipeline1|[[Test, Sample], [a -> [, string], b -> [chris, string], c -> [columbus, string], d -> [0, integer]]]|
|Pipeline2|[[another sample], [x -> [X, string], y -> [Y, string]]]                                             |
+---------+-----------------------------------------------------------------------------------------------------+

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- annotations: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- parameters: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
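
From there, the two dataframes described in the question can be built by exploding the annotations array and the parameters map. A minimal sketch; the output column names (pipeline_name, parameter_name, ...) are taken from the question:

import pyspark.sql.functions as F

# one row per (pipeline_name, annotation)
annotations_df = df.select(
    F.col("name").alias("pipeline_name"),
    F.explode("properties.annotations").alias("annotation"),
)

# exploding a map yields one row per key/value pair
parameters_df = df.select(
    F.col("name").alias("pipeline_name"),
    F.explode("properties.parameters").alias("parameter_name", "param"),
).select(
    "pipeline_name",
    "parameter_name",
    F.col("param.type").alias("parameter_type"),
    F.col("param.default").alias("parameter_default"),
)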

1 Comment

Sorry for the errors in the sample, and for the inconvenience of fixing them. And thank you for your solution: it is just what I was looking for.
