-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
openlineage: add extractors for python and bash operators #30713
Conversation
96aca6f
to
181cea1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. I was under the impression that we can get rid of the extractors and make the open-lineage facet extraction embedded in the operators ?
Hey @potiuk - as I wrote in the implementation doc we want to migrate them, as we already have them it in OL. Then move to gradually reimplementing this in-operator. This would allow us to release |
Ah. OK makes sense. Can we mark all of those classes as :meta private: (not sure if this is the right way to mark the whole modules as private) and add description in those modules/classes so that it is clear those are private parts and not belonging to the API of the provider ? |
Ideally I'd add |
181cea1
to
fc9792a
Compare
yep |
I think we should also (discussed it with @eladkal - add a mechanism - similar to suspended but only for releases) to exclude open-lineage from releasing until it is ready. |
yeah the manual removal of the provider from the release is not so nice but I guess this is really an edge case... it's unlikely we will encounter another case of merged new provider that we don't want to release so I'm not sure if worth the effort setting this option |
Indeed. That's why I just HARDCODED It for now : #30841 |
fc9792a
to
731fff3
Compare
731fff3
to
d14efba
Compare
Just curious, should this provider be AIP-21 compliant? If so, maybe we have file paths of |
@josh-fell generally, those wouldn't be used by users of Airflow - you don't need to manually specify those paths anywhere, unless you want to add your own custom extractor, so I personally don't think it matters. I like shorter paths though and I'll change it. |
d14efba
to
4817629
Compare
Of course. This was more of a philosophical question since this might not be considered a "traditional" provider so should the AIP apply. I'm a fan because it at least removes some schematic redundancy in the import path, but figured I'd pose the question. |
884b628
to
f8df58f
Compare
return ["BashOperator"] | ||
|
||
def extract(self) -> OperatorLineage | None: | ||
collect_source = os.environ.get("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "true").lower() not in ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be from airflow conf now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rebased, moved to Airflow conf with env fallback.
try_import_from_string("airflow.providers.openlineage.extractors.python.PythonExtractor"), | ||
try_import_from_string("airflow.providers.openlineage.extractors.bash.BashExtractor"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given these are modules from the same provider I don't see how these could fail?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah - moved those to regular imports.
d298c67
to
421b0b2
Compare
dfa1758
to
d736a41
Compare
@potiuk @josh-fell @pierrejeambrun can you merge? |
@classmethod | ||
def get_operator_classnames(cls) -> list[str]: | ||
return ["BashOperator"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is maybe a bigger question, but is there a reason this couldn't be just a property?
@classmethod | |
def get_operator_classnames(cls) -> list[str]: | |
return ["BashOperator"] | |
operator_classnames: list[str] = ["BashOperator"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be, but we'd have to keep get_operator_classnames
on base extractor for backwards compatibility.
unknownItems=[ | ||
UnknownOperatorInstance( | ||
name="BashOperator", | ||
properties={attr: value for attr, value in self.operator.__dict__.items()}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to include the whole dag. (since self.operator.dag
is a property.) Do you want that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few nits, and a question about what we include in the facet (I think it pulls in the dag) -- I'm not sure we should include all attrbibutes of the operator
source_var = conf.get( | ||
"openlinege", "disable_source_code", fallback=os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE") | ||
) | ||
return isinstance(source_var, str) and source_var.lower() not in ("true", "1", "t") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
source_var = conf.get( | |
"openlinege", "disable_source_code", fallback=os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE") | |
) | |
return isinstance(source_var, str) and source_var.lower() not in ("true", "1", "t") | |
return conf.getboolean( | |
"openlinege", | |
"disable_source_code", | |
fallback=os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "").lower() not in ("true", "1", "t") | |
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't do this this way unfortunately, since conf.getboolean
needs default value, and default value prevents fallback from working, since it's taken before fallback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see any tests that set the airflow config, just the "fallback" env. @conf_vars
can help.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ashb added more tests with @conf_vars
I've just noticed something. Wasn't one of the key points of AIP-53 that we would have the facet extraction logic on the operator directly? OpenLineage support in providers:
|
e9c7aa6
to
c01af00
Compare
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
c01af00
to
a77ccb8
Compare
@ashb Okay, let's merge parts 1 and 2 of the implementation plan together. We'll create new PRs with OL support directly in operators. For this one it won't work - since it requires support in core Airflow, and that would probably require waiting for 2.7 release. |
Add extractor support for
PythonOperator
andBashOperator
. It's a basic support, adding OL facets to them. It does not yet support advanced features that we plan in the future, like extracting data from taskflow-style jobs.Additionally, slightly refactor registering extractors.