0

I am reading data from a blob location into a DataFrame, as shown below.

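+-------+-----------------------------------------------------------------------------------------------------------------------------------+
| NUM_ID|                                                                                                                              Event|
+-------+-----------------------------------------------------------------------------------------------------------------------------------+
|XXXXX01|[{"SN":"SIG1","E":1571599398000,"V":19.79},{"SN":"SIG1","E":1571599406000,"V":19.80},{"SN":"SIG2","E":1571599406000,"V":25.30},{...|
|XXXXX02|[{"SN":"SIG1","E":1571599414000,"V":19.79},{"SN":"SIG2","E":1571599414000,"V":19.80},{"SN":"SIG2","E":1571599424000,"V":25.30},{...|
+-------+-----------------------------------------------------------------------------------------------------------------------------------+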

A single row looks like this:

|XXXXX01|[{"SN":"SIG1","E":1571599398000,"V":19.79},{"SN":"SIG1","E":1571599406000,"V":19.80},{"SN":"SIG1","E":1571599414000,"V":19.20},{"SN":"SIG2","E":1571599424000,"V":25.30},{"SN":"SIG2","E":1571599432000,"V":19.10},{"SN":"SIG3","E":1571599440000,"V":19.10},{"SN":"SIG3","E":1571599448000,"V":19.10},{"SN":"SIG3","E":1571599456000,"V":19.10},{"SN":"SIG3","E":1571599396000,"V":19.79},{"SN":"SIG3","E":1571599404000,"V":19.79}]

The Event column holds readings from several signals. Each reading carries a signal name (SN) and an E,V pair.

The schema of this DataFrame is shown below.

scala> df.printSchema
root
 |-- NUM_ID: string (nullable = true)
 |-- Event: string (nullable = true)

I want to keep only some of the signals (say, SIG1 and SIG3) and pull their E,V pairs out into new columns, as shown below.

+-------+--------+--------------+------+
| NUM_ID|   Event|             E|     V|
+-------+--------+--------------+------+
|XXXXX01|    SIG1| 1571599398000| 19.79|
|XXXXX01|    SIG1| 1571599406000| 19.80|
|XXXXX01|    SIG1| 1571599414000| 19.20|
|XXXXX01|    SIG3| 1571599440000| 19.10|
|XXXXX01|    SIG3| 1571599448000| 19.10|
|XXXXX01|    SIG3| 1571599406000| 19.10|
|XXXXX01|    SIG3| 1571599396000| 19.70|
|XXXXX01|    SIG3| 1571599404000| 19.70|
+-------+--------+--------------+------+

The final output for each NUM_ID should look like this:

+-------+--------------+------+------+
| NUM_ID|             E|SIG1 V|SIG3 V|    
+-------+--------------+------+------+
|XXXXX01| 1571599398000| 19.79|  null|
|XXXXX01| 1571599406000| 19.80| 19.70|
|XXXXX01| 1571599414000| 19.20|  null|
|XXXXX01| 1571599440000|  null| 19.10|
|XXXXX01| 1571599448000|  null| 19.10|
|XXXXX01| 1571599448000|  null| 19.10|
|XXXXX01| 1571599406000| 19.80| 19.10|
|XXXXX01| 1571599396000|  null| 19.70|
|XXXXX01| 1571599404000|  null| 19.70|
+-------+--------------+------+------+

I'd appreciate any leads. Thanks in advance!

3
  • What have you tried so far? Commented Oct 21, 2019 at 14:53
  • @Shankar Koirala - I tried converting the Event column to a struct type as below: val schema = ArrayType(StructType(Seq(StructField("SN", StringType), StructField("E", StringType), StructField("V", StringType)))) df.withColumn("sig_array", from_json($"Event", schema)). This gives me [SN, E, V] as a new column, and I am now trying to explode that column. Commented Oct 22, 2019 at 4:29
  • You could also use from_json if the JSON has a predefined schema, or get_json_object if you don't know the schema, as shown here. Commented Oct 23, 2019 at 11:17

2 Answers

1

The Event column above packs multiple records into each row, so the data has to be flattened before it can be processed further. The flattening can be done with a flatMap transformation on the DataFrame.

The approach is to build a flattened Dataset of JSON strings, each carrying all the necessary keys and values, and then convert it to a DataFrame with Spark's JSON reader.

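import com.fasterxml.jackson.databind.ObjectMapper
import spark.implicits._

// Jackson mapper used to parse and re-serialize the JSON
val mapper = new ObjectMapper()

// Emit one JSON string per event, tagged with the row's NUM_ID
val flatDF = df.flatMap(row => {
  val numId = row.getAs[String]("NUM_ID")
  val event = row.getAs[String]("Event")
  // Values are mixed (strings and numbers), so bind them as Object
  val data = mapper.readValue(event, classOf[Array[java.util.Map[String, Object]]])

  data.map(jsonMap => {
    jsonMap.put("NUM_ID", numId)
    mapper.writeValueAsString(jsonMap)
  })
})

// Let Spark infer the schema from the flattened JSON strings
val finalDF = spark.read.json(flatDF)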

// finalDF output
+-------------+-------+----+-----+
|            E| NUM_ID|  SN|    V|
+-------------+-------+----+-----+
|1571599398000|XXXXX01|SIG1|19.79|
|1571599406000|XXXXX01|SIG1| 19.8|
|1571599406000|XXXXX01|SIG2| 25.3|
|1571599414000|XXXXX02|SIG1|19.79|
|1571599414000|XXXXX02|SIG2| 19.8|
|1571599424000|XXXXX02|SIG2| 25.3|
+-------------+-------+----+-----+
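
To make the one-row-to-many-rows behaviour of flatMap concrete, here is a minimal sketch on plain Scala collections, with no Spark involved. The names RawRow, FlatRow and FlattenSketch are hypothetical and only stand in for a DataFrame row and its already-parsed events.

```scala
// Hypothetical stand-in for one input row: a NUM_ID plus its parsed (SN, E, V) events
final case class RawRow(numId: String, events: Seq[(String, Long, Double)])

// Hypothetical stand-in for one flattened output row
final case class FlatRow(numId: String, sn: String, e: Long, v: Double)

object FlattenSketch {
  // flatMap turns each input row into zero or more output rows,
  // here one output row per event, each tagged with the parent NUM_ID
  def flatten(rows: Seq[RawRow]): Seq[FlatRow] =
    rows.flatMap { row =>
      row.events.map { case (sn, e, v) => FlatRow(row.numId, sn, e, v) }
    }
}
```

Dataset.flatMap in the code above follows the same pattern, except that each produced element is a JSON string rather than a case class.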

0

This can be done by parsing the JSON in the Event column with from_json and then exploding the resulting array, as below.

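import org.apache.spark.sql.functions.{explode, from_json}
import org.apache.spark.sql.types._
import spark.implicits._

// Every field is read as a string here; E and V can be cast later if needed
val schema = ArrayType(StructType(Seq(StructField("SN", StringType), StructField("E", StringType), StructField("V", StringType))))
val structDF = fromBlobDF.withColumn("sig_array", from_json($"Event", schema))

// One output row per array element, with the struct fields pulled out as columns
val signalsDF = structDF
  .withColumn("sig_array", explode($"sig_array"))
  .select($"NUM_ID", $"sig_array.E".as("E"), $"sig_array.SN".as("SIGNAL"), $"sig_array.V".as("V"))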
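
Neither answer covers the last step the question asks for: keeping only selected signals and turning them into one V column per signal. A minimal sketch of that step might look like the following. The names SignalPivot, eventSchema and pivotSignals are hypothetical, the schema assumes E fits a long and V a double (the answer above reads both as strings), and the sketch assumes at most one reading per (NUM_ID, E, SN), since first keeps only one value per group.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode, first, from_json}
import org.apache.spark.sql.types._

object SignalPivot {
  // Assumed typed schema for the JSON array held in the Event column
  val eventSchema: ArrayType = ArrayType(StructType(Seq(
    StructField("SN", StringType),
    StructField("E", LongType),
    StructField("V", DoubleType))))

  // Expects columns NUM_ID and Event; returns NUM_ID, E and one column per wanted signal
  def pivotSignals(df: DataFrame, wanted: Seq[String]): DataFrame = {
    val signals = df
      // Parse the JSON string, then emit one row per array element
      .withColumn("sig", explode(from_json(col("Event"), eventSchema)))
      .select(col("NUM_ID"), col("sig.SN").as("SN"), col("sig.E").as("E"), col("sig.V").as("V"))
      // Keep only the requested signals
      .filter(col("SN").isin(wanted: _*))

    signals
      .groupBy("NUM_ID", "E")
      // Listing the pivot values up front fixes the output columns and their order
      .pivot("SN", wanted)
      .agg(first("V"))
  }
}
```

With the DataFrame from the question, `SignalPivot.pivotSignals(df, Seq("SIG1", "SIG3")).show(false)` should print one row per (NUM_ID, E), with null in a signal column wherever that signal has no reading at that timestamp. The pivoted columns are named SIG1 and SIG3 rather than "SIG1 V" and "SIG3 V"; they can be renamed with withColumnRenamed if the exact headers matter.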
