Prevent Triggerer from crashing when a trigger event isn't serializable#60152
Prevent Triggerer from crashing when a trigger event isn't serializable#60152dabla merged 31 commits intoapache:mainfrom
Conversation
…mplementedError as a trigger event is not serializable, then retry without that event and cancel associated trigger
|
Looks good and seems it would be great to have it in 3.1.6 - but I would love someone with more triggerer expertise reviews it. |
…etry call are correctly asserted and bad events are being sanitized
|
@ash I refactored the code as discussed, first I will just try to send the events as is, if that fails, it will first sanitize the events and then retry to send it. I also adapted the test with asserts and verifications, now that scenario is fully covered. |
Backport failed to create: v3-1-test. View the failure log Run details
You can attempt to backport this manually by running: cherry_picker 0238244 v3-1-testThis should apply the commit to the v3-1-test branch and leave the commit in conflict state marking After you have resolved the conflicts, you can continue the backport process by running: cherry_picker --continueIf you don't have cherry-picker installed, see the installation guide. |
…le (#60152) * refactor: If asend in TriggerRunner comms decoder crashes due to NotImplementedError as a trigger event is not serializable, then retry without that event and cancel associated trigger * refactor: Applied some reformatting * refactor: Fixed some mypy issues * refactor: Fixed return type of send_changes method * refactor: Changed imports of trigger events * refactor: Reformatted trigger job runner * refactor: Fixed mocking of comms decoder * refactor: Forgot to add the patched supervisor_builder * refactor: Changed asserts in test_sync_state_to_supervisor * refactor: Refactored how state changes are validated * refactor: Validate events while creating the TriggerStateChanges message * refactor: Refactored try/except in validate_state_changes to keep mypy happy * Update airflow-core/src/airflow/jobs/triggerer_job_runner.py Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> * refactor: Renamed validate_state_changes method to process_trigger_events * refactor: Only sanitize invalid trigger events if first attempt fails * refactor: Should check if msg.events is not None * refactor: Adapted test_sync_state_to_supervisor so both initial and retry call are correctly asserted and bad events are being sanitized --------- Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> (cherry picked from commit 0238244)
|
Backported to v3-1-test |
|
@dabla how did you backport this? I think it's breaking the CI and I can't find the related PR, I'm surprised this could be merged on
I have opened a revert PR there #60976 to see if the CI is happy. |
…rializable (apache#60152)" This reverts commit b7d1c41.
Salut @pierrejeambrun. Well I did as was described above, I checked out the v3-1-test, cherry-picked the commit from this PR, resolved merged conflicts, and pushed it. Will create branch from v3-1-test and create PR from there, thx for reverting it. |
…le (#60152) * refactor: If asend in TriggerRunner comms decoder crashes due to NotImplementedError as a trigger event is not serializable, then retry without that event and cancel associated trigger * refactor: Applied some reformatting * refactor: Fixed some mypy issues * refactor: Fixed return type of send_changes method * refactor: Changed imports of trigger events * refactor: Reformatted trigger job runner * refactor: Fixed mocking of comms decoder * refactor: Forgot to add the patched supervisor_builder * refactor: Changed asserts in test_sync_state_to_supervisor * refactor: Refactored how state changes are validated * refactor: Validate events while creating the TriggerStateChanges message * refactor: Refactored try/except in validate_state_changes to keep mypy happy * Update airflow-core/src/airflow/jobs/triggerer_job_runner.py Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> * refactor: Renamed validate_state_changes method to process_trigger_events * refactor: Only sanitize invalid trigger events if first attempt fails * refactor: Should check if msg.events is not None * refactor: Adapted test_sync_state_to_supervisor so both initial and retry call are correctly asserted and bad events are being sanitized --------- Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> (cherry picked from commit 0238244)
|
@pierrejeambrun I've create PR #60981 for the backport. Thx to @shahar1 for helping me out. |
…le (#60152) * refactor: If asend in TriggerRunner comms decoder crashes due to NotImplementedError as a trigger event is not serializable, then retry without that event and cancel associated trigger * refactor: Applied some reformatting * refactor: Fixed some mypy issues * refactor: Fixed return type of send_changes method * refactor: Changed imports of trigger events * refactor: Reformatted trigger job runner * refactor: Fixed mocking of comms decoder * refactor: Forgot to add the patched supervisor_builder * refactor: Changed asserts in test_sync_state_to_supervisor * refactor: Refactored how state changes are validated * refactor: Validate events while creating the TriggerStateChanges message * refactor: Refactored try/except in validate_state_changes to keep mypy happy * Update airflow-core/src/airflow/jobs/triggerer_job_runner.py Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> * refactor: Renamed validate_state_changes method to process_trigger_events * refactor: Only sanitize invalid trigger events if first attempt fails * refactor: Should check if msg.events is not None * refactor: Adapted test_sync_state_to_supervisor so both initial and retry call are correctly asserted and bad events are being sanitized --------- Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> (cherry picked from commit 0238244)
…le (#60152) * refactor: If asend in TriggerRunner comms decoder crashes due to NotImplementedError as a trigger event is not serializable, then retry without that event and cancel associated trigger * refactor: Applied some reformatting * refactor: Fixed some mypy issues * refactor: Fixed return type of send_changes method * refactor: Changed imports of trigger events * refactor: Reformatted trigger job runner * refactor: Fixed mocking of comms decoder * refactor: Forgot to add the patched supervisor_builder * refactor: Changed asserts in test_sync_state_to_supervisor * refactor: Refactored how state changes are validated * refactor: Validate events while creating the TriggerStateChanges message * refactor: Refactored try/except in validate_state_changes to keep mypy happy * Update airflow-core/src/airflow/jobs/triggerer_job_runner.py Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> * refactor: Renamed validate_state_changes method to process_trigger_events * refactor: Only sanitize invalid trigger events if first attempt fails * refactor: Should check if msg.events is not None * refactor: Adapted test_sync_state_to_supervisor so both initial and retry call are correctly asserted and bad events are being sanitized --------- Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> (cherry picked from commit 0238244)
…le (#60152) (#60981) * refactor: If asend in TriggerRunner comms decoder crashes due to NotImplementedError as a trigger event is not serializable, then retry without that event and cancel associated trigger * refactor: Applied some reformatting * refactor: Fixed some mypy issues * refactor: Fixed return type of send_changes method * refactor: Changed imports of trigger events * refactor: Reformatted trigger job runner * refactor: Fixed mocking of comms decoder * refactor: Forgot to add the patched supervisor_builder * refactor: Changed asserts in test_sync_state_to_supervisor * refactor: Refactored how state changes are validated * refactor: Validate events while creating the TriggerStateChanges message * refactor: Refactored try/except in validate_state_changes to keep mypy happy * Update airflow-core/src/airflow/jobs/triggerer_job_runner.py Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> * refactor: Renamed validate_state_changes method to process_trigger_events * refactor: Only sanitize invalid trigger events if first attempt fails * refactor: Should check if msg.events is not None * refactor: Adapted test_sync_state_to_supervisor so both initial and retry call are correctly asserted and bad events are being sanitized --------- Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> (cherry picked from commit 0238244)
|
PR Merged into v3-1-test. |
…le (apache#60152) * refactor: If asend in TriggerRunner comms decoder crashes due to NotImplementedError as a trigger event is not serializable, then retry without that event and cancel associated trigger * refactor: Applied some reformatting * refactor: Fixed some mypy issues * refactor: Fixed return type of send_changes method * refactor: Changed imports of trigger events * refactor: Reformatted trigger job runner * refactor: Fixed mocking of comms decoder * refactor: Forgot to add the patched supervisor_builder * refactor: Changed asserts in test_sync_state_to_supervisor * refactor: Refactored how state changes are validated * refactor: Validate events while creating the TriggerStateChanges message * refactor: Refactored try/except in validate_state_changes to keep mypy happy * Update airflow-core/src/airflow/jobs/triggerer_job_runner.py Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> * refactor: Renamed validate_state_changes method to process_trigger_events * refactor: Only sanitize invalid trigger events if first attempt fails * refactor: Should check if msg.events is not None * refactor: Adapted test_sync_state_to_supervisor so both initial and retry call are correctly asserted and bad events are being sanitized --------- Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
…le (apache#60152) * refactor: If asend in TriggerRunner comms decoder crashes due to NotImplementedError as a trigger event is not serializable, then retry without that event and cancel associated trigger * refactor: Applied some reformatting * refactor: Fixed some mypy issues * refactor: Fixed return type of send_changes method * refactor: Changed imports of trigger events * refactor: Reformatted trigger job runner * refactor: Fixed mocking of comms decoder * refactor: Forgot to add the patched supervisor_builder * refactor: Changed asserts in test_sync_state_to_supervisor * refactor: Refactored how state changes are validated * refactor: Validate events while creating the TriggerStateChanges message * refactor: Refactored try/except in validate_state_changes to keep mypy happy * Update airflow-core/src/airflow/jobs/triggerer_job_runner.py Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> * refactor: Renamed validate_state_changes method to process_trigger_events * refactor: Only sanitize invalid trigger events if first attempt fails * refactor: Should check if msg.events is not None * refactor: Adapted test_sync_state_to_supervisor so both initial and retry call are correctly asserted and bad events are being sanitized --------- Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
…le (apache#60152) * refactor: If asend in TriggerRunner comms decoder crashes due to NotImplementedError as a trigger event is not serializable, then retry without that event and cancel associated trigger * refactor: Applied some reformatting * refactor: Fixed some mypy issues * refactor: Fixed return type of send_changes method * refactor: Changed imports of trigger events * refactor: Reformatted trigger job runner * refactor: Fixed mocking of comms decoder * refactor: Forgot to add the patched supervisor_builder * refactor: Changed asserts in test_sync_state_to_supervisor * refactor: Refactored how state changes are validated * refactor: Validate events while creating the TriggerStateChanges message * refactor: Refactored try/except in validate_state_changes to keep mypy happy * Update airflow-core/src/airflow/jobs/triggerer_job_runner.py Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> * refactor: Renamed validate_state_changes method to process_trigger_events * refactor: Only sanitize invalid trigger events if first attempt fails * refactor: Should check if msg.events is not None * refactor: Adapted test_sync_state_to_supervisor so both initial and retry call are correctly asserted and bad events are being sanitized --------- Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
…le (apache#60152) * refactor: If asend in TriggerRunner comms decoder crashes due to NotImplementedError as a trigger event is not serializable, then retry without that event and cancel associated trigger * refactor: Applied some reformatting * refactor: Fixed some mypy issues * refactor: Fixed return type of send_changes method * refactor: Changed imports of trigger events * refactor: Reformatted trigger job runner * refactor: Fixed mocking of comms decoder * refactor: Forgot to add the patched supervisor_builder * refactor: Changed asserts in test_sync_state_to_supervisor * refactor: Refactored how state changes are validated * refactor: Validate events while creating the TriggerStateChanges message * refactor: Refactored try/except in validate_state_changes to keep mypy happy * Update airflow-core/src/airflow/jobs/triggerer_job_runner.py Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com> * refactor: Renamed validate_state_changes method to process_trigger_events * refactor: Only sanitize invalid trigger events if first attempt fails * refactor: Should check if msg.events is not None * refactor: Adapted test_sync_state_to_supervisor so both initial and retry call are correctly asserted and bad events are being sanitized --------- Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
This PR is related to the issue #60120 in which the trigger process crashes if a returned trigger event isn't serializable. This PR will catch the NotImpelmentedError and detect which event caused the crash and cancel the trigger associated to that event that caused the error.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.