# Dataset extra fields set in Task outlets are not accessible #35297
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.
Hey. I agree that there should be a way to populate dataset event extras somehow, but I don't think this is the bug as you're describing it. Dataset extras and dataset event extras are two distinct concepts, and simply populating event extras from dataset extras wouldn't be a correct solution. What we actually need is to extend the BaseOperator API to let users specify how they want event extra fields to be populated, for example by adding an additional parameter for it.
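A purely illustrative sketch of that idea follows; nothing in it exists in Airflow today, and both `BashWithEventExtra` and `outlet_event_extra` are hypothetical names:

```python
from __future__ import annotations

from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator


class BashWithEventExtra(BashOperator):
    """Hypothetical operator carrying a dict intended for DatasetEvent.extra."""

    def __init__(self, *, outlet_event_extra: dict | None = None, **kwargs):
        super().__init__(**kwargs)
        # Inert today: nothing in Airflow reads this attribute. The proposal is
        # for TaskInstance._register_dataset_changes to forward it to
        # register_dataset_change(extra=...) when emitting dataset events.
        self.outlet_event_extra = outlet_event_extra or {}


produce = BashWithEventExtra(
    task_id="produce",
    bash_command="echo writing...",
    outlets=[Dataset("s3://my-bucket/my-table/")],
    outlet_event_extra={"partition": "2023-10-31"},
)
```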
Hi @tokoko,
As it seems, the relevant call is in airflow/airflow/datasets/manager.py (line 66 at 35a1b7a).

As mentioned in the issue, the problem lies in the omitted `extra` parameter in airflow/airflow/models/taskinstance.py (lines 2342 to 2346 at 55b015f).
A fix might look like this:

```python
def _register_dataset_changes(self, *, session: Session) -> None:
    for obj in self.task.outlets or []:
        self.log.debug("outlet obj %s", obj)
        # Lineage can have other types of objects besides datasets
        if isinstance(obj, Dataset):
            dataset_manager.register_dataset_change(
                task_instance=self,
                dataset=obj,
                session=session,
                extra=obj.extra,
            )
```
Respectfully, I believe this issue is invalid. Datasets and dataset events are separate and distinct things, as are their respective `extra` fields. If you are quite sure that you want to access a dataset's `extra` from downstream tasks, you can. However, you should think quite a lot about whether or not this is the best way to do what you are trying to do, including what complications can arise with this, before you embark down that road. It may be the case that simply adding a parameter to your DAG is a better solution. Without more details about your specific use case, it's impossible to give better guidance either way.
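For illustration, a minimal sketch of the "adding a parameter to your DAG" alternative, assuming the producer triggers the consumer explicitly; the DAG ids and the `s3_key` payload are made up:

```python
import pendulum

from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


@dag(start_date=pendulum.datetime(2023, 10, 1, tz="UTC"), schedule=None)
def producer():
    # Pass the details explicitly instead of relying on dataset extras.
    TriggerDagRunOperator(
        task_id="trigger_consumer",
        trigger_dag_id="consumer",
        conf={"s3_key": "raw/2023-10-31/data.parquet"},
    )


@dag(
    start_date=pendulum.datetime(2023, 10, 1, tz="UTC"),
    schedule=None,
    params={"s3_key": ""},  # default; the triggering run's conf overrides it
)
def consumer():
    @task
    def read(params=None):
        print(params["s3_key"])

    read()


producer()
consumer()
```

The trade-off is that this gives up data-aware scheduling in favor of explicit triggering.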
Thank you for the clarifications, @blag. I indeed forgot to mention my use case, which led me to this issue:

```mermaid
flowchart TD
    ds[Dataset]
    s3[(S3)]
    subgraph dag1[DAG A]
        task1[Task 1]
    end
    subgraph dag2[DAG B]
        task2[Task 2]
    end
    task1 -- outlets --> ds
    task1 -- writes --> s3
    s3 --> task2
    ds -.- s3
    ds -- triggers --> dag2
```
After reading the docs about data-aware scheduling, I thought it would be the perfect fit to orchestrate tasks around an S3 data lake workflow. The S3 keys are, for example, organized like so:
In my opinion this would be a nice feature to have for data-aware scheduling: it would enable users to pass more specific information about what has changed in a dataset, so that consuming tasks can react accordingly. I am coming to the conclusion that this is not possible with the current implementation (lines 3193 to 3195 at fba682b).

This issue might then be more of a feature request than a bug. What is your opinion on this, @blag?
@error418 you may be thinking more along the lines of #34206. It sounds like you would like to dynamically generate or update a dataset and, like me, thought that extras were the way to do this. There are MANY use cases where this can be useful, but if you look at #34206, it's really not possible as far as I can tell... today.
A point of clarification: I would update the title of this to be "Dataset event extra field is not persisted", as that better describes what you want to happen. I did not ever intend for the dataset `extra` field to carry dynamic, per-change information. The only intent that I ever had (and note: I was not the author of the dataset AIP) regarding the dataset `extra` field was for it to hold static metadata about the dataset itself. Now, for dynamic, per-run data, XComs already exist and may be a better fit for what you are trying to do. But I am perfectly happy to be corrected on this point or any other assertions I've made here.
@blag my 2 cents: I think dataset event extras would make a much more reliable solution for state persistence in lots of use cases, for example if a single logical dataset can be updated from multiple DAGs. Also, if you're using data-aware scheduling, using XComs would mean that you have to additionally track down the DAG from which the dataset was last updated, which sort of defeats the whole purpose of datasets imho. @mpgreg If you are really looking for a solution... today... one (ugly) way to do it would be to call `dataset_manager.register_dataset_change` yourself from inside the task and pass the `extra` explicitly.
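A sketch of that (ugly) workaround, assuming Airflow 2.7.x internals; `register_dataset_change` is internal API and may change between releases, and the URI and `extra` payload here are made up:

```python
from airflow.datasets import Dataset
from airflow.datasets.manager import dataset_manager
from airflow.decorators import task
from airflow.utils.session import create_session

my_dataset = Dataset("s3://my-bucket/my-prefix/")


@task  # deliberately no `outlets` here, to avoid emitting a second, extra-less event
def produce_and_register(ti=None):
    # ... write the data to S3 first ...
    with create_session() as session:
        dataset_manager.register_dataset_change(
            task_instance=ti,
            dataset=my_dataset,
            extra={"s3_key": "raw/2023-10-31/data.parquet"},
            session=session,
        )
```

Note that the dataset must already be known to Airflow (e.g. referenced in some DAG's `schedule`); otherwise the manager just logs a warning and skips the change.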
How do we move on from here now? I am ready to update the PR with changes to make this happen. The following questions are imho a good starting point:

I also updated the issue title to be more specific.
### Apache Airflow version

Other Airflow 2 version (please specify below)
### What happened

The `Dataset` `extra` property cannot be retrieved in dependent tasks. It seems that the information is not passed through the facades:

airflow/airflow/datasets/manager.py, lines 47 to 74 at 55b015f

The information contained in the `extra` property gets lost in the following call, due to the omitted `extra` parameter of `register_dataset_change`:

airflow/airflow/models/taskinstance.py, lines 2337 to 2346 at 55b015f
### What you think should happen instead

Dataset event extras should not always be an empty dict when retrieved from a task's `triggering_dataset_events`. Instead, the provided contents of the `extra` dict should be returned.

### How to reproduce

- Define a `Dataset` with an `extra` dict and set it in a task's `outlets`.
- Run the task, then try to read the `extra` property inside a Task of the DAG triggered by the dataset.
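A minimal reproduction sketch of those steps; the URI, extra payload, and DAG ids are made up, and the observed behavior is that `event.extra` always comes back empty:

```python
import pendulum

from airflow.datasets import Dataset
from airflow.decorators import dag, task

ds = Dataset("s3://my-bucket/data/", extra={"owner": "team-a"})


@dag(start_date=pendulum.datetime(2023, 10, 1, tz="UTC"), schedule=None)
def producer():
    @task(outlets=[ds])
    def write():
        pass  # write the actual data to S3 here

    write()


@dag(start_date=pendulum.datetime(2023, 10, 1, tz="UTC"), schedule=[ds])
def consumer():
    @task
    def read(triggering_dataset_events=None):
        for uri, events in triggering_dataset_events.items():
            for event in events:
                # Expected: the extra set on the Dataset; observed: {}
                print(uri, event.extra)

    read()


producer()
consumer()
```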
property inside a Task of the DAGOperating System
Official Airflow Image
### Versions of Apache Airflow Providers

No response
### Deployment

Official Apache Airflow Helm Chart
### Deployment details

Airflow 2.7.1, deployed on Kubernetes using the Airflow Helm chart.
### Anything else

This matter also seems to be discussed in #31542.
### Are you willing to submit PR?

### Code of Conduct