
I have a JSON file which looks like this:

{
   "tags":[
      "Real_send",
      "stopped"
   ],
   "messages":{
      "7c2e9284-993d-4eb4-ad6b-6a2bfcc51060":{
         "channel":"channel 1",
         "name":"Version 1",
         "alert":"\ud83d\ude84 alert 1"
      },
      "c2cbd05c-5452-476c-bdc7-ac31ed3417f9":{
         "channel":"channel 1",
         "name":"name 1",
         "type":"type 1"
      },
      "b869886f-0f9c-487f-8a43-abe3d6456678":{
         "channel":"channel 2",
         "name":"Version 2",
         "alert":"\ud83d\ude84 alert 2"
      }
   }
}

I want the output to look like the sample below.

Sample output

When I print the schema, I get the below schema from Spark:

StructType(List(
StructField(messages,
StructType(List(
   StructField(7c2e9284-993d-4eb4-ad6b-6a2bfcc51060,
   StructType(List(
      StructField(alert,StringType,true),
      StructField(channel,StringType,true),
      StructField(name,StringType,true))),true),
   StructField(b869886f-0f9c-487f-8a43-abe3d6456678,StructType(List(
      StructField(alert,StringType,true),
      StructField(channel,StringType,true),
      StructField(name,StringType,true))),true),
   StructField(c2cbd05c-5452-476c-bdc7-ac31ed3417f9,StructType(List(
      StructField(channel,StringType,true),
      StructField(name,StringType,true),
      StructField(type,StringType,true))),true))),true),
StructField(tags,ArrayType(StringType,true),true)))

Basically, 7c2e9284-993d-4eb4-ad6b-6a2bfcc51060 should be treated as my ID column.

My code looks like:

cols_list_to_select_from_flattened = ['alert', 'channel', 'type', 'name']    
df = df \
        .select(
                F.json_tuple(
                    F.col('messages'), *cols_list_to_select_from_flattened
                )
                .alias(*cols_list_to_select_from_flattened))

df.show(1, False)

Error message:

E               pyspark.sql.utils.AnalysisException: cannot resolve 'json_tuple(`messages`, 'alert', 'channel', 'type', 'name')' due to data type mismatch: json_tuple requires that all arguments are strings;
E               'Project [json_tuple(messages#0, alert, channel, type, name) AS ArrayBuffer(alert, channel, type, name)]
E               +- Relation[messages#0,tags#1] json

I also tried to list all keys like below

df.withColumn("map_json_column", F.posexplode_outer(F.col("messages"))).show()

But got this error:

  E               pyspark.sql.utils.AnalysisException: cannot resolve 'posexplode(`messages`)' due to data type mismatch: input to function explode should be array or map type, not struct<7c2e9284-993d-4eb4-ad6b-6a2bfcc51060:struct<alert:string,channel:string,name:string>,b869886f-0f9c-487f-8a43-abe3d6456678:struct<alert:string,channel:string,name:string>,c2cbd05c-5452-476c-bdc7-ac31ed3417f9:struct<channel:string,name:string,type:string>>;
E               'Project [messages#0, tags#1, generatorouter(posexplode(messages#0)) AS map_json_column#5]
E               +- Relation[messages#0,tags#1] json

How can I get the desired output?

1 Answer

When reading JSON you can specify your own schema. Instead of the messages column being a struct type, make it a map type; then you can simply explode that column.

Here is a self-contained example with your data:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

json_sample = """
{
   "tags":[
      "Real_send",
      "stopped"
   ],
   "messages":{
      "7c2e9284-993d-4eb4-ad6b-6a2bfcc51060":{
         "channel":"channel 1",
         "name":"Version 1",
         "alert":"alert 1"
      },
      "c2cbd05c-5452-476c-bdc7-ac31ed3417f9":{
         "channel":"channel 1",
         "name":"name 1",
         "type":"type 1"
      },
      "b869886f-0f9c-487f-8a43-abe3d6456678":{
         "channel":"channel 2",
         "name":"Version 2",
         "alert":"alert 2"
      }
   }
}
"""

data = spark.sparkContext.parallelize([json_sample])

cols_to_select = ['alert', 'channel', 'type', 'name']

# The schema of a message entry; only the columns
# we need to select are parsed. Fields must be
# nullable, since not every entry has every key
message_schema = StructType([
    StructField(col_name, StringType(), True) for col_name in cols_to_select
])

# the complete document schema; tags is an array of
# strings in the sample data, so use ArrayType here
json_schema = StructType([
    StructField("tags", ArrayType(StringType(), True), True),
    StructField("messages", MapType(StringType(), message_schema, False), False),
])

# Read json and parse to specific schema
# Here instead of sample data you can use file path 
df = spark.read.schema(json_schema).json(data)

# explode the map column and select the required columns
df = (
    df
        .select(F.explode(F.col("messages")))
        .select(
            F.col("key").alias("id"),
            *[F.col(f"value.{col_name}").alias(col_name) for col_name in cols_to_select]
        )
)

df.show(truncate=False)

1 Comment

Wow, this is insane. Saved my day
