Question:
Consider an AWS Glue job snippet:
val input = glueContext
  .getCatalogSource(database = "my_db", tableName = "my_table")
  .getDynamicFrame()

val myLimit = 10
if (input.count() <= myLimit) {
  // end glue job here with error
}

// continue execution
How do I exit the job with an error status? If I just skip the rest of the execution, the job simply ends as successful; if I throw an exception, it fails, but with an exception. Can I invoke something that stops the job with a fail/error status without throwing an exception?
UPDATED
At first glance it seems I could do this:
val spark: SparkContext = SparkContext.getOrCreate()
val glueContext: GlueContext = new GlueContext(spark)

val jobId = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_ID").toArray)("JOB_ID")
spark.cancelJob(jobId)
But:

- SparkContext belongs to the underlying framework, and killing the job at that level can lead to unpredictable (unstable) results.
- org.apache.spark.SparkContext#cancelJob takes an Int, while AWS Glue has a String JOB_ID like this: j_aaa11111a1a11a111a1aaa11a11111aaa11a111a1111111a111a1a1aa111111a. So it cannot be passed to cancelJob directly (see the sketch below).
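To make the type mismatch concrete, here is a minimal PySpark sketch (my illustration, not part of the original question): the identifier Glue hands to a script is a plain string, while org.apache.spark.SparkContext#cancelJob is declared to take an Int.

import sys
from awsglue.utils import getResolvedOptions

# Glue passes its identifiers (JOB_ID, JOB_RUN_ID, JOB_NAME) to the script
# as string arguments.
args = getResolvedOptions(sys.argv, ["JOB_ID"])
job_id = args["JOB_ID"]

print(type(job_id))  # <class 'str'>, e.g. "j_aaa111...a"
# Spark's cancelJob expects the Int id of an internal Spark job, so this
# string cannot be handed to it directly.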
Answer:
This is written in PySpark, as that's what I know:
import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(
    sys.argv,
    ["TempDir", "JOB_NAME"]
)

glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# my_check() and do_normal_stuff() are placeholders for your own logic
if not my_check():
    # you can use any other exit code and Glue will still report failure,
    # because the job is not committed
    sys.exit(0)

do_normal_stuff()

job.commit()
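One practical refinement (my addition, not part of the original answer): log why you are bailing out before calling sys.exit, so the reason is visible in the job's driver logs next to the failed run. A minimal sketch, reusing the hypothetical my_check() from above:

import logging
import sys

logger = logging.getLogger(__name__)

if not my_check():
    # Record the reason first, then exit before job.commit() so the run
    # is still reported as failed.
    logger.error("Exiting Glue job early: input row count below threshold")
    sys.exit(0)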
The Spark job and the Glue job are different things, which is why you can't interchange their IDs.
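If you want to confirm the resulting run state from outside the job, the Glue API reports it per job run. A hedged sketch using boto3 (my addition; the job name and run id are hypothetical placeholders, and default AWS credentials/region are assumed):

import boto3

glue = boto3.client("glue")

# "my_glue_job" and "jr_example" stand in for a real job name and run id.
response = glue.get_job_run(JobName="my_glue_job", RunId="jr_example")
print(response["JobRun"]["JobRunState"])  # e.g. "FAILED" if the run exited without committing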