Skip to content

Conversation

@Lee-W
Copy link
Member

@Lee-W Lee-W commented Jan 2, 2024

While running a batch job in deferrable mode, the condition might already be met before we defer the task into the trigger. This PR intends to check the batch status before deferring the task to trigger.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

Copy link
Contributor

@dirrao dirrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@Lee-W Lee-W force-pushed the check-job-status-before-BatchOperator-execute-in-deferrable-mode branch 3 times, most recently from e8363cb to 72100ef Compare January 2, 2024 15:29
@vincbeck
Copy link
Contributor

vincbeck commented Jan 2, 2024

If the condition is already met and the task is deferred, what happens? My understanding is the task gets executed successfully. So the gain here is just time? Instead of deferring the task for nothing (because the condition is already met), we return it directly. Unless there is a very good reason to do so I am not really in favor of this change because it basically duplicates what the trigger is already doing but in the operator itself.

If we go down that path, I guess this logic can apply to any deferrable operator, and as such, such logic should be copied over to all deferrable operators

@Lee-W
Copy link
Member Author

Lee-W commented Jan 3, 2024

If the condition is already met and the task is deferred, what happens? My understanding is the task gets executed successfully. So the gain here is just time? Instead of deferring the task for nothing (because the condition is already met), we return it directly. Unless there is a very good reason to do so I am not really in favor of this change because it basically duplicates what the trigger is already doing but in the operator itself.

Yes, I think the main gain here is time as we avoid unnecessary serialization and deserialization. Another thing is for consistency. We already have this behavior for some of the operators

elif not self.poke(context):
self.defer(
timeout=timedelta(seconds=self.max_attempts * self.poke_interval),
trigger=EmrStepSensorTrigger(
job_flow_id=self.job_flow_id,
step_id=self.step_id,
waiter_delay=int(self.poke_interval),
waiter_max_attempts=self.max_attempts,
aws_conn_id=self.aws_conn_id,
),
method_name="execute_complete",
)

if self.deferrable:
query_status = self.hook.check_query_status(job_id=self.job_id)
self.check_failure(query_status)
if query_status in EmrContainerHook.SUCCESS_STATES:
return self.job_id
timeout = (
timedelta(seconds=self.max_polling_attempts * self.poll_interval)
if self.max_polling_attempts
else self.execution_timeout
)
self.defer(
timeout=timeout,
trigger=EmrContainerTrigger(
virtual_cluster_id=self.virtual_cluster_id,
job_id=self.job_id,
aws_conn_id=self.aws_conn_id,
waiter_delay=self.poll_interval,
),
method_name="execute_complete",
)

If we go down that path, I guess this logic can apply to any deferrable operator, and as such, such logic should be copied over to all deferrable operators

Yes, I think that's something we should do eventually

@vincbeck
Copy link
Contributor

vincbeck commented Jan 3, 2024

If the condition is already met and the task is deferred, what happens? My understanding is the task gets executed successfully. So the gain here is just time? Instead of deferring the task for nothing (because the condition is already met), we return it directly. Unless there is a very good reason to do so I am not really in favor of this change because it basically duplicates what the trigger is already doing but in the operator itself.

Yes, I think the main gain here is time as we avoid unnecessary serialization and deserialization. Another thing is for consistency. We already have this behavior for some of the operators

elif not self.poke(context):
self.defer(
timeout=timedelta(seconds=self.max_attempts * self.poke_interval),
trigger=EmrStepSensorTrigger(
job_flow_id=self.job_flow_id,
step_id=self.step_id,
waiter_delay=int(self.poke_interval),
waiter_max_attempts=self.max_attempts,
aws_conn_id=self.aws_conn_id,
),
method_name="execute_complete",
)

if self.deferrable:
query_status = self.hook.check_query_status(job_id=self.job_id)
self.check_failure(query_status)
if query_status in EmrContainerHook.SUCCESS_STATES:
return self.job_id
timeout = (
timedelta(seconds=self.max_polling_attempts * self.poll_interval)
if self.max_polling_attempts
else self.execution_timeout
)
self.defer(
timeout=timeout,
trigger=EmrContainerTrigger(
virtual_cluster_id=self.virtual_cluster_id,
job_id=self.job_id,
aws_conn_id=self.aws_conn_id,
waiter_delay=self.poll_interval,
),
method_name="execute_complete",
)

If we go down that path, I guess this logic can apply to any deferrable operator, and as such, such logic should be copied over to all deferrable operators

Yes, I think that's something we should do eventually

I see, I dont necessarily agree but I'll wait others to comment, I might be the only grumpy one here :)

@potiuk
Copy link
Member

potiuk commented Jan 3, 2024

I see, I dont necessarily agree but I'll wait others to comment, I might be the only grumpy one here :)

Just a little grumpy, I think :)

Yes, I think that's something we should do eventually

Yes I agree we might want to do it. Eventually, It's an optimisation and we should continue doing it. We've had similar cases in the past - for example when we optimized EmptyOperator. It's hard to have an enforceable rule here though, so I'd say ad-hoc optimisation like this one is the best approach.

Copy link
Contributor

@o-nikolas o-nikolas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think this is a bit redundant. I like the time savings, but it complicates the code a bit to have result checking/hanlding in multiple places. I think if we move forward with this we should do our best to abstract that stuff into functions that are used in both the defer and non-defer paths.

@Lee-W Lee-W force-pushed the check-job-status-before-BatchOperator-execute-in-deferrable-mode branch 3 times, most recently from 3d4e215 to b0288a1 Compare January 6, 2024 15:08
@Lee-W Lee-W force-pushed the check-job-status-before-BatchOperator-execute-in-deferrable-mode branch from b0288a1 to 308789a Compare January 8, 2024 09:55
@phanikumv
Copy link
Contributor

I also think this is a bit redundant. I like the time savings, but it complicates the code a bit to have result checking/hanlding in multiple places. I think if we move forward with this we should do our best to abstract that stuff into functions that are used in both the defer and non-defer paths.

Merging this for now. @Lee-W can you implement the abstraction part in a separate PR

@phanikumv phanikumv merged commit 88c9596 into apache:main Jan 10, 2024
@phanikumv phanikumv deleted the check-job-status-before-BatchOperator-execute-in-deferrable-mode branch January 10, 2024 12:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:amazon AWS/Amazon - related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants