Question:
I am very new to AWS Glue. I am working on a small project, and the ask is to read a file from an S3 bucket, transpose it, and load it into a MySQL table. The source data in the S3 bucket looks like this:
+----+----+-------+-----+---+--+--------+
|cost|data|minutes|name |sms|id|category|
+----+----+-------+-----+---+--+--------+
| 5  |1000| 200   |prod1|500|p1|service |
+----+----+-------+-----+---+--+--------+
The target table structure is:
Product_id, Parameter, Value
I am expecting the target table to have values like the following:
p1, cost, 5
p1, data, 1000
I am able to load the target table with the ID and Value columns, but I am not able to populate the Parameter column. This column is not present in the input data, and I want to populate it with a string that depends on which source column's value I am loading.
Here is the code I used for cost.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "mainclouddb", table_name = "s3product", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mainclouddb", table_name = "s3product", transformation_ctx = "datasource0")

## @type: ApplyMapping
## @args: [mapping = [("cost", "long", "value", "int"), ("id", "string", "product_id", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("cost", "long", "value", "int"), ("id", "string", "product_id", "string")], transformation_ctx = "applymapping1")

## @type: SelectFields
## @args: [paths = ["product_id", "parameter", "value"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["product_id", "parameter", "value"], transformation_ctx = "selectfields2")

## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "resolvechoice3")

## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")

## @type: DataSink
## @args: [database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "datasink5")

job.commit()
Can somebody help me add this new column to my data frame so that it is available in the target table?
Thanks
Answer:
For a smaller dataset, you can do the following:
- convert the DynamicFrame to a Spark DataFrame
- add the column
- convert back to a DynamicFrame
step 1
datasource0 = datasource0.toDF()
step 2
from pyspark.sql.functions import udf, col

getNewValues = udf(lambda val: val + 1)  # you can do what you need to do here instead of val + 1
datasource0 = datasource0.withColumn('New_Col_Name', getNewValues(col('some_existing_col')))
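In this question the new column is just a fixed label per run (e.g. "cost" for the cost rows), so instead of a UDF you can use a literal column. A minimal sketch, assuming the target column is named parameter as in the question:

from pyspark.sql.functions import lit

# tag every row with the constant parameter name
datasource0 = datasource0.withColumn('parameter', lit('cost'))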
step 3
from awsglue.dynamicframe import DynamicFrame

datasource0 = DynamicFrame.fromDF(datasource0, glueContext, "datasource0")
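To wire this into the question's job, the conversion can go right after the ApplyMapping step. A sketch reusing the names from the original script (the 'cost' label is hardcoded here as an assumption):

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import lit

# convert to a Spark DataFrame, add the constant parameter column, convert back
df = applymapping1.toDF()
df = df.withColumn('parameter', lit('cost'))
applymapping1 = DynamicFrame.fromDF(df, glueContext, "applymapping1")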
The catch is that when you have a large dataset, the toDF() conversion is a very expensive operation!
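If that cost is a concern, one alternative worth trying (not covered above) is Glue's Map transform, which runs a Python function over each record of the DynamicFrame without converting to a Spark DataFrame. A sketch, again hardcoding the 'cost' label as an assumption:

from awsglue.transforms import Map

# add the constant field directly on each DynamicFrame record
def add_parameter(rec):
    rec["parameter"] = "cost"
    return rec

datasource0 = Map.apply(frame = datasource0, f = add_parameter, transformation_ctx = "add_parameter")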