There is a JSON data source. Here is an example of one row:
{
  "PrimaryAcctNumber": "account1",
  "AdditionalData": [
    {
      "Addrs": [
        "an address for account1",
        "the longest address in the address list for account1",
        "another address for account1"
      ],
      "AccountNumber": "Account1",
      "IP": 2368971684
    },
    {
      "Addrs": [
        "an address for account2",
        "the longest address in the address list for account2",
        "another address for account2"
      ],
      "AccountNumber": "Account2",
      "IP": 9864766814
    }
  ]
}
So when loading it into a Spark DataFrame, the schema is:
root
 |-- PrimaryAcctNumber: string (nullable = true)
 |-- AdditionalData: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Addrs: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- AccountNumber: string (nullable = true)
 |    |    |-- IP: long (nullable = true)
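For reference, the data can be loaded along these lines (the path and session setup are placeholders; multiLine is only needed because each record is pretty-printed across multiple lines):

import org.apache.spark.sql.SparkSession

// Placeholder session and path; adjust to your environment.
val spark = SparkSession.builder().appName("longest-address-example").getOrCreate()

// multiLine is required when a record spans multiple lines, as in the
// example above; omit it for standard JSON Lines input.
val sourceDdf = spark.read.option("multiLine", true).json("path/to/accounts.json")
sourceDdf.printSchema()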
I want to use Spark to create a new column called LongestAddressOfPrimaryAccount based on the column AdditionalData (ArrayType[StructType]) using the following logic:
- Iterate AdditionalData
- If an element's AccountNumber property equals the row's PrimaryAcctNumber, the value of LongestAddressOfPrimaryAccount will be the longest string in that element's Addrs array
- If no AccountNumber property equals PrimaryAcctNumber, the value will be "N/A"
So for the given row above, the expected output is:
{
  "PrimaryAcctNumber": "account1",
  "AdditionalData": [
    {
      "Addrs": [
        "an address for account1",
        "the longest address in the address list for account1",
        "another address for account1"
      ],
      "AccountNumber": "Account1",
      "IP": 2368971684
    },
    {
      "Addrs": [
        "an address for account2",
        "the longest address in the address list for account2",
        "another address for account2"
      ],
      "AccountNumber": "Account2",
      "IP": 9864766814
    }
  ],
  "LongestAddressOfPrimaryAccount": "the longest address in the address list for account1"
}
It is doable with a UDF or a map function, but that is not best practice in Spark.
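For illustration, the UDF route might look roughly like the sketch below (the helper name is mine, and the comparison is assumed to be case-insensitive because "account1" matches "Account1" in the expected output):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Rough sketch only: pick the longest address of the element whose
// AccountNumber matches PrimaryAcctNumber (assumed case-insensitive,
// based on the example above), falling back to "N/A".
val longestAddressOfPrimary = udf { (primary: String, additional: Seq[Row]) =>
  Option(additional).getOrElse(Seq.empty)
    .find(r => Option(r.getAs[String]("AccountNumber")).exists(_.equalsIgnoreCase(primary)))
    .map(_.getAs[Seq[String]]("Addrs"))
    .filter(addrs => addrs != null && addrs.nonEmpty)
    .map(_.maxBy(_.length))
    .getOrElse("N/A")
}

val result = sourceDdf.withColumn(
  "LongestAddressOfPrimaryAccount",
  longestAddressOfPrimary(col("PrimaryAcctNumber"), col("AdditionalData"))
)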
Is it doable using only built-in Spark functions? Something like:
sourceDdf.withColumn("LongestAddressOfPrimaryAccount", coalesce(
  longest(
    get_field(iterate_array_for_match($"AdditionalData", "AccountNumber", $"PrimaryAcctNumber"), "Addrs")
  ),
  lit("N/A")))