Description
Apache Airflow Provider(s)
amazon
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==6.0.0
Apache Airflow version
2.2.5
Operating System
Linux Ubuntu
Deployment
Virtualenv installation
Deployment details
Airflow deployed on ec2 instance
What happened
GlueJobOperator from the Amazon provider does not update a job's configuration (for example its arguments or number of workers) when the job already exists and the configuration has changed. The current code returns as soon as it finds the job:
def get_or_create_glue_job(self) -> str:
    """
    Creates(or just returns) and returns the Job name
    :return:Name of the Job
    """
    glue_client = self.get_conn()
    try:
        get_job_response = glue_client.get_job(JobName=self.job_name)
        self.log.info("Job Already exist. Returning Name of the job")
        return get_job_response['Job']['Name']
    except glue_client.exceptions.EntityNotFoundException:
        self.log.info("Job doesn't exist. Now creating and running AWS Glue Job")
        ...
Is there a particular reason for not doing it? Or was it simply not done when the operator was implemented?
What you think should happen instead
No response
How to reproduce
Create a GlueJobOperator with a simple configuration:
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

submit_glue_job = GlueJobOperator(
    task_id='submit_glue_job',
    job_name='test_glue_job',
    job_desc='test glue job',
    script_location='s3://bucket/path/to/the/script/file',
    script_args={},
    s3_bucket='bucket',
    concurrent_run_limit=1,
    retry_limit=0,
    num_of_dpus=5,
    wait_for_completion=False
)
Then change one of the initial settings, for example num_of_dpus=10, run the task again, and check that the operator has not updated the Glue job configuration on AWS.
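One way to check the result of the steps above is to read the job definition back from Glue and compare it with the values passed to the operator. The sketch below is illustrative only: `find_config_drift` is a hypothetical helper, not part of the provider, and it assumes that `num_of_dpus` ends up in the job's `MaxCapacity` field, which may not hold when a worker type and number of workers are configured instead.

```python
from typing import Any, Dict, Tuple


def find_config_drift(
    glue_client: Any, job_name: str, expected: Dict[str, Any]
) -> Dict[str, Tuple[Any, Any]]:
    """Return {key: (value_on_aws, expected_value)} for every mismatching key.

    `glue_client` is assumed to behave like boto3.client("glue");
    only its get_job(JobName=...) call is used here.
    """
    job = glue_client.get_job(JobName=job_name)["Job"]
    return {
        key: (job.get(key), value)
        for key, value in expected.items()
        if job.get(key) != value
    }


# Hypothetical usage after re-running the task with num_of_dpus=10:
#   import boto3
#   drift = find_config_drift(
#       boto3.client("glue"), "test_glue_job", {"MaxCapacity": 10}
#   )
#   A non-empty result would indicate that the job on AWS was not updated.
```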
Anything else
GlueCrawlerOperator, which is similar to GlueJobOperator, already updates an existing resource:
def execute(self, context: Context):
    """
    Executes AWS Glue Crawler from Airflow
    :return: the name of the current glue crawler.
    """
    crawler_name = self.config['Name']
    if self.hook.has_crawler(crawler_name):
        self.hook.update_crawler(**self.config)
    else:
        self.hook.create_crawler(**self.config)
    ...
The same behavior could be implemented in the AWSGlueJobOperator if we agree to do it.
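As a rough illustration of what such a change might look like for Glue jobs, here is a minimal sketch written against a boto3-style Glue client. It is not the provider's implementation: `create_or_update_glue_job` and its `config` argument are hypothetical names, and `config` is assumed to hold the full job definition (including `Role` and `Command`) but not `Name`. That assumption matters because Glue's `update_job` replaces the previous job definition with the `JobUpdate` payload rather than merging into it.

```python
from typing import Any, Dict


def create_or_update_glue_job(
    glue_client: Any, job_name: str, config: Dict[str, Any]
) -> str:
    """Create the job if it is missing, otherwise update it when settings differ.

    `glue_client` is assumed to behave like boto3.client("glue").
    `config` is assumed to be the complete job definition without `Name`.
    """
    try:
        current = glue_client.get_job(JobName=job_name)["Job"]
    except glue_client.exceptions.EntityNotFoundException:
        # Job does not exist yet: create it from the given definition.
        glue_client.create_job(Name=job_name, **config)
        return job_name

    # Compare only the keys we manage; Glue returns extra fields
    # (timestamps, defaults) that should not trigger an update.
    if any(current.get(key) != value for key, value in config.items()):
        glue_client.update_job(JobName=job_name, JobUpdate=config)
    return job_name
```

Comparing before calling `update_job` avoids a write on every task run, at the cost of one extra read. An unconditional update, as GlueCrawlerOperator does in the snippet above, would be simpler.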
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct