Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add base OpenLineage provider implementation #29940

Merged
merged 1 commit into from
Apr 14, 2023
Merged

Conversation

mobuchowski
Copy link
Contributor

@mobuchowski mobuchowski commented Mar 6, 2023

This PR consistent mostly of code that was created in OpenLineage project. It consists of

  • Provider wiring
  • OpenLineageListener that uses Listener API to get notification about changes to TaskInstance and Dag states
  • Extractor framework, which is used to extract lineage information from particular operators. It's ment to be replaced by direct implementation of lineage features in later phase and extracting them using DefaultExtractor. This PR does not include actual extractors, but code around using and registering them.
  • OpenLineageAdapter that translates extracted information to OpenLineage events.
  • Utils around specific Airflow OL facets and features

This is a base implementation that's not ment to be released yet, but to add code modified to be consistent with Airflow standards, get early feedback and provide canvas to add later features, docs, tests on.

Closes: #29669

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Will review in more detail when I can)

airflow/providers/openlineage/extractors/base.py Outdated Show resolved Hide resolved
airflow/providers/openlineage/extractors/base.py Outdated Show resolved Hide resolved
@mobuchowski mobuchowski force-pushed the aip-53-pr branch 3 times, most recently from 879ec51 to 412fbea Compare March 8, 2023 23:09
@JDarDagran JDarDagran force-pushed the aip-53-pr branch 2 times, most recently from c4dd0c4 to f5a709d Compare March 9, 2023 19:38
- 1.0.0

dependencies:
- apache-airflow>=2.5.1
Copy link
Contributor

@JDarDagran JDarDagran Mar 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be set as the most recent published Airflow version. Otherwise, it breaks on Airflow 2.3 installation and provider verification step. Ideally, we'd want to have it set to the first version released with any core changes needed to make OpenLineage provider working (not sure if it'll be 2.5.2 or 2.6.0)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not something we should focus on now. I will not release the provider before the AIP is complete so we can update this one later.
OL support is considered a feature thus core changes related to it will be in 2.6 or 2.7

@JDarDagran
Copy link
Contributor

Also, Build docs step fails when trying to fetch inventory for OpenLineage provider. I'm not sure if that's something that should be uploaded manually at first?

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite sure the split between ExtractorManager and Extractors -- they both seem like they are used together. Maybe they should both live in ol.extractors.manager file.

Why plugins in the airflow.providers.openlineage.plugins package name? How pluggable are they?

I'm not all that sold on the amount of code in the utils __init__.py file (both code in init file along side other files, and the amount of code in general. At the very least lets not have code in there when there is also a converters.py along side it)

Haven't reviewed the tests yet. I'll get to them in a day or two

airflow/providers/openlineage/extractors/base.py Outdated Show resolved Hide resolved
Comment on lines 84 to 86
Return the connection URI for the given ID. We first attempt to lookup
the connection URI via AIRFLOW_CONN_<conn_id>, else fallback on querying
the Airflow's connection table.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Firstly: This doc string doesn't really match the implementation.

Secondly: How does this differ from just conn.get_uri() It's not clear from reading the function alone. Please add some comments saying why that isn't suitable (or is it?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this to redacted_connection_uri and added appropriate comment.

This method uses conn.get_uri() to get connection URI, then removes potential username, password, and known sensitive query parts. I've moved passing that second part to being provided by extractor/operator that calls it.

# Extractors should implement BaseExtractor
from airflow.providers.openlineage.utils import import_from_string

env_extractors = os.getenv("OPENLINEAGE_EXTRACTORS")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly this should now be conf.get("openlineage", "extractors") or similar now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we have a series of issues opened linked to AIP-53, one of them is #29670. Along with that we'd like to integration OpenLineage configuration into Airflow config.
Besides that, it's also a matter of backwards compatibility for openlineage-airflow (current OL-Airflow integration outside of Airflow provider) in which you could put custom extractors in this env var.


env_extractors = os.getenv("OPENLINEAGE_EXTRACTORS")
if env_extractors is not None:
for extractor in env_extractors.split(";"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comma more often used for this purpose in Airflow

Copy link
Contributor

@JDarDagran JDarDagran Mar 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

; was supported in openlineage-airflow. I think that for a while we'd want to maintain support of custom extractors as it was. Eventually, we could split on ; or ,.

airflow/providers/openlineage/utils/__init__.py Outdated Show resolved Hide resolved
airflow/providers/openlineage/utils/__init__.py Outdated Show resolved Hide resolved
airflow/providers/openlineage/utils/__init__.py Outdated Show resolved Hide resolved
airflow/providers/openlineage/utils/__init__.py Outdated Show resolved Hide resolved
airflow/providers/openlineage/utils/converters.py Outdated Show resolved Hide resolved
@mobuchowski mobuchowski force-pushed the aip-53-pr branch 7 times, most recently from 8d02baa to 04a02b0 Compare March 14, 2023 11:48
@bolkedebruin
Copy link
Contributor

Can I say that this integration needs extensive docs and architecture description (inside those docs)? It is quite opaque to me know how this works, why it needs workers at all, what it does to my running system to have those workers, how does it affect task runs, what if the code fails did my task fail? etc etc.

@potiuk
Copy link
Member

potiuk commented Mar 17, 2023

Can I say that this integration needs extensive docs and architecture description (inside those docs)? It is quite opaque to me know how this works, why it needs workers at all, what it does to my running system to have those workers, how does it affect task runs, what if the code fails did my task fail? etc etc.

Yeah. I think there are quite a number of things here to make decisions on. I am also going to have a closer look and make a more deep review after the presentation next week, I hope Iy will have much more context after seeing some of the decisions and reasoning for the open-lineage architecture there. I recall how useful it was to get a walkthrough by @ashb of the new scheduler architecture and decisions back in the 2.0 days (and then seeing the "Deep dive" talk from the summit) - this allowed those who participated/watched (and paid attention :) ) to better reason in case of future issues/questions and be able to fix problem or diagnose issues or to propose improvements (even though some of the details there are a bit arcane). I wish for example we had something like that for the Celery Executor or K8S integration when it comes to stalling, adoption, log streaming etc..

That's also note for @mobuchowski and @julienledem. The more of those decisions and contex will be explained, documented (including recording and publishing the meeting is a good idea - ideally followed up by a talk on the Summit) the more it will be a community effort.

For example I am planning to submit a talk for the summit with a working title "Everything you even did not know you wanted to ask about the Airflow CI (or was terrified to ask)" to address at least part of the SPOF problem we have there and pave the way to get others at least being able to reason on where to fix when there are problems.

@potiuk
Copy link
Member

potiuk commented Mar 17, 2023

BTW. @bolkedebruin I think the docs is not nearly enough (and sometimes too much docs is overwhelming). The CI of ours is - I think - documented to the maximum extent possible - including state diagrams, context why we are doing what we are doing, architecture decision records describing the decisions, reference of parameters etc. etc. Still I am the SPOF there big time.

@mobuchowski mobuchowski force-pushed the aip-53-pr branch 2 times, most recently from eca360d to 8cf2a67 Compare April 6, 2023 09:19
@ashb
Copy link
Member

ashb commented Apr 6, 2023

Lets see if we can sort out the config masking/redacting - possibly by subclassing in someway, something like this:

class OLReadactor(SecretsMasker):
    @classmethod
    def from_masker(cls, other: SecretsMasker):
        instance = cls()
        instance.patterns = other.patterns
        instance.replacer = other.replacer

    def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int) -> Redacted:
        # custom type checks here
        return super()._redact(item, name, depth, max_depth)

    def _redact_all(self, item: Redactable, depth: int, max_depth: int) -> Redacted:
        # custom type checks here
        return super()._redact_all(item, depth, max_depth)

(Not sure if you'd need both redact and redact_all or if just redact would be enough)

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good first start -- not ready to release as a provider yet, but lets get this merged and then fix up all the identified issues (which are already created in the project)

docs/spelling_wordlist.txt Outdated Show resolved Hide resolved
Comment on lines 58 to 60
Particularly, in Airflow 2 some operators are deprecated and simply subclass the new
implementation, for example airflow.contrib.operators.bigquery_operator.BigQueryOperator.
The BigQueryExtractor needs to work with both of them.
Copy link
Contributor

@eladkal eladkal Apr 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please explain this.
There is no BigQueryOperator in the code base.

Deprecated operators and new versions of operators are backward compatible. They should not have special treatment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no BigQueryOperator in the code base.

I see, it was removed in September. Removed reference to it, just explaining that single extractor can work for multiple operators.

@mobuchowski mobuchowski force-pushed the aip-53-pr branch 9 times, most recently from a79868c to b1b029b Compare April 13, 2023 13:34
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
Co-authored-by: Jakub Dardzinski <kuba0221@gmail.com>

Fix provider yaml.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

Add OpenLineage to extra packages reference.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

Change Airflow dependency to 2.5.1+.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
@mobuchowski mobuchowski requested a review from kaxil as a code owner April 14, 2023 08:41
@ashb ashb merged commit 55963de into apache:main Apr 14, 2023
wookiist pushed a commit to wookiist/airflow that referenced this pull request Apr 19, 2023
This PR consistent mostly of code that was created in OpenLineage project. It 
consists of

- Provider wiring
- OpenLineageListener that uses Listener API to get notification about changes
  to TaskInstance and Dag states
- Extractor framework, which is used to extract lineage information from
  particular operators. It's ment to be replaced by direct implementation of
  lineage features in later phase and extracting them using DefaultExtractor.
  This PR does not include actual extractors, but code around using and registering them.
- OpenLineageAdapter that translates extracted information to OpenLineage events.
- Utils around specific Airflow OL facets and features

This is a base implementation that's not ment to be released yet, but to add
code modified to be consistent with Airflow standards, get early feedback and
provide canvas to add later features, docs, tests on.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
@ephraimbuddy ephraimbuddy added this to the Airflow 2.7.0 milestone May 8, 2023
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label May 8, 2023
@JDarDagran JDarDagran deleted the aip-53-pr branch July 28, 2023 16:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:dev-tools area:plugins area:providers changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) kind:documentation provider:openlineage AIP-53
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Create OpenLineage provider and migrate existing openlineage-airflow code to it
8 participants