openlineage: add extractors for python and bash operators #30713

Merged
merged 1 commit into from
May 12, 2023

mobuchowski
Contributor

Add extractor support for PythonOperator and BashOperator. This is basic support that adds OL facets to them. It does not yet cover the advanced features we plan for the future, such as extracting data from TaskFlow-style jobs.

Additionally, slightly refactor registering extractors.

Member

@potiuk potiuk left a comment

Hmm. I was under the impression that we could get rid of the extractors and embed the open-lineage facet extraction in the operators?

@mobuchowski
Contributor Author

Hey @potiuk - as I wrote in the implementation doc, we want to migrate them first, as we already have them in OL, and then gradually move to reimplementing this in-operator. This would allow us to release apache-airflow-providers-openlineage with parity to openlineage-airflow, so that users moving to Airflow 2.6 could migrate seamlessly.

@potiuk
Member

potiuk commented Apr 24, 2023

> Hey @potiuk - as I wrote in the implementation doc, we want to migrate them first, as we already have them in OL, and then gradually move to reimplementing this in-operator. This would allow us to release apache-airflow-providers-openlineage with parity to openlineage-airflow, so that users moving to Airflow 2.6 could migrate seamlessly.

Ah. OK, makes sense. Can we mark all of those classes as :meta private: (not sure if this is the right way to mark whole modules as private) and add a description in those modules/classes so that it is clear they are private and do not belong to the API of the provider?

@mobuchowski
Contributor Author

mobuchowski commented Apr 24, 2023

Ideally I'd add :meta private: to the whole airflow.providers.openlineage.extractors package.

@potiuk
Member

potiuk commented Apr 24, 2023

> Ideally I'd add :meta private: to the whole airflow.providers.openlineage.extractors package.

yep
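
For illustration, a minimal sketch of what marking an extractor class as private could look like, assuming the docs are built with Sphinx autodoc, which treats a member as private when its docstring carries a `:meta private:` field. The class body here is a hypothetical stub, not the code from this PR, and whether the same field works for a whole module or package is left open, as in the comments above.

```python
class BashExtractor:
    """
    Extract lineage facets from BashOperator (illustrative stub only).

    This class is an implementation detail of the provider and is not part
    of its public API; it may change or disappear without notice.

    :meta private:
    """

    @classmethod
    def get_operator_classnames(cls) -> list:
        # Names of the operators this extractor knows how to handle.
        return ["BashOperator"]
```

The description in the docstring covers the second half of the request: a reader who finds the class in the source still sees that it is not meant to be imported directly.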

@potiuk
Member

potiuk commented Apr 24, 2023

I think we should also (discussed it with @eladkal) add a mechanism, similar to suspended but only for releases, to exclude open-lineage from releasing until it is ready.

@eladkal
Contributor

eladkal commented Apr 24, 2023

> I think we should also (discussed it with @eladkal) add a mechanism, similar to suspended but only for releases, to exclude open-lineage from releasing until it is ready.

Yeah, the manual removal of the provider from the release is not so nice, but I guess this is really an edge case... It's unlikely we will encounter another case of a merged new provider that we don't want to release, so I'm not sure it's worth the effort to set up this option.

@potiuk
Member

potiuk commented Apr 24, 2023

> > I think we should also (discussed it with @eladkal) add a mechanism, similar to suspended but only for releases, to exclude open-lineage from releasing until it is ready.
>
> Yeah, the manual removal of the provider from the release is not so nice, but I guess this is really an edge case... It's unlikely we will encounter another case of a merged new provider that we don't want to release, so I'm not sure it's worth the effort to set up this option.

Indeed. That's why I just HARDCODED it for now: #30841

@mobuchowski
Contributor Author

Thanks @eladkal @potiuk - that makes sense

@mobuchowski mobuchowski force-pushed the openlineage-base-extractors branch from fc9792a to 731fff3 Compare April 26, 2023 12:45
@mobuchowski
Contributor Author

@potiuk @eladkal anything still needed to merge it?

@josh-fell
Contributor

Just curious, should this provider be AIP-21 compliant? If so, maybe we have file paths of airflow/providers/openlineage/extractors/bash.py and airflow/providers/openlineage/extractors/python.py instead?

@mobuchowski
Contributor Author

@josh-fell generally, those wouldn't be used by users of Airflow - you don't need to manually specify those paths anywhere, unless you want to add your own custom extractor, so I personally don't think it matters. I like shorter paths though and I'll change it.

@mobuchowski mobuchowski force-pushed the openlineage-base-extractors branch from d14efba to 4817629 Compare April 27, 2023 10:14
@josh-fell
Contributor

> @josh-fell generally, those wouldn't be used by users of Airflow - you don't need to manually specify those paths anywhere, unless you want to add your own custom extractor, so I personally don't think it matters. I like shorter paths though and I'll change it.

Of course. This was more of a philosophical question: since this might not be considered a "traditional" provider, should the AIP apply? I'm a fan because it at least removes some schematic redundancy in the import path, but figured I'd pose the question.

@mobuchowski mobuchowski force-pushed the openlineage-base-extractors branch 4 times, most recently from 884b628 to f8df58f Compare April 28, 2023 17:40
        return ["BashOperator"]

    def extract(self) -> OperatorLineage | None:
        collect_source = os.environ.get("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "true").lower() not in (
Member

Should be from airflow conf now?

Contributor Author

Rebased, moved to Airflow conf with env fallback.
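
As a rough sketch of the "conf with env fallback" lookup described here: the snippet below uses the standard library's `configparser` as a stand-in for Airflow's `conf` object (an assumption made so the example runs on its own), and mirrors the check shown later in this review, where source collection counts as enabled only when a value is set and is not one of the "disable" spellings.

```python
import configparser
import os
from typing import Mapping

DISABLE_VALUES = ("true", "1", "t")


def is_source_enabled(conf: configparser.ConfigParser, environ: Mapping[str, str] = os.environ) -> bool:
    """Return True only if a value is set and it is not a 'disable' spelling."""
    # The config file wins; the environment variable is used only when the
    # section or option is missing from the config.
    source_var = conf.get(
        "openlineage",
        "disable_source_code",
        fallback=environ.get("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE"),
    )
    # Neither source set: source_var is None, so source collection stays off.
    return isinstance(source_var, str) and source_var.lower() not in DISABLE_VALUES
```

Passing `environ` explicitly keeps the function easy to exercise in tests without touching the real process environment.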

Comment on lines 45 to 46
try_import_from_string("airflow.providers.openlineage.extractors.python.PythonExtractor"),
try_import_from_string("airflow.providers.openlineage.extractors.bash.BashExtractor"),
Member

Given these are modules from the same provider I don't see how these could fail?

Contributor Author

Yeah - moved those to regular imports.
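
To make the trade-off concrete, here is a hypothetical sketch of what a helper like `try_import_from_string` might do; the real implementation is not shown in this thread, so the body below is an assumption. A guarded import like this is useful for optional dependencies that may be absent, whereas modules shipped in the same provider are always importable, which is why plain imports are enough for them.

```python
import importlib
from typing import Any, Optional


def try_import_from_string(path: str) -> Optional[Any]:
    """Hypothetical helper: import ``package.module.Name`` or return None on failure."""
    module_path, _, attr_name = path.rpartition(".")
    try:
        module = importlib.import_module(module_path)
        return getattr(module, attr_name)
    except (ImportError, AttributeError, ValueError):
        # Missing package, missing attribute, or a path with no module part.
        return None


# Optional extractors are silently skipped when their dependency is absent.
candidates = [
    try_import_from_string("collections.OrderedDict"),
    try_import_from_string("no_such_package_xyz.Thing"),
]
available = [c for c in candidates if c is not None]
```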

@mobuchowski mobuchowski force-pushed the openlineage-base-extractors branch 3 times, most recently from d298c67 to 421b0b2 Compare May 8, 2023 14:03
@mobuchowski mobuchowski requested a review from ashb May 8, 2023 15:57
@mobuchowski mobuchowski force-pushed the openlineage-base-extractors branch 2 times, most recently from dfa1758 to d736a41 Compare May 9, 2023 14:17
@mobuchowski
Contributor Author

@potiuk @josh-fell @pierrejeambrun can you merge?

Comment on lines +42 to +44
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["BashOperator"]
Member

This is maybe a bigger question, but is there a reason this couldn't be just a property?

Suggested change
-    @classmethod
-    def get_operator_classnames(cls) -> list[str]:
-        return ["BashOperator"]
+    operator_classnames: list[str] = ["BashOperator"]

Contributor Author

Could be, but we'd have to keep get_operator_classnames on base extractor for backwards compatibility.
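
One way the two styles could coexist, sketched under the assumption that the base class keeps `get_operator_classnames` and simply reads the new class attribute by default. The class names and the `LegacyExtractor` subclass are hypothetical and only illustrate the backwards-compatibility concern.

```python
from typing import ClassVar, List


class BaseExtractor:
    # New-style declaration: subclasses override this class attribute.
    operator_classnames: ClassVar[List[str]] = []

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        # Kept so existing callers and old-style subclasses keep working.
        return list(cls.operator_classnames)


class BashExtractor(BaseExtractor):
    # New style: a plain class attribute, as in the suggested change.
    operator_classnames = ["BashOperator"]


class LegacyExtractor(BaseExtractor):
    # Old style: an existing custom extractor that overrides the classmethod.
    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        return ["LegacyOperator"]
```

Callers would keep going through `get_operator_classnames()`, so both kinds of subclass resolve to the right operator names.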

unknownItems=[
    UnknownOperatorInstance(
        name="BashOperator",
        properties={attr: value for attr, value in self.operator.__dict__.items()},
Member

This is going to include the whole dag. (since self.operator.dag is a property.) Do you want that?
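
A minimal sketch of one possible answer, assuming the facet should carry only plain operator attributes: filter the instance dictionary against an exclusion list before building `properties`. The excluded names and the `FakeOperator` class are hypothetical; which attributes actually hold the DAG on a real operator would need checking against Airflow itself.

```python
from typing import Any, Dict, Iterable

# Hypothetical exclusion list; the real attribute names may differ.
EXCLUDED_ATTRS = ("dag", "_dag", "task_group", "_task_group")


def safe_operator_properties(operator: Any, excluded: Iterable[str] = EXCLUDED_ATTRS) -> Dict[str, Any]:
    """Copy instance attributes, skipping ones that would drag in the whole DAG."""
    skip = set(excluded)
    return {attr: value for attr, value in vars(operator).items() if attr not in skip}


class FakeOperator:
    """Stand-in for an operator instance, for illustration only."""

    def __init__(self) -> None:
        self.task_id = "run_script"
        self.bash_command = "echo hello"
        self._dag = object()  # pretend this is a large DAG object


properties = safe_operator_properties(FakeOperator())
```

An allow-list of known-safe attributes would be the stricter alternative, at the cost of having to maintain it per operator.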

Member

@ashb ashb left a comment

A few nits, and a question about what we include in the facet (I think it pulls in the dag) -- I'm not sure we should include all attributes of the operator.

airflow/providers/openlineage/utils/utils.py
Comment on lines 403 to 408
    source_var = conf.get(
        "openlineage", "disable_source_code", fallback=os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE")
    )
    return isinstance(source_var, str) and source_var.lower() not in ("true", "1", "t")
Member

Suggested change
-    source_var = conf.get(
-        "openlineage", "disable_source_code", fallback=os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE")
-    )
-    return isinstance(source_var, str) and source_var.lower() not in ("true", "1", "t")
+    return conf.getboolean(
+        "openlineage",
+        "disable_source_code",
+        fallback=os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "").lower() not in ("true", "1", "t")
+    )

Contributor Author

Unfortunately, it can't be done this way: conf.getboolean needs a default value, and the default value prevents the fallback from working, since it's taken before the fallback.

Member

I don't see any tests that set the airflow config, just the "fallback" env. @conf_vars can help.

Contributor Author

@ashb added more tests with @conf_vars

@ashb
Member

ashb commented May 10, 2023

I've just noticed something. Wasn't one of the key points of AIP-53 that we would have the facet extraction logic on the operator directly?

OpenLineage support in providers:

As part of this new provider, we define a new optional contract for operators defined in the documentation here: get_openlineage_facets*. Each

@mobuchowski mobuchowski force-pushed the openlineage-base-extractors branch 2 times, most recently from e9c7aa6 to c01af00 Compare May 12, 2023 13:52
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
@mobuchowski mobuchowski force-pushed the openlineage-base-extractors branch from c01af00 to a77ccb8 Compare May 12, 2023 15:00
@mobuchowski
Contributor Author

@ashb Okay, let's merge parts 1 and 2 of the implementation plan together. We'll create new PRs with OL support directly in operators.

For this one it won't work - since it requires support in core Airflow, and that would probably require waiting for 2.7 release.

@ashb ashb merged commit 981afe2 into apache:main May 12, 2023
@ephraimbuddy ephraimbuddy added this to the Airflow 2.7.0 milestone Jul 6, 2023
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Jul 6, 2023
@ephraimbuddy ephraimbuddy removed this from the Airflow 2.7.0 milestone Jul 6, 2023
Labels
area:providers changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) provider:openlineage AIP-53
7 participants