
I have a simple program that produces a Dataset with a column resource_serialized whose value is a JSON string, as below:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object TestApp {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("Loading Data").setMaster("local[*]")

    val spark = SparkSession.builder
      .config(sparkConf)
      .appName("Test")
      .getOrCreate()

    // Note: resource_serialized is itself an escaped JSON string,
    // not a nested JSON object.
    val json = "[{\"resource_serialized\":\"{\\\"createdOn\\\":\\\"2000-07-20 00:00:00.0\\\",\\\"genderCode\\\":\\\"0\\\"}\",\"id\":\"00529e54-0f3d-4c76-9d3\"}]"

    import spark.implicits._
    val df = spark.read.json(Seq(json).toDS)
    df.printSchema()
    df.show()
  }
}

The printed schema is:

root
 |-- id: string (nullable = true)
 |-- resource_serialized: string (nullable = true)

The Dataset printed to the console is:

+--------------------+--------------------+
|                  id| resource_serialized|
+--------------------+--------------------+
|00529e54-0f3d-4c7...|{"createdOn":"200...|
+--------------------+--------------------+

The resource_serialized field holds this JSON string (shown here in place of the original debug-console screenshot):

{"createdOn":"2000-07-20 00:00:00.0","genderCode":"0"}

Now I need to create a Dataset/DataFrame out of that JSON string. How can I achieve this?

My goal is to get a Dataset like this:

+--------------------+--------------------+----------+
|                  id|           createdOn|genderCode|
+--------------------+--------------------+----------+
|00529e54-0f3d-4c7...|2000-07-20 00:00:...|         0|
+--------------------+--------------------+----------+

2 Answers

Use the from_json function to convert the JSON string into struct columns.

Example:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Define the schema of the serialized JSON, parse it with from_json,
// then flatten the resulting struct into top-level columns.
val sch = new StructType()
  .add("createdOn", StringType)
  .add("genderCode", StringType)

df.select(col("id"), from_json(col("resource_serialized"), sch).alias("str"))
  .select("id", "str.*")
  .show(10, false)

//result
//+----------------------+---------------------+----------+
//|id                    |createdOn            |genderCode|
//+----------------------+---------------------+----------+
//|00529e54-0f3d-4c76-9d3|2000-07-20 00:00:00.0|0         |
//+----------------------+---------------------+----------+
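
As a side note, if you are on Spark 2.4 or later you can avoid hand-writing the StructType. Here is a minimal sketch, assuming Spark 2.4+ (where schema_of_json and the Column-schema overload of from_json are available), that derives the schema from a sample record copied from the question's data:

import org.apache.spark.sql.functions._

// Sketch, assuming Spark 2.4+: derive the schema from a sample record
// instead of hand-writing a StructType.
val sample = """{"createdOn":"2000-07-20 00:00:00.0","genderCode":"0"}"""

df.select(col("id"), from_json(col("resource_serialized"), schema_of_json(sample)).alias("str"))
  .select("id", "str.*")
  .show(false)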

If you have valid (unescaped) JSON, you can read it with a schema directly using spark.read.json:

val json = """[{"resource_serialized":{"createdOn":"2000-07-20 00:00:00.0","genderCode":"0"},"id":"00529e54-0f3d-4c76-9d3"}]"""

val sch = new StructType()
  .add("id", StringType)
  .add("resource_serialized", new StructType()
    .add("createdOn", StringType)
    .add("genderCode", StringType))

spark.read
  .option("multiline", "true")
  .schema(sch)
  .json(Seq(json).toDS)
  .select("id", "resource_serialized.*")
  .show()
//+--------------------+--------------------+----------+
//|                  id|           createdOn|genderCode|
//+--------------------+--------------------+----------+
//|00529e54-0f3d-4c7...|2000-07-20 00:00:...|         0|
//+--------------------+--------------------+----------+
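
In both variants, the select("id", "str.*") / select("id", "resource_serialized.*") step is what flattens the parsed struct into top-level columns.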

1 Comment

This is cool, thanks! Is there a way to map it if I don't have a StructType?

The solution below maps all the keys and values of resource_serialized into a (String, String) map, so the columns can be derived at runtime without a predefined StructType.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

object TestApp {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("Loading Data").setMaster("local[*]")

    val spark = SparkSession.builder
      .config(sparkConf)
      .appName("Test")
      .getOrCreate()

    val json = "[{\"resource_serialized\":\"{\\\"createdOn\\\":\\\"2000-07-20 00:00:00.0\\\",\\\"genderCode\\\":\\\"0\\\"}\",\"id\":\"00529e54-0f3d-4c76-9d3\"}]"

    import spark.implicits._
    val df = spark.read.json(Seq(json).toDS)

    // Parse the serialized JSON into a map<string,string> instead of a struct.
    val jsonColumn = from_json($"resource_serialized", MapType(StringType, StringType))

    // Collect the distinct keys, then turn each key into its own column.
    val keysDF = df.select(explode(map_keys(jsonColumn))).distinct()
    val keys = keysDF.collect().map(f => f.get(0))
    val keyCols = keys.map(f => jsonColumn.getItem(f).as(f.toString))

    df.select($"id" +: keyCols: _*).show(false)
  }
}

The output looks like this:

+----------------------+---------------------+----------+
|id                    |createdOn            |genderCode|
+----------------------+---------------------+----------+
|00529e54-0f3d-4c76-9d3|2000-07-20 00:00:00.0|0         |
+----------------------+---------------------+----------+
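
If you'd rather not define any schema by hand at all, another option, sketched below under the assumption that all rows of resource_serialized share a compatible structure and that an extra pass over the data is acceptable, is to let Spark infer the inner schema by reading the JSON column as a Dataset[String]:

import org.apache.spark.sql.functions._
import spark.implicits._

// Sketch: infer the inner schema from the data itself (this triggers an
// extra Spark job), then reuse it with from_json and flatten the struct.
val innerSchema = spark.read.json(df.select($"resource_serialized").as[String]).schema

df.select($"id", from_json($"resource_serialized", innerSchema).alias("r"))
  .select("id", "r.*")
  .show(false)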

4 Comments

Can we map it into column form without defining a StructType, like Shu mentioned in his answer?
Yes, just add my lines to your code and it will work. I have now added the complete code.
I have updated my question with the expected table; would you mind checking it once?
Updated the solution, @YogenRai. If it answers the question, please upvote and accept it.
