27 changes: 27 additions & 0 deletions docs/apache-airflow-providers-openlineage/guides/user.rst
@@ -451,6 +451,33 @@ You can enable this automation by setting ``spark_inject_parent_job_info`` option
AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true


Passing transport information to Spark jobs
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The OpenLineage integration can automatically inject Airflow's transport information into Spark application properties,
for :ref:`supported Operators <supported_classes:openlineage>`.
This allows the Spark integration to send events to the same backend as the Airflow integration, without manual configuration.
See `Scheduling from Airflow <https://openlineage.io/docs/integrations/spark/configuration/airflow>`_.

.. note::

If any of the ``spark.openlineage.transport*`` properties are manually specified in the Spark job configuration, the integration will refrain from injecting transport properties to ensure that manually provided values are preserved.

You can enable this automation by setting the ``spark_inject_transport_info`` option to ``true`` in the Airflow configuration.

.. code-block:: ini

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
spark_inject_transport_info = true

The ``AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO`` environment variable is an equivalent.

.. code-block:: ini

AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true
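
For illustration, given the configuration above, the injection would produce Spark properties roughly like the following (``timeoutInMillis`` is derived from the client's HTTP timeout, here assuming the 5-second default; compression, auth, and custom headers are added only if configured):

.. code-block:: ini

    spark.openlineage.transport.type=http
    spark.openlineage.transport.url=http://example.com:5000
    spark.openlineage.transport.endpoint=api/v1/lineage
    spark.openlineage.transport.timeoutInMillis=5000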


Troubleshooting
===============

7 changes: 5 additions & 2 deletions docs/exts/templates/openlineage.rst.jinja2
@@ -34,12 +34,15 @@ See :ref:`automatic injection of parent job information <options:spark_inject_pa
apache-airflow-providers-google
"""""""""""""""""""""""""""""""

- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`
- Parent Job Information
- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator`
- Parent Job Information
- Transport Information (only HTTP transport is supported for now, with api_key auth, if any)
- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator`
- Parent Job Information
- Transport Information (only HTTP transport is supported for now, with api_key auth, if any)
- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`
- Parent Job Information
- Transport Information (only HTTP transport is supported for now, with api_key auth, if any)


:class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`
@@ -23,11 +23,15 @@
log = logging.getLogger(__name__)

if TYPE_CHECKING:
from airflow.providers.openlineage.utils.spark import inject_parent_job_information_into_spark_properties
from airflow.providers.openlineage.utils.spark import (
inject_parent_job_information_into_spark_properties,
inject_transport_information_into_spark_properties,
)
else:
try:
from airflow.providers.openlineage.utils.spark import (
inject_parent_job_information_into_spark_properties,
inject_transport_information_into_spark_properties,
)
except ImportError:
try:
@@ -64,5 +68,63 @@ def inject_parent_job_information_into_spark_properties(properties: dict, context
}
return {**properties, **ol_parent_job_properties}

try:
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
except ImportError:

def inject_transport_information_into_spark_properties(properties: dict, context) -> dict:
log.warning(
"Could not import `airflow.providers.openlineage.plugins.listener`."
"Skipping the injection of OpenLineage transport information into Spark properties."
)
return properties

else:

def inject_transport_information_into_spark_properties(properties: dict, context) -> dict:
if any(str(key).startswith("spark.openlineage.transport") for key in properties):
log.info(
"Some OpenLineage properties with transport information are already present "
"in Spark properties. Skipping the injection of OpenLineage "
"transport information into Spark properties."
)
return properties

transport = get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
if transport.kind != "http":
log.info(
"OpenLineage transport type `%s` does not support automatic "
"injection of OpenLineage transport information into Spark properties.",
transport.kind,
)
return properties

transport_properties = {
"spark.openlineage.transport.type": "http",
"spark.openlineage.transport.url": transport.url,
"spark.openlineage.transport.endpoint": transport.endpoint,
# Timeout is converted to milliseconds, as required by the Spark integration.
"spark.openlineage.transport.timeoutInMillis": str(int(transport.timeout * 1000)),
}
if transport.compression:
transport_properties["spark.openlineage.transport.compression"] = str(
transport.compression
)

if hasattr(transport.config.auth, "api_key") and transport.config.auth.get_bearer():
transport_properties["spark.openlineage.transport.auth.type"] = "api_key"
transport_properties["spark.openlineage.transport.auth.apiKey"] = (
transport.config.auth.get_bearer()
)

if hasattr(transport.config, "custom_headers") and transport.config.custom_headers:
for key, value in transport.config.custom_headers.items():
transport_properties[f"spark.openlineage.transport.headers.{key}"] = value

return {**properties, **transport_properties}


__all__ = ["inject_parent_job_information_into_spark_properties"]
__all__ = [
"inject_parent_job_information_into_spark_properties",
"inject_transport_information_into_spark_properties",
]
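
As a rough usage sketch (the property dict is hypothetical, and ``context`` would be the Airflow task context available at runtime), the helper merges the transport properties into whatever the caller already has:

```python
from airflow.providers.common.compat.openlineage.utils.spark import (
    inject_transport_information_into_spark_properties,
)

# Hypothetical starting properties; `context` comes from the executing task.
properties = {"spark.app.name": "my-spark-job"}
properties = inject_transport_information_into_spark_properties(
    properties=properties, context=context
)
# With an HTTP transport configured, `properties` now also contains the
# `spark.openlineage.transport.*` keys; otherwise it is returned unchanged.
```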
128 changes: 89 additions & 39 deletions providers/src/airflow/providers/google/cloud/openlineage/utils.py
@@ -43,6 +43,7 @@
)
from airflow.providers.common.compat.openlineage.utils.spark import (
inject_parent_job_information_into_spark_properties,
inject_transport_information_into_spark_properties,
)
from airflow.providers.google import __version__ as provider_version
from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
@@ -453,31 +454,38 @@ def _replace_dataproc_job_properties(job: dict, job_type: str, new_properties: d


def inject_openlineage_properties_into_dataproc_job(
job: dict, context: Context, inject_parent_job_info: bool
job: dict, context: Context, inject_parent_job_info: bool, inject_transport_info: bool
) -> dict:
"""
Inject OpenLineage properties into Spark job definition.

Function is not removing any configuration or modifying the job in any other way,
apart from adding desired OpenLineage properties to Dataproc job definition if not already present.
This function does not remove existing configurations or modify the job definition in any way,
except to add the required OpenLineage properties if they are not already present.

Note:
Any modification to job will be skipped if:
- OpenLineage provider is not accessible.
- The job type is not supported.
- Automatic parent job information injection is disabled.
- Any OpenLineage properties with parent job information are already present
in the Spark job definition.
The entire properties injection process will be skipped if any of the following conditions is met:
- The OpenLineage provider is not accessible.
- The job type is unsupported.
- Both `inject_parent_job_info` and `inject_transport_info` are set to False.

Additionally, specific information will not be injected if relevant OpenLineage properties already exist.

Parent job information will not be injected if:
- Any property prefixed with `spark.openlineage.parent` exists.
- `inject_parent_job_info` is False.
Transport information will not be injected if:
- Any property prefixed with `spark.openlineage.transport` exists.
- `inject_transport_info` is False.

Args:
job: The original Dataproc job definition.
context: The Airflow context in which the job is running.
inject_parent_job_info: Flag indicating whether to inject parent job information.
inject_transport_info: Flag indicating whether to inject transport information.

Returns:
The modified job definition with OpenLineage properties injected, if applicable.
"""
if not inject_parent_job_info:
if not inject_parent_job_info and not inject_transport_info:
log.debug("Automatic injection of OpenLineage information is disabled.")
return job

@@ -497,7 +505,17 @@

properties = job[job_type].get("properties", {})

properties = inject_parent_job_information_into_spark_properties(properties=properties, context=context)
if inject_parent_job_info:
log.debug("Injecting OpenLineage parent job information into Spark properties.")
properties = inject_parent_job_information_into_spark_properties(
properties=properties, context=context
)

if inject_transport_info:
log.debug("Injecting OpenLineage transport information into Spark properties.")
properties = inject_transport_information_into_spark_properties(
properties=properties, context=context
)

job_with_ol_config = _replace_dataproc_job_properties(
job=job, job_type=job_type, new_properties=properties
Expand Down Expand Up @@ -587,31 +605,38 @@ def _replace_dataproc_batch_properties(batch: dict | Batch, new_properties: dict


def inject_openlineage_properties_into_dataproc_batch(
batch: dict | Batch, context: Context, inject_parent_job_info: bool
batch: dict | Batch, context: Context, inject_parent_job_info: bool, inject_transport_info: bool
) -> dict | Batch:
"""
Inject OpenLineage properties into Dataproc batch definition.

It's not removing any configuration or modifying the batch in any other way.
This function add desired OpenLineage properties to Dataproc batch configuration.
This function does not remove existing configurations or modify the batch definition in any way,
except to add the required OpenLineage properties if they are not already present.

The entire properties injection process will be skipped if any of the following conditions is met:
- The OpenLineage provider is not accessible.
- The batch type is unsupported.
- Both `inject_parent_job_info` and `inject_transport_info` are set to False.

Note:
Any modification to job will be skipped if:
- OpenLineage provider is not accessible.
- The batch type is not supported.
- Automatic parent job information injection is disabled.
- Any OpenLineage properties with parent job information are already present
in the Spark job configuration.
Additionally, specific information will not be injected if relevant OpenLineage properties already exist.

Parent job information will not be injected if:
- Any property prefixed with `spark.openlineage.parent` exists.
- `inject_parent_job_info` is False.
Transport information will not be injected if:
- Any property prefixed with `spark.openlineage.transport` exists.
- `inject_transport_info` is False.

Args:
batch: The original Dataproc batch definition.
context: The Airflow context in which the job is running.
inject_parent_job_info: Flag indicating whether to inject parent job information.
inject_transport_info: Flag indicating whether to inject transport information.

Returns:
The modified batch definition with OpenLineage properties injected, if applicable.
"""
if not inject_parent_job_info:
if not inject_parent_job_info and not inject_transport_info:
log.debug("Automatic injection of OpenLineage information is disabled.")
return batch

@@ -631,38 +656,55 @@

properties = _extract_dataproc_batch_properties(batch)

properties = inject_parent_job_information_into_spark_properties(properties=properties, context=context)
if inject_parent_job_info:
log.debug("Injecting OpenLineage parent job information into Spark properties.")
properties = inject_parent_job_information_into_spark_properties(
properties=properties, context=context
)

if inject_transport_info:
log.debug("Injecting OpenLineage transport information into Spark properties.")
properties = inject_transport_information_into_spark_properties(
properties=properties, context=context
)

batch_with_ol_config = _replace_dataproc_batch_properties(batch=batch, new_properties=properties)
return batch_with_ol_config


def inject_openlineage_properties_into_dataproc_workflow_template(
template: dict, context: Context, inject_parent_job_info: bool
template: dict, context: Context, inject_parent_job_info: bool, inject_transport_info: bool
) -> dict:
"""
Inject OpenLineage properties into Spark jobs in Workflow Template.
Inject OpenLineage properties into all Spark jobs within the Workflow Template.

This function does not remove existing configurations or modify the jobs definition in any way,
except to add the required OpenLineage properties if they are not already present.

The entire properties injection process for each job will be skipped if any of the following conditions is met:
- The OpenLineage provider is not accessible.
- The job type is unsupported.
- Both `inject_parent_job_info` and `inject_transport_info` are set to False.

Function is not removing any configuration or modifying the jobs in any other way,
apart from adding desired OpenLineage properties to Dataproc job definition if not already present.
Additionally, specific information will not be injected if relevant OpenLineage properties already exist.

Note:
Any modification to job will be skipped if:
- OpenLineage provider is not accessible.
- The job type is not supported.
- Automatic parent job information injection is disabled.
- Any OpenLineage properties with parent job information are already present
in the Spark job definition.
Parent job information will not be injected if:
- Any property prefixed with `spark.openlineage.parent` exists.
- `inject_parent_job_info` is False.
Transport information will not be injected if:
- Any property prefixed with `spark.openlineage.transport` exists.
- `inject_transport_info` is False.

Args:
template: The original Dataproc Workflow Template definition.
context: The Airflow context in which the job is running.
inject_parent_job_info: Flag indicating whether to inject parent job information.
inject_transport_info: Flag indicating whether to inject transport information.

Returns:
The modified Workflow Template definition with OpenLineage properties injected, if applicable.
"""
if not inject_parent_job_info:
if not inject_parent_job_info and not inject_transport_info:
log.debug("Automatic injection of OpenLineage information is disabled.")
return template

@@ -688,9 +730,17 @@

properties = single_job_definition[job_type].get("properties", {})

properties = inject_parent_job_information_into_spark_properties(
properties=properties, context=context
)
if inject_parent_job_info:
log.debug("Injecting OpenLineage parent job information into Spark properties.")
properties = inject_parent_job_information_into_spark_properties(
properties=properties, context=context
)

if inject_transport_info:
log.debug("Injecting OpenLineage transport information into Spark properties.")
properties = inject_transport_information_into_spark_properties(
properties=properties, context=context
)

job_with_ol_config = _replace_dataproc_job_properties(
job=single_job_definition, job_type=job_type, new_properties=properties