
Step 1: Read each line of the file into a single StringType column (it is converted to ArrayType in Step 2).

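from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType

input_file = "../data/datafile.csv"
initial_df = sqlContext.read.csv(input_file)  # no commas in the data, so each line lands in _c0
initial_df.show(n=100, truncate=False)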

Output Result:

+------------------------------------------------------------------------+
|_c0                                                                     |
+------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""                   |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |
+------------------------------------------------------------------------+

Step 2: Converting StringType to ArrayType.

inter_df = initial_df.withColumn("array", F.split(initial_df["_c0"], r"\|"))
inter_df.show(n=100, truncate=False)

+------------------------------------------------------------------------+------------------------------------------------------------------------------+
|_c0                                                                     |array                                                                         |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""                   |[ABC:"MobileData", XYZ:"TableData", ZXC:"MacData", MNB:""]                    |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |[ABC:"value1"    , XYZ:"value2"   , ZXC:"value3" , MNB:"value4"]              |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|[ABC: "valueA"   , XYZ:"ValueB"   , ZXC:"valueC" , MNB:"valueD", POI:"valueE"]|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |[ABC:"value11"    , XYZ:"value12"   , ZXC:"value13" , MNB:"value14"]          |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |[ABC:"value1A"    , XYZ:"value2A"   , ZXC:"value3A"]                          |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+

Step 3: Convert ArrayType to MapType so that values can be looked up by key. I am using a UDF for this conversion, and it takes a very long time: on a 300 GB file the job currently runs for about 3 hours. I would like to reduce the processing time. Can anyone help with this?

def create_dict(elements):
    """Turn ['KEY:"value"', ...] into {'KEY': 'value', ...}."""
    result = {}
    for element in elements:
        # Split on the first colon only so values containing ':' stay intact.
        key, value = [part.strip() for part in element.split(":", 1)]
        result[key] = value.replace('"', "")
    return result

create_dict_udf = F.udf(create_dict, MapType(keyType=StringType(), valueType=StringType()))

inter_df = inter_df.withColumn("dictionary", create_dict_udf(F.col("array")))
inter_df.show(n=100, truncate=False)

+------------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|_c0                                                                     |array                                                                         |dictionary                                                                 |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""                   |[ABC:"MobileData", XYZ:"TableData", ZXC:"MacData", MNB:""]                    |[MNB ->  , XYZ -> TableData, ABC -> MobileData, ZXC -> MacData]            |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |[ABC:"value1"    , XYZ:"value2"   , ZXC:"value3" , MNB:"value4"]              |[MNB -> value4, XYZ -> value2, ABC -> value1, ZXC -> value3]               |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|[ABC: "valueA"   , XYZ:"ValueB"   , ZXC:"valueC" , MNB:"valueD", POI:"valueE"]|[MNB -> valueD, XYZ -> ValueB, ABC -> valueA, POI -> valueE, ZXC -> valueC]|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |[ABC:"value11"    , XYZ:"value12"   , ZXC:"value13" , MNB:"value14"]          |[MNB -> value14, XYZ -> value12, ABC -> value11, ZXC -> value13]           |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |[ABC:"value1A"    , XYZ:"value2A"   , ZXC:"value3A"]                          |[XYZ -> value2A, ABC -> value1A, ZXC -> value3A]                           |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------------+

How can I convert ArrayType to MapType without using a UDF?

  • Which version of Spark are you using? (Commented Jan 8, 2021 at 6:17)

2 Answers

3

Step 3 can be done using transform and aggregate functions (for Spark 2.4+).

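First, apply transform to the array column created in Step 2 so that each element is converted from a string to a map with str_to_map. Its default delimiters are ',' between pairs and ':' between key and value, so each element yields a single-entry map. Then use aggregate with map_concat to merge those maps into one. Note that the values keep their surrounding double quotes and padding.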

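# Assumes `df` has the string column "inp" and the array column "arr"
# (the result of splitting "inp" on "|").
from pyspark.sql.functions import expr

# The explicit cast keeps the accumulator type equal to the merged map type;
# on Spark 3.x a bare map() may be typed map<null,null> and fail the type check.
df.withColumn("mapc",
              expr("""aggregate(transform(arr, x -> str_to_map(x)),
                                cast(map() as map<string,string>),
                                (acc, i) -> map_concat(acc, i)
                               )
                   """)) \
  .select("inp", "mapc") \
  .show(truncate=False)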

1

You can use the transform higher-order function (HOF) to convert each element of the array into a map. Try this.

df = spark.sql(""" with t1 (
 select  'ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""' c1 union all
 select 'ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"' c1 union all
 select 'ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"' c1 union all
 select 'ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"' c1 union all
 select 'ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"' 
  )  select   c1  inp    from t1
""")
df.show(50,truncate=False)

+------------------------------------------------------------------------+
|inp                                                                     |
+------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""                   |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |
+------------------------------------------------------------------------+

from pyspark.sql.functions import col, expr, split

df2 = df.withColumn("arr", split(col("inp"), "[|]"))
df2.show(50,truncate=False)


+------------------------------------------------------------------------+------------------------------------------------------------------------------+
|inp                                                                     |arr                                                                           |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""                   |[ABC:"MobileData", XYZ:"TableData", ZXC:"MacData", MNB:""]                    |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |[ABC:"value1"    , XYZ:"value2"   , ZXC:"value3" , MNB:"value4"]              |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|[ABC: "valueA"   , XYZ:"ValueB"   , ZXC:"valueC" , MNB:"valueD", POI:"valueE"]|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |[ABC:"value11"    , XYZ:"value12"   , ZXC:"value13" , MNB:"value14"]          |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |[ABC:"value1A"    , XYZ:"value2A"   , ZXC:"value3A"]                          |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+

df3 = df2.withColumn(
    "mapc",
    expr("""transform(arr, (x, i) -> map(split(x, ":")[0], split(x, ":")[1]))""")
).select("inp", "mapc")

df3.printSchema()

root
 |-- inp: string (nullable = false)
 |-- mapc: array (nullable = false)
 |    |-- element: map (containsNull = false)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

df3.show(50,truncate=False)

+------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
|inp                                                                     |mapc                                                                                                   |
+------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""                   |[[ABC -> "MobileData"], [XYZ -> "TableData"], [ZXC -> "MacData"], [MNB -> ""]]                         |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |[[ABC -> "value1"    ], [XYZ -> "value2"   ], [ZXC -> "value3" ], [MNB -> "value4"]]                   |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|[[ABC ->  "valueA"   ], [XYZ -> "ValueB"   ], [ZXC -> "valueC" ], [MNB -> "valueD"], [POI -> "valueE"]]|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |[[ABC -> "value11"    ], [XYZ -> "value12"   ], [ZXC -> "value13" ], [MNB -> "value14"]]               |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |[[ABC -> "value1A"    ], [XYZ -> "value2A"   ], [ZXC -> "value3A"]]                                    |
+------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+

Update-1:

To retrieve the value for a given key (for example 'ABC') from an array of maps:

df3.createOrReplaceTempView("df3")

spark.sql("""
select mapc, xx, array_min(transform( mapc, (x,i) -> x[xx])) yy from (
select mapc, concat_ws('',filter(flatten(transform( mapc, (x,i) -> map_keys(x))), y -> y='ABC'))  xx
from df3 )
""").show(50,truncate=False)

+-------------------------------------------------------------------------------------------------------+---+-------------+
|mapc                                                                                                   |xx |yy           |
+-------------------------------------------------------------------------------------------------------+---+-------------+
|[[ABC -> "MobileData"], [XYZ -> "TableData"], [ZXC -> "MacData"], [MNB -> ""]]                         |ABC|"MobileData" |
|[[ABC -> "value1"    ], [XYZ -> "value2"   ], [ZXC -> "value3" ], [MNB -> "value4"]]                   |ABC|"value1"     |
|[[ABC ->  "valueA"   ], [XYZ -> "ValueB"   ], [ZXC -> "valueC" ], [MNB -> "valueD"], [POI -> "valueE"]]|ABC| "valueA"    |
|[[ABC -> "value11"    ], [XYZ -> "value12"   ], [ZXC -> "value13" ], [MNB -> "value14"]]               |ABC|"value11"    |
|[[ABC -> "value1A"    ], [XYZ -> "value2A"   ], [ZXC -> "value3A"]]                                    |ABC|"value1A"    |
+-------------------------------------------------------------------------------------------------------+---+-------------+

3 Comments

Thank you for responding to my request. How can I get a value based on its key here? df3.withColumn("RecordValue", F.col("mapc").get("XYZ")) is failing. Can you please suggest something? Thanks in advance.
Is it possible to create a column with a dictionary type, as in Step 3 of my question? In my existing code I am retrieving values based on keys. Can you please help?
@MyMacMyLife, I'm not sure I understand correctly. Can you check my Update-1?
