
I am moving data from S3 into Postgres RDS using an AWS Glue script. One column (images) in the Postgres database is of type jsonb.

Is it possible to convert the string into JSON so that the Glue script can save it into the jsonb column?

This is the script I use in AWS Glue:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the source table (S3) from the Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "test_database", table_name = "s3_source", transformation_ctx = "datasource0")
# Map source columns to target columns; images is still a plain string at this point
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "string", "id", "string"), ("title", "string", "title", "string"), ("images", "string", "images", "string")], transformation_ctx = "applymapping1")
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["images", "title", "id"], transformation_ctx = "selectfields2")
# Resolve column types against the RDS target table definition in the catalog
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "test_database", table_name = "rds_target", transformation_ctx = "resolvechoice3")
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
# Write to the RDS target table
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "test_database", table_name = "rds_target", transformation_ctx = "datasink5")
job.commit()
  • Is there something wrong with the script? Any error messages? Commented Jan 11, 2021 at 9:39
  • Yes, there is an error: An error occurred while calling o145.pyWriteDynamicFrame. ERROR: column "images" is of type jsonb but expression is of type character varying. Commented Jan 11, 2021 at 11:06

1 Answer


I managed to solve it thanks to https://stackoverflow.com/a/65821468/2797747

I replaced my old write_dynamic_frame call

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = my_dyn_frame, catalog_connection = "mydb", connection_options = {"dbtable": "mytable", "database": "mydb"}, transformation_ctx = "datasink4")

with:

df = my_dyn_frame.toDF()  # convert the DynamicFrame to a Spark DataFrame

url = 'jdbc:postgresql://<path>:5432/<database>'

properties = {'user': '*****',
              'password': '*****',
              'driver': "org.postgresql.Driver",
              # send strings as untyped parameters so Postgres casts
              # them to the column's declared type (here jsonb)
              'stringtype': "unspecified"}

df.write.jdbc(url, table="mytable", mode="append", properties=properties)
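
The stringtype=unspecified driver parameter is what makes this work: by default the PostgreSQL JDBC driver binds DataFrame string columns as varchar, and Postgres will not implicitly cast varchar to jsonb, hence the error above. With stringtype=unspecified the driver sends the values untyped, so the server casts them to the column's declared type itself.

If you would rather keep a DynamicFrame sink instead of dropping down to the Spark DataFrame writer, the same flag can be appended to the JDBC URL. This is a minimal sketch, assuming a from_options sink; the URL, table name, and credentials are placeholders rather than values from the original job:

# Sketch: the same stringtype=unspecified fix, applied through a DynamicFrame sink.
# All connection values below are placeholders.
datasink = glueContext.write_dynamic_frame.from_options(
    frame = resolvechoice4,
    connection_type = "postgresql",
    connection_options = {
        # appending stringtype=unspecified makes the driver send strings
        # untyped, so Postgres casts them to jsonb on insert
        "url": "jdbc:postgresql://<path>:5432/<database>?stringtype=unspecified",
        "dbtable": "mytable",
        "user": "*****",
        "password": "*****",
    },
    transformation_ctx = "datasink",
)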