Skip to content

Commit

Permalink
system tests: implement operator, variable transport
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Aug 28, 2023
1 parent 9d53278 commit 25a95e1
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 10 deletions.
18 changes: 18 additions & 0 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,24 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
self.log.info("Total extracted rows: %s", len(event["records"]))
return event["records"]

def get_openlineage_facets_on_start(self):
from openlineage.client.run import Dataset

from airflow.providers.openlineage.extractors import OperatorLineage

if self.project_id is None:
self.project_id = BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
use_legacy_sql=self.use_legacy_sql,
).project_id

return OperatorLineage(
inputs=[
Dataset(namespace="bigquery", name=f"{self.project_id}.{self.dataset_id}.{self.table_id}")
]
)


class BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
"""Executes BigQuery SQL queries in a specific BigQuery database.
Expand Down
11 changes: 8 additions & 3 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(self):
self.log = logging.getLogger(__name__)
self.extractor_manager = ExtractorManager()
self.adapter = OpenLineageAdapter()
self.current_ti: TaskInstance | None = None

@hookimpl
def on_task_instance_running(
Expand All @@ -59,6 +60,7 @@ def on_task_instance_running(
return

self.log.debug("OpenLineage listener got notification about task instance start")
self.current_ti = task_instance
dagrun = task_instance.dag_run
task = task_instance.task
dag = task.dag
Expand Down Expand Up @@ -101,12 +103,13 @@ def on_running():
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
},
)

on_running()


@hookimpl
def on_task_instance_success(self, previous_state, task_instance: TaskInstance, session):
self.log.debug("OpenLineage listener got notification about task instance success")
self.current_ti = task_instance

dagrun = task_instance.dag_run
task = task_instance.task
Expand Down Expand Up @@ -135,6 +138,7 @@ def on_success():
@hookimpl
def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, session):
self.log.debug("OpenLineage listener got notification about task instance failure")
self.current_ti = task_instance

dagrun = task_instance.dag_run
task = task_instance.task
Expand Down Expand Up @@ -174,8 +178,9 @@ def on_starting(self, component):
def before_stopping(self, component):
self.log.debug("before_stopping: %s", component.__class__.__name__)
# TODO: configure this with Airflow config
with timeout(30):
self.executor.shutdown(wait=True)
if self._executor:
with timeout(30):
self.executor.shutdown(wait=True)

@hookimpl
def on_dag_run_running(self, dag_run: DagRun, msg: str):
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/openlineage/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ dependencies:
- apache-airflow>=2.7.0
- apache-airflow-providers-common-sql>=1.6.0
- attrs>=22.2
- openlineage-integration-common>=0.28.0
- openlineage-python>=0.28.0
- openlineage-integration-common>=0.29.2
- openlineage-python>=0.29.2

integrations:
- integration-name: OpenLineage
Expand Down
16 changes: 16 additions & 0 deletions airflow/providers/openlineage/transport/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
51 changes: 51 additions & 0 deletions airflow/providers/openlineage/transport/variable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow.models import Variable
from airflow.plugins_manager import AirflowPlugin, plugins
from airflow.utils.log.logging_mixin import LoggingMixin
from openlineage.client.run import DatasetEvent, JobEvent, RunEvent
from openlineage.client.serde import Serde
from openlineage.client.transport import Transport


class VariableTransport(Transport, LoggingMixin):
"""This transport sends OpenLineage events to Variables.
Key schema is <DAG_ID>.<TASK_ID>.event.<EVENT_TYPE>.
It's made to be used in system tests, stored data read by OpenLineageTestOperator.
"""

Check failure on line 32 in airflow/providers/openlineage/transport/variable.py

View workflow job for this annotation

GitHub Actions / Static checks

Ruff (D205)

airflow/providers/openlineage/transport/variable.py:29:5: D205 1 blank line required between summary line and description
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
...

def emit(self, event: RunEvent | DatasetEvent | JobEvent):
from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin

plugin: AirflowPlugin | None = next( # type: ignore[assignment]
filter(lambda x: isinstance(x, OpenLineageProviderPlugin), plugins) # type: ignore[arg-type]
)
if not plugin:
raise RuntimeError("OpenLineage listener should be set up here")

listener = plugin.listeners[0] # type: ignore
ti = listener.current_ti # type: ignore

key = f"{ti.dag_id}.{ti.task_id}.event.{event.eventType.value.lower()}" # type: ignore[union-attr]
str_event = Serde.to_json(event)
Variable.set(key=key, value=str_event)
4 changes: 2 additions & 2 deletions docs/apache-airflow-providers-openlineage/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ PIP package Version required
``apache-airflow`` ``>=2.7.0``
``apache-airflow-providers-common-sql`` ``>=1.6.0``
``attrs`` ``>=22.2``
``openlineage-integration-common`` ``>=0.28.0``
``openlineage-python`` ``>=0.28.0``
``openlineage-integration-common`` ``>=0.29.2``
``openlineage-python`` ``>=0.29.2``
======================================= ==================

Cross provider package dependencies
Expand Down
4 changes: 2 additions & 2 deletions generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -662,8 +662,8 @@
"apache-airflow-providers-common-sql>=1.6.0",
"apache-airflow>=2.7.0",
"attrs>=22.2",
"openlineage-integration-common>=0.28.0",
"openlineage-python>=0.28.0"
"openlineage-integration-common>=0.29.2",
"openlineage-python>=0.29.2"
],
"cross-providers-deps": [
"common.sql"
Expand Down
14 changes: 14 additions & 0 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,23 @@

import pytest

from airflow import plugins_manager
from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin

REQUIRED_ENV_VARS = ("SYSTEM_TESTS_ENV_ID",)


@pytest.fixture(scope="package", autouse=True)
def setup_openlineage():
with mock.patch.dict(
"os.environ",
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "airflow.providers.openlineage.transport.variable'
'.VariableTransport"}',
):
plugins_manager.register_plugin(OpenLineageProviderPlugin())
yield


@pytest.fixture(scope="package", autouse=True)
def use_debug_executor():
with mock.patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="DebugExecutor"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
BigQueryValueCheckOperator,
)
from airflow.utils.trigger_rule import TriggerRule
from tests.test_utils.openlineage import OpenlineageTestOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
Expand Down Expand Up @@ -235,11 +236,21 @@
trigger_rule=TriggerRule.ALL_DONE,
)

openlineage_test = OpenlineageTestOperator(
task_id="openlineage_test",
event_templates={
f"{DAG_ID}.get_data.event.start": {
"eventType": "START",
"inputs": [{"namespace": "bigquery", "name": f"{PROJECT_ID}.{DATASET}.{TABLE_1}"}],
}
},
)

# TEST SETUP
create_dataset >> [create_table_1, create_table_2]
# TEST BODY
[create_table_1, create_table_2] >> insert_query_job >> [select_query_job, execute_insert_query]
execute_insert_query >> get_data >> get_data_result >> delete_dataset
execute_insert_query >> get_data >> get_data_result >> delete_dataset >> openlineage_test
execute_insert_query >> execute_query_save >> bigquery_execute_multi_query >> delete_dataset
execute_insert_query >> [check_count, check_value, check_interval] >> delete_dataset
execute_insert_query >> [column_check, table_check] >> delete_dataset
Expand Down
Loading

0 comments on commit 25a95e1

Please sign in to comment.