
Dataset extra fields set in Task outlets are not accessible #35297

Open
error418 opened this issue Oct 31, 2023 · 10 comments
Labels: area:core, area:datasets, kind:bug

error418 (Contributor) commented Oct 31, 2023

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

The Dataset extra property cannot be retrieved in dependent tasks.

It seems that the information is not passed through the facades:

def register_dataset_change(
    self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
) -> None:
    """
    Register dataset related changes.

    For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
    the dataset event
    """
    dataset_model = session.scalar(select(DatasetModel).where(DatasetModel.uri == dataset.uri))
    if not dataset_model:
        self.log.warning("DatasetModel %s not found", dataset)
        return
    session.add(
        DatasetEvent(
            dataset_id=dataset_model.id,
            source_task_id=task_instance.task_id,
            source_dag_id=task_instance.dag_id,
            source_run_id=task_instance.run_id,
            source_map_index=task_instance.map_index,
            extra=extra,
        )
    )
    session.flush()
    Stats.incr("dataset.updates")
    if dataset_model.consuming_dags:
        self._queue_dagruns(dataset_model, session)
    session.flush()

The information contained in the extra property gets lost in the following call, due to the omitted extra parameter of register_dataset_change:

def _register_dataset_changes(self, *, session: Session) -> None:
    for obj in self.task.outlets or []:
        self.log.debug("outlet obj %s", obj)
        # Lineage can have other types of objects besides datasets
        if isinstance(obj, Dataset):
            dataset_manager.register_dataset_change(
                task_instance=self,
                dataset=obj,
                session=session,
            )

What you think should happen instead

The extra of dataset events should not always be an empty dict when retrieving the Dataset from the task's triggering_dataset_events. Instead, the provided contents of the extra dict should be returned.

How to reproduce

  • Create a DAG with a Task providing a Dataset in the outlets:

    task.outlets.append(Dataset("dataset_uri", extra=dict(test="1", another="2")))

  • Create a data-aware DAG for this Dataset.
  • Try to retrieve the contents of the Dataset extra property inside a Task of the DAG:

    for ds_uri, ds_events in triggering_dataset_events.items():
        LOG.info("%s:", ds_uri)
        for ds_event in ds_events:
            LOG.info("  %s -- %s", ds_event.dataset, ds_event.extra)

Operating System

Official Airflow Image

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

Airflow 2.7.1
deployed on k8s, using the Airflow chart

Anything else

This matter also seems to be discussed in #31542

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

error418 added the area:core, kind:bug, and needs-triage labels Oct 31, 2023
boring-cyborg bot commented Oct 31, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

tokoko (Contributor) commented Dec 4, 2023

Hey. I agree that there should be a way to populate dataset_event extras somehow, but I don't think this is the bug as you're describing it. dataset extras and dataset event extras are two distinct concepts and simply populating event extras from dataset extras wouldn't be a correct solution.

What we actually need is to extend the BaseOperator API to let users specify how they want event extra fields to be populated, for example by adding an additional parameter called outlet_extras or by modifying the existing outlets parameter to optionally accept tuples of [dataset: Dataset, extra: str].
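Purely to illustrate that idea (neither variant exists in Airflow today; outlet_extras and the tuple form are hypothetical):

# Hypothetical user-facing API -- neither variant exists in Airflow today
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

ds = Dataset("s3://path/to/dataset")

# Variant 1: a dedicated parameter mapping outlet URIs to event extras (hypothetical)
produce_v1 = BashOperator(
    task_id="produce_v1",
    bash_command="echo produce",
    outlets=[ds],
    outlet_extras={ds.uri: {"s3_key": "/path/to/dataset/2020-01-01.avro"}},
)

# Variant 2: outlets optionally accepting (dataset, extra) tuples (hypothetical)
produce_v2 = BashOperator(
    task_id="produce_v2",
    bash_command="echo produce",
    outlets=[(ds, {"s3_key": "/path/to/dataset/2020-01-01.avro"})],
)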

error418 (Contributor, Author) commented Dec 4, 2023

Hi @tokoko,

register_dataset_change never seems to be called with its extra parameter. It would be fine if we were able to pull and store extra data from/to the event.

It seems the Dataset entity's primary key is uri, which does not allow multiple Datasets with different extra configurations.

dataset_model = session.scalar(select(DatasetModel).where(DatasetModel.uri == dataset.uri))

DatasetEvent would therefore be the only available place to store Dataset extra data emitted from tasks.

As mentioned in the linked issue, the problem lies in the omitted extra parameter in taskinstance.py, which is responsible for populating the extra property of DatasetEvent:

dataset_manager.register_dataset_change(
    task_instance=self,
    dataset=obj,
    session=session,
)

A fix might look like this

    def _register_dataset_changes(self, *, session: Session) -> None:
        for obj in self.task.outlets or []:
            self.log.debug("outlet obj %s", obj)
            # Lineage can have other types of objects besides datasets
            if isinstance(obj, Dataset):
                dataset_manager.register_dataset_change(
                    task_instance=self,
                    dataset=obj,
                    session=session,
                    extra=obj.extra
                )

blag (Contributor) commented Dec 6, 2023

Respectfully, I believe this issue is invalid. Datasets and dataset events are separate and distinct things, as are their respective extra fields, and copying DatasetModel.extra into each and every DatasetEvent.extra field breaks both the original intent of the two fields and database normalization.

If you are quite sure that you want to access DatasetModel.extra within a task, you should be able to query the Airflow database directly from within the task, using the keys available in triggering_dataset_events.

However, you should think quite a lot about whether or not this is the best way to do what you are trying to do, including what complications can arise with this, before you embark down that road. It may be the case that simply adding a parameter to your DAG is a better solution. Without more details about your specific use case, it's impossible to give better guidance either way.
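For reference, that direct query from inside a task might look roughly like this (a sketch only, not a supported public API; it assumes the worker can reach the Airflow metadata database):

# Sketch: read DatasetModel.extra inside a task by querying the metadata DB directly.
# Not a public API; assumes the worker has database access.
from airflow.decorators import task
from airflow.models.dataset import DatasetModel
from airflow.utils.session import create_session

@task
def read_dataset_extras(triggering_dataset_events=None):
    with create_session() as session:
        for ds_uri in triggering_dataset_events:
            dataset_model = (
                session.query(DatasetModel)
                .filter(DatasetModel.uri == ds_uri)
                .one_or_none()
            )
            if dataset_model is not None:
                # Note: this is DatasetModel.extra, not the (empty) DatasetEvent.extra
                print(ds_uri, dataset_model.extra)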

error418 (Contributor, Author) commented Dec 6, 2023

Thank you for the clarifications, @blag

I indeed forgot to mention my use case which led me to this issue:

flowchart TD

  ds[Dataset]
  s3[(S3)]

  subgraph dag1[DAG A]
    task1[Task 1]
  end

  subgraph dag2[DAG B]
    task2[Task 2]
  end

  task1 -- outlets --> ds
  task1 -- writes --> s3
  s3 --> task2

  ds -.- s3
  ds -- triggers --> dag2

After reading the docs about data-aware scheduling I thought it would be the perfect fit to orchestrate tasks around an S3 datalake workflow.

The S3 keys are for example organized like so:

/path/to/dataset/2020-01-01.avro
/path/to/dataset/2020-01-02.avro
/path/to/dataset/2020-01-03.avro
...

Task 1 processes information, puts a result file into the S3 bucket using the path schema above, and emits a Dataset on its outlets.
The Airflow Dataset would have the uri s3://path/to/dataset with an extra key s3_key pointing to the newly put file.

DAG B listens for Datasets with the uri s3://path/to/dataset, triggers, and runs Task 2, which reads the S3 key from extra and retrieves the file using an S3 Hook.
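If the event extra were populated from the outlet, Task 2 could look roughly like this (a sketch only; the s3_key extra, connection id, and bucket name are illustrative):

# Sketch of the intended Task 2, assuming DatasetEvent.extra carried the s3_key
# written by Task 1 (today it arrives as an empty dict).
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task
def task_2(triggering_dataset_events=None):
    hook = S3Hook(aws_conn_id="aws_default")
    for ds_uri, ds_events in triggering_dataset_events.items():
        for ds_event in ds_events:
            s3_key = ds_event.extra.get("s3_key")  # e.g. /path/to/dataset/2020-01-01.avro
            if s3_key:
                content = hook.read_key(key=s3_key, bucket_name="my-bucket")
                # ...process the newly written file...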

In my opinion this would be a nice feature to have for data-aware scheduling: it would enable users to pass more specific information about what in a dataset has changed, so that consuming tasks can react accordingly.

I am coming to the conclusion that this is not possible due to the way Datasets are handled. A Dataset is identified solely by its uri, which does not allow changing the initial value of extra.

airflow/airflow/models/dag.py

Lines 3193 to 3195 in fba682b

stored_dataset = session.scalar(
    select(DatasetModel).where(DatasetModel.uri == dataset.uri).limit(1)
)

This issue might then be more of a feature request than a bug. What is your opinion on this @blag?

mpgreg commented Dec 6, 2023

@error418 you may be thinking more along the lines of #34206

It sounds like you would like to dynamically generate or update a dataset and, like me, thought that extras were the way to do this. There are MANY use cases where this can be useful, but if you look at #34206 it's really not possible as far as I can tell... today.

blag (Contributor) commented Dec 6, 2023

A point of clarification: DatasetModel.extra is different and distinct from DatasetEvent.extra. The two fields fill similar but distinct roles. As I noted in this comment, the documentation needs to be reworded better to improve clarity on this subject.

I would update the title of this to be "Dataset event extra field is not persisted", as that better describes what you want to happen.

I did not ever intend for the extra field to be writable - at all - by Airflow tasks. I consider XComs to be the only way to pass information between task instances in the same DAG run, and (less gracefully) between task instances in different DAG runs, and I think that is a pretty common sentiment.

The only intent that I ever had (and note: I was not the author of the dataset AIP) regarding the extra fields for datasets and dataset events, was to allow third party integrations to easily store information from external systems that wasn't captured in Airflow's database schema, eg: to do so without forking the schema migrations. Those third party integrations were originally called DatasetEventManager, and were renamed DatasetManager before the first Airflow release that included datasets.

Now, the DatasetEvent.extra field should be readable by tasks. If it is not then that is a bug (and #36075 will not fix that). But if all you are looking to do is pass information between task instances in the same DAG run or between task instances in different DAG runs, I believe the Airflow mechanism to do this is XComs, even with data-aware Airflow.
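As a rough sketch of that XCom route (dag and task ids are illustrative; include_prior_dates is needed because the producer and consumer runs have different logical dates):

# Sketch of cross-DAG-run XCom, with illustrative dag/task ids
from airflow.decorators import task

# In DAG A: push the key that was written
@task
def task_1(ti=None):
    ti.xcom_push(key="s3_key", value="/path/to/dataset/2020-01-01.avro")

# In DAG B (triggered by the dataset): pull it from DAG A's run
@task
def task_2(ti=None):
    s3_key = ti.xcom_pull(
        dag_id="dag_a",
        task_ids="task_1",
        key="s3_key",
        include_prior_dates=True,  # the producer run has a different logical date
    )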

But I am perfectly happy to be corrected on this point or any other assertions I've made here.

tokoko (Contributor) commented Dec 6, 2023

@blag my 2 cents, I think dataset event extras would make a much more reliable solution for state persistence in lots of use cases, for example if a single logical dataset can be updated from multiple dags. Also, if you're using data-aware scheduling, using xcoms would mean that you additionally have to track down the dag from which the dataset was last updated, which sort of defeats the whole purpose of datasets imho.

@mpgreg If you are really looking for a solution today, one (ugly) way to do it would be to call setattr(context['ti'].task, 'outlet_extra', payload) from somewhere in the execute or maybe post_execute methods. Then you have to define your own implementation of DatasetManager (and set AIRFLOW__CORE__DATASET_MANAGER_CLASS), something like this:

from sqlalchemy.orm import Session

from airflow.datasets import Dataset
from airflow.datasets.manager import DatasetManager

class ExtraDatasetManager(DatasetManager):
    def register_dataset_change(
        self, *, task_instance, dataset: Dataset, extra=None, session: Session, **kwargs
    ) -> None:
        # Fall back to the extra stashed on the task object by the producing task
        if not extra and hasattr(task_instance.task, 'outlet_extra'):
            extra = getattr(task_instance.task, 'outlet_extra')
        super().register_dataset_change(task_instance=task_instance, dataset=dataset, extra=extra, session=session, **kwargs)
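Wiring this up might then look roughly as follows (the base operator, module path, and values are illustrative assumptions):

# Illustrative producer operator that stashes the payload before outlets are processed
from airflow.operators.bash import BashOperator

class ProducerOperator(BashOperator):
    def post_execute(self, context, result=None):
        # picked up later by ExtraDatasetManager.register_dataset_change
        setattr(context["ti"].task, "outlet_extra", {"s3_key": "/path/to/dataset/2020-01-01.avro"})
        super().post_execute(context, result)

# And point Airflow at the custom manager, e.g. via environment variable:
#   AIRFLOW__CORE__DATASET_MANAGER_CLASS=my_plugins.extra_dataset_manager.ExtraDatasetManager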

error418 changed the title from "Dataset extra fields are not persisted" to "Dataset extra fields set in Task outlets are not accessible" Dec 8, 2023
error418 (Contributor, Author) commented Dec 8, 2023

How do we move on from here now? I am ready to update the PR with changes to make this happen.

The following questions are imho a good starting point:

  • What purpose does the Dataset extra property have, given that it retains the values of the initial Dataset of a uri and can never be changed at the moment (the core problem of this issue)?
  • How do we enable Task authors to easily set and read Dataset / DatasetEvent extra properties?
  • How should the task code look in order to set and read extras?

I also updated the issue title to be more specific

blag (Contributor) commented Jan 5, 2024

Dataset.extra can be useful within organizations to record additional structured information about a dataset, including but not limited to: who created it, why they created it, which internal team owns the dataset, and data access labels (eg: different datasets might be GDPR/CCPA/etc. compliant and others might not). Within Airflow proper, I would leave it read-only, but maybe make it easier for tasks to read it.

For DatasetEvent.extra, I would discuss it with the larger Airflow community, laying out the reasons why XComs and DatasetEvent.extra serve separate and distinct use cases, and what new functionality would be enabled by exposing them as easily writable from within tasks. Since the Airflow community has largely circled the wagons around using XComs for all (?) inter-task communication, you will need to demonstrate and document how they are not and could not be appropriately used, extended, or abused to fit your use case, and that your use case is common enough to include in mainstream Airflow. And XComs are very flexible, as they can be (ab)used quite a bit to fit a pretty large range of use cases.

cmarteepants added the area:datasets label Feb 6, 2024
nathadfield removed the needs-triage label Mar 19, 2024