AWS GlueJobOperator is not updating job config if job exists #27592

@romibuzi

Description

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==6.0.0

Apache Airflow version

2.2.5

Operating System

Linux Ubuntu

Deployment

Virtualenv installation

Deployment details

Airflow deployed on ec2 instance

What happened

GlueJobOperator from the Amazon provider does not update the job configuration (for example its arguments or number of workers) when the job already exists, even if the configuration has changed:

    def get_or_create_glue_job(self) -> str:
        """
        Creates(or just returns) and returns the Job name
        :return:Name of the Job
        """
        glue_client = self.get_conn()
        try:
            get_job_response = glue_client.get_job(JobName=self.job_name)
            self.log.info("Job Already exist. Returning Name of the job")
            return get_job_response['Job']['Name']

        except glue_client.exceptions.EntityNotFoundException:
            self.log.info("Job doesn't exist. Now creating and running AWS Glue Job")

        ...
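
For reference, boto3's Glue client does expose an update_job call that the hook could use at this point. Below is a minimal sketch of the idea; the function names are hypothetical and not part of the current provider, and note that the JobUpdate payload must not contain the Name key:

```python
# Hypothetical sketch: how get_or_create_glue_job could also push config
# changes when the job already exists, using boto3's Glue update_job API.

def build_job_update(job_config: dict) -> dict:
    # update_job takes the configuration under JobUpdate, which does not
    # accept the job's Name, so strip it from the payload.
    return {k: v for k, v in job_config.items() if k != "Name"}


def get_create_or_update_glue_job(glue_client, job_config: dict) -> str:
    job_name = job_config["Name"]
    try:
        glue_client.get_job(JobName=job_name)
        # Job exists: sync the (possibly changed) configuration
        # instead of silently returning the stale job.
        glue_client.update_job(
            JobName=job_name,
            JobUpdate=build_job_update(job_config),
        )
    except glue_client.exceptions.EntityNotFoundException:
        glue_client.create_job(**job_config)
    return job_name
```
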

Is there a particular reason for not doing this, or was it simply not done during the implementation of the operator?

What you think should happen instead

No response

How to reproduce

Create a GlueJobOperator with a simple configuration:

from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

submit_glue_job = GlueJobOperator(
    task_id='submit_glue_job',
    job_name='test_glue_job',
    job_desc='test glue job',
    script_location='s3://bucket/path/to/the/script/file',
    script_args={},
    s3_bucket='bucket',
    concurrent_run_limit=1,
    retry_limit=0,
    num_of_dpus=5,
    wait_for_completion=False
)

Then update one of the initial configuration values (e.g. num_of_dpus=10) and observe that the operator does not update the Glue job configuration on AWS when it is run again.

Anything else

There is a GlueCrawlerOperator, which is similar to GlueJobOperator and already does this:

    def execute(self, context: Context):
        """
        Executes AWS Glue Crawler from Airflow

        :return: the name of the current glue crawler.
        """
        crawler_name = self.config['Name']
        if self.hook.has_crawler(crawler_name):
            self.hook.update_crawler(**self.config)
        else:
            self.hook.create_crawler(**self.config)
        ...

This behavior could be reproduced in the GlueJobOperator if we agree to do it.
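The crawler pattern above could be mirrored for Glue jobs. A minimal self-contained sketch, using a fake hook as a stand-in (FakeGlueHook and sync_glue_job are illustrative names, not the real GlueJobHook API):

```python
# Hypothetical sketch of a crawler-style create-or-update flow for
# GlueJobOperator. FakeGlueHook is a test double, not the real hook.

class FakeGlueHook:
    def __init__(self):
        self.jobs = {}

    def has_job(self, name: str) -> bool:
        return name in self.jobs

    def create_job(self, **config):
        self.jobs[config["Name"]] = config

    def update_job(self, **config):
        self.jobs[config["Name"]].update(config)


def sync_glue_job(hook, config: dict) -> str:
    """Create the job if missing, otherwise push the changed config."""
    name = config["Name"]
    if hook.has_job(name):
        hook.update_job(**config)   # mirrors GlueCrawlerOperator.execute
    else:
        hook.create_job(**config)
    return name


hook = FakeGlueHook()
sync_glue_job(hook, {"Name": "test_glue_job", "MaxCapacity": 5})
sync_glue_job(hook, {"Name": "test_glue_job", "MaxCapacity": 10})
print(hook.jobs["test_glue_job"]["MaxCapacity"])  # 10: config change applied
```

Running the second sync with a changed MaxCapacity updates the stored job, which is exactly the behavior the current operator lacks.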

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
