-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add base OpenLineage provider implementation #29940
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Will review in more detail when I can)
879ec51
to
412fbea
Compare
c4dd0c4
to
f5a709d
Compare
- 1.0.0 | ||
|
||
dependencies: | ||
- apache-airflow>=2.5.1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be set as the most recent published Airflow version. Otherwise, it breaks on Airflow 2.3 installation and provider verification step. Ideally, we'd want to have it set to the first version released with any core changes needed to make OpenLineage provider working (not sure if it'll be 2.5.2 or 2.6.0)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not something we should focus on now. I will not release the provider before the AIP is complete so we can update this one later.
OL support is considered a feature thus core changes related to it will be in 2.6 or 2.7
Also, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not quite sure the split between ExtractorManager
and Extractors
-- they both seem like they are used together. Maybe they should both live in ol.extractors.manager file.
Why plugins
in the airflow.providers.openlineage.plugins
package name? How pluggable are they?
I'm not all that sold on the amount of code in the utils __init__.py
file (both code in init file along side other files, and the amount of code in general. At the very least lets not have code in there when there is also a converters.py
along side it)
Haven't reviewed the tests yet. I'll get to them in a day or two
Return the connection URI for the given ID. We first attempt to lookup | ||
the connection URI via AIRFLOW_CONN_<conn_id>, else fallback on querying | ||
the Airflow's connection table. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Firstly: This doc string doesn't really match the implementation.
Secondly: How does this differ from just conn.get_uri()
It's not clear from reading the function alone. Please add some comments saying why that isn't suitable (or is it?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed this to redacted_connection_uri
and added appropriate comment.
This method uses conn.get_uri()
to get connection URI, then removes potential username, password, and known sensitive
query parts. I've moved passing that second part to being provided by extractor/operator that calls it.
# Extractors should implement BaseExtractor | ||
from airflow.providers.openlineage.utils import import_from_string | ||
|
||
env_extractors = os.getenv("OPENLINEAGE_EXTRACTORS") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly this should now be conf.get("openlineage", "extractors")
or similar now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, we have a series of issues opened linked to AIP-53, one of them is #29670. Along with that we'd like to integration OpenLineage configuration into Airflow config.
Besides that, it's also a matter of backwards compatibility for openlineage-airflow
(current OL-Airflow integration outside of Airflow provider) in which you could put custom extractors in this env var.
|
||
env_extractors = os.getenv("OPENLINEAGE_EXTRACTORS") | ||
if env_extractors is not None: | ||
for extractor in env_extractors.split(";"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comma more often used for this purpose in Airflow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
;
was supported in openlineage-airflow
. I think that for a while we'd want to maintain support of custom extractors as it was. Eventually, we could split on ;
or ,
.
8d02baa
to
04a02b0
Compare
Can I say that this integration needs extensive docs and architecture description (inside those docs)? It is quite opaque to me know how this works, why it needs workers at all, what it does to my running system to have those workers, how does it affect task runs, what if the code fails did my task fail? etc etc. |
Yeah. I think there are quite a number of things here to make decisions on. I am also going to have a closer look and make a more deep review after the presentation next week, I hope Iy will have much more context after seeing some of the decisions and reasoning for the open-lineage architecture there. I recall how useful it was to get a walkthrough by @ashb of the new scheduler architecture and decisions back in the 2.0 days (and then seeing the "Deep dive" talk from the summit) - this allowed those who participated/watched (and paid attention :) ) to better reason in case of future issues/questions and be able to fix problem or diagnose issues or to propose improvements (even though some of the details there are a bit arcane). I wish for example we had something like that for the Celery Executor or K8S integration when it comes to stalling, adoption, log streaming etc.. That's also note for @mobuchowski and @julienledem. The more of those decisions and contex will be explained, documented (including recording and publishing the meeting is a good idea - ideally followed up by a talk on the Summit) the more it will be a community effort. For example I am planning to submit a talk for the summit with a working title "Everything you even did not know you wanted to ask about the Airflow CI (or was terrified to ask)" to address at least part of the SPOF problem we have there and pave the way to get others at least being able to reason on where to fix when there are problems. |
BTW. @bolkedebruin I think the docs is not nearly enough (and sometimes too much docs is overwhelming). The CI of ours is - I think - documented to the maximum extent possible - including state diagrams, context why we are doing what we are doing, architecture decision records describing the decisions, reference of parameters etc. etc. Still I am the SPOF there big time. |
eca360d
to
8cf2a67
Compare
Lets see if we can sort out the config masking/redacting - possibly by subclassing in someway, something like this: class OLReadactor(SecretsMasker):
@classmethod
def from_masker(cls, other: SecretsMasker):
instance = cls()
instance.patterns = other.patterns
instance.replacer = other.replacer
def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int) -> Redacted:
# custom type checks here
return super()._redact(item, name, depth, max_depth)
def _redact_all(self, item: Redactable, depth: int, max_depth: int) -> Redacted:
# custom type checks here
return super()._redact_all(item, depth, max_depth) (Not sure if you'd need both redact and redact_all or if just redact would be enough) |
4e0e2ab
to
80aa0b6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A good first start -- not ready to release as a provider yet, but lets get this merged and then fix up all the identified issues (which are already created in the project)
Particularly, in Airflow 2 some operators are deprecated and simply subclass the new | ||
implementation, for example airflow.contrib.operators.bigquery_operator.BigQueryOperator. | ||
The BigQueryExtractor needs to work with both of them. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please explain this.
There is no BigQueryOperator
in the code base.
Deprecated operators and new versions of operators are backward compatible. They should not have special treatment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no BigQueryOperator in the code base.
I see, it was removed in September. Removed reference to it, just explaining that single extractor can work for multiple operators.
a79868c
to
b1b029b
Compare
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com> Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com> Co-authored-by: Jakub Dardzinski <kuba0221@gmail.com> Fix provider yaml. Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com> Add OpenLineage to extra packages reference. Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com> Change Airflow dependency to 2.5.1+. Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
This PR consistent mostly of code that was created in OpenLineage project. It consists of - Provider wiring - OpenLineageListener that uses Listener API to get notification about changes to TaskInstance and Dag states - Extractor framework, which is used to extract lineage information from particular operators. It's ment to be replaced by direct implementation of lineage features in later phase and extracting them using DefaultExtractor. This PR does not include actual extractors, but code around using and registering them. - OpenLineageAdapter that translates extracted information to OpenLineage events. - Utils around specific Airflow OL facets and features This is a base implementation that's not ment to be released yet, but to add code modified to be consistent with Airflow standards, get early feedback and provide canvas to add later features, docs, tests on. Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
This PR consistent mostly of code that was created in OpenLineage project. It consists of
OpenLineageListener
that uses Listener API to get notification about changes to TaskInstance and Dag statesExtractor
framework, which is used to extract lineage information from particular operators. It's ment to be replaced by direct implementation of lineage features in later phase and extracting them usingDefaultExtractor
. This PR does not include actual extractors, but code around using and registering them.OpenLineageAdapter
that translates extracted information to OpenLineage events.This is a base implementation that's not ment to be released yet, but to add code modified to be consistent with Airflow standards, get early feedback and provide canvas to add later features, docs, tests on.
Closes: #29669