-
Notifications
You must be signed in to change notification settings - Fork 16.4k
feat(task_sdk): add support for inlet_events in Task Context #45960
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
3cb8fb4 to
6ac30c8
Compare
0c8da84 to
9460573
Compare
a267114 to
f613905
Compare
|
Hey @ashb and @amoghrajesh , could you please take a quick look at this PR? in case I'm doing something wrong regarding task_sdk. The tests are not fixed as of now. I'll work on that if the overall logic looks ok. Thanks! |
amoghrajesh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General direction looking good, will take a better look with tests.
|
Thanks @amoghrajesh ! |
b5fe1d2 to
d07c722
Compare
|
it's green after rerun 🙌 |
amoghrajesh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost there. Preemptively approving provided my comments are handled.
cd49204 to
1cf88c4
Compare
|
@amoghrajesh I'll merge it this afternoon. Please let me know if you want to take a look again 🙂 |
…45960) * feat(task_sdk): add support for inlet_events in Task Context * feat(task_sdk): add AssetEventCollectionResponse * refactor(task_sdk): combine asset event uris * refactor(api_fastapi): extract asset_event datamodels from asset * fix(task_sdk): revert unrelated datamodels change * fix(task_sdk/context): add _get_asset_events_from_db for fixing tests * test(task_sdk): add test cases for execution_time context inlet access * test(task_sdk): extend test_handle_requests to include asset event calls * test(execution_api): add tests to asset event apis * fix(execution_api): remove unnecessary redact * feat(task_sdk): extract asset response from asset event response * feat(task_sdk): add missing http exception * feat(task_sdk): extract asset response from asset event response * feat(task_sdk): remove duplicate inlet logic * feat(task_sdk): remove AssetEvent form definitions * test(task_sdk): add test case test_run_with_asset_inlets * docs(newsfragments): add description of how inlet_events access has been changed
…45960) * feat(task_sdk): add support for inlet_events in Task Context * feat(task_sdk): add AssetEventCollectionResponse * refactor(task_sdk): combine asset event uris * refactor(api_fastapi): extract asset_event datamodels from asset * fix(task_sdk): revert unrelated datamodels change * fix(task_sdk/context): add _get_asset_events_from_db for fixing tests * test(task_sdk): add test cases for execution_time context inlet access * test(task_sdk): extend test_handle_requests to include asset event calls * test(execution_api): add tests to asset event apis * fix(execution_api): remove unnecessary redact * feat(task_sdk): extract asset response from asset event response * feat(task_sdk): add missing http exception * feat(task_sdk): extract asset response from asset event response * feat(task_sdk): remove duplicate inlet logic * feat(task_sdk): remove AssetEvent form definitions * test(task_sdk): add test case test_run_with_asset_inlets * docs(newsfragments): add description of how inlet_events access has been changed
Why
We need to add support for inlet-events in task_sdk context as well.
What
InletEventsAccessorsfromairflow/utils/context.pytotask_sdk/src/airflow/sdk/execution_time/context.pySUPERVISOR_COMMSto retrieve asset eventsAssetEventResponse,DagRunAssetReferencedata models/asset-eventsroute with/by-asset-name-uri/by-asset-uri/by-asset-name/by-alias-nameAssetEventOperationsto handle the routingtask_sdk/src/airflow/sdk/execution_time/supervisor.pycloses: #45717
closes: #46852
closes: #46852
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.