
Scala noob, using Spark 2.3.0.
I'm creating a DataFrame using a UDF that produces a JSON string column:

val result: DataFrame = df.withColumn("decrypted_json", instance.decryptJsonUdf(df("encrypted_data")))

It outputs the following:

+----------------+---------------------------------------+
| encrypted_data | decrypted_json                        |
+----------------+---------------------------------------+
|eyJleHAiOjE1 ...| {"a":547.65 , "b":"Some Data"}        |
+----------------+---------------------------------------+

The UDF is external code that I can't change. I would like to split the decrypted_json column into individual columns so that the output DataFrame looks like this:

+----------------+--------+-------------+
| encrypted_data | a      | b           |
+----------------+--------+-------------+
|eyJleHAiOjE1 ...| 547.65 | "Some Data" |
+----------------+--------+-------------+

2 Answers


The solution below is inspired by one of the solutions given by @Jacek Laskowski:

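import org.apache.spark.sql.types._
import spark.implicits._ // for $"..." and toDF (already imported in spark-shell)

// Schema of each element inside the decrypted_json array
val JsonSchema = new StructType()
  .add($"a".string)
  .add($"b".string)
// Schema of the whole raw JSON record
val schema = new StructType()
  .add($"encrypted_data".string)
  .add($"decrypted_json".array(JsonSchema))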

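val schemaAsJson = schema.json // the schema serialized as a JSON string

// Optional: the JSON string can be turned back into a DataType
import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)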

import org.apache.spark.sql.functions._

val rawJsons = Seq("""
  {
    "encrypted_data" : "eyJleHAiOjE1",
    "decrypted_json" : [
      {
        "a" : "547.65",
        "b" : "Some Data"
      }
    ]
  }
""").toDF("rawjson")

val flattened = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*") // <-- flatten the struct field
  .withColumn("decrypted", explode($"decrypted_json")) // <-- explode the array field
  .drop("decrypted_json") // <-- no longer needed
  .select("encrypted_data", "decrypted.*") // <-- flatten the struct field

flattened.show(false)


Please go through the original solution (linked) for the full explanation.
I hope that helps.
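Note that in the question, decrypted_json holds a single JSON object per row rather than an array, so the explode step is not needed when working on the UDF output directly. A minimal sketch, assuming a local SparkSession (the app name and `local[*]` master are placeholders), a hand-built DataFrame standing in for the real UDF output, and a schema where `a` is a double and `b` is a string:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Placeholder session settings; in spark-shell this returns the existing session
val spark = SparkSession.builder().appName("split-json").master("local[*]").getOrCreate()
import spark.implicits._

// Stand-in for the UDF output: one JSON object per row (no array)
val result: DataFrame = Seq(
  ("eyJleHAiOjE1", """{"a":547.65 , "b":"Some Data"}""")
).toDF("encrypted_data", "decrypted_json")

// Assumed schema of the decrypted JSON; adjust names and types to the real data
val jsonSchema = StructType(Seq(
  StructField("a", DoubleType),
  StructField("b", StringType)
))

val split = result
  .withColumn("parsed", from_json(col("decrypted_json"), jsonSchema)) // JSON string -> struct
  .select(col("encrypted_data"), col("parsed.*"))                     // struct fields -> top-level columns

split.show(false)
```

Because `from_json` returns null for rows that don't match the schema, it's worth checking for null `a`/`b` values if the UDF can emit malformed JSON.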


1 Comment

How can I make the header something like "encrypted_data, json_a, json_b", i.e., add a "json_" prefix to the fields of the JSON?
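One way to get prefixed headers is to build the column list from the schema's field names and alias each struct field. A minimal sketch, assuming a local SparkSession (placeholder settings), sample data shaped like the question's, and a struct column named `json` (a hypothetical name chosen for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Placeholder session settings; in spark-shell this returns the existing session
val spark = SparkSession.builder().appName("prefix-json").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("eyJleHAiOjE1", """{"a":547.65 , "b":"Some Data"}"""))
  .toDF("encrypted_data", "decrypted_json")

// Assumed schema of the JSON payload
val jsonSchema = StructType(Seq(StructField("a", DoubleType), StructField("b", StringType)))

val parsed = df.withColumn("json", from_json(col("decrypted_json"), jsonSchema))

// One aliased column per schema field: json.a -> json_a, json.b -> json_b
val prefixed = jsonSchema.fieldNames.map(f => col(s"json.$f").as(s"json_$f"))

val withPrefix = parsed.select(col("encrypted_data") +: prefixed: _*)
withPrefix.show(false)
```

Driving the aliases from `jsonSchema.fieldNames` means new fields added to the schema get prefixed automatically, without editing the select.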

Using from_json you can parse the JSON into a struct type and then select columns from that DataFrame. You will need to know the schema of the JSON. Here is how:

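    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

    // Create (or reuse) a Spark session
    val sparkSession = SparkSession.builder().appName("json-to-columns").master("local[*]").getOrCreate()
    import sparkSession.implicits._

    val jsonData = """{"a":547.65 , "b":"Some Data"}"""
    // Schema of the JSON string: field names and types must match the data
    val schema = StructType(
      List(
        StructField("a", DoubleType, nullable = false),
        StructField("b", StringType, nullable = false)
      ))

    val df = sparkSession.createDataset(Seq(("dummy data", jsonData))).toDF("string_column", "json_column")
    // Parse the JSON string into a struct column
    val dfWithParsedJson = df.withColumn("json_data", from_json($"json_column", schema))

    // Pull individual struct fields out as top-level columns (show(false) avoids truncating json_column)
    dfWithParsedJson.select($"string_column", $"json_column", $"json_data.a", $"json_data.b").show(false)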

Result

+-------------+------------------------------+------+---------+
|string_column|json_column                   |a     |b        |
+-------------+------------------------------+------+---------+
|dummy data   |{"a":547.65 , "b":"Some Data"}|547.65|Some Data|
+-------------+------------------------------+------+---------+

2 Comments

Thank you for your reply. What exactly should I pass as the schema?
The schema of the JSON needs to be passed. I think my code has an extra quote; I will fix it when I get time.
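If writing the schema by hand is inconvenient, Spark (2.2 and later) can infer one by reading the JSON strings themselves with `spark.read.json` on a `Dataset[String]`. A minimal sketch, assuming a local SparkSession (placeholder settings) and the same sample data as the answer above; note that inference costs an extra pass over the data and the inferred types may differ from what you expect (numbers like 547.65 come back as double):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

// Placeholder session settings; in spark-shell this returns the existing session
val spark = SparkSession.builder().appName("infer-json-schema").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("dummy data", """{"a":547.65 , "b":"Some Data"}"""))
  .toDF("string_column", "json_column")

// Let Spark scan the JSON strings and infer a StructType from them
val inferredSchema: StructType = spark.read.json(df.select("json_column").as[String]).schema

// Reuse the inferred schema with from_json, then flatten the struct
val parsed = df.withColumn("json_data", from_json(col("json_column"), inferredSchema))
parsed.select(col("string_column"), col("json_data.*")).show(false)
```

Printing `inferredSchema.treeString` once is a quick way to see what to hard-code as an explicit schema later.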
