Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -863,3 +863,29 @@ def test_connection(self) -> tuple[bool, str]:
message = str(e)

return status, message

def get_openlineage_database_info(self, _):
"""Return Databricks-specific database info for OpenLineage namespace resolution."""
from airflow.providers.openlineage.sqlparser import DatabaseInfo

port = f":{self.databricks_conn.port}" if self.databricks_conn.port else ""

return DatabaseInfo(
scheme=self.get_openlineage_database_dialect(None),
authority=f"{self.host}{port}",
information_schema_columns=[
"table_schema",
"table_name",
"column_name",
"ordinal_position",
"data_type",
"table_catalog",
],
is_information_schema_cross_db=True,
)

def get_openlineage_database_dialect(self, _) -> str:
return "databricks"

def get_openlineage_default_schema(self) -> str | None:
return "default"
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,18 @@ def test_update_job_permission(self, mock_requests):
timeout=self.hook.timeout_seconds,
)

def test_openlineage_methods(self):
from airflow.providers.openlineage.sqlparser import DatabaseInfo

db_info = self.hook.get_openlineage_database_info(None)
assert isinstance(db_info, DatabaseInfo)
assert db_info.scheme == "databricks"
assert db_info.authority == HOST
assert db_info.is_information_schema_cross_db is True

assert self.hook.get_openlineage_database_dialect(None) == "databricks"
assert self.hook.get_openlineage_default_schema() == "default"


@pytest.mark.db_test
class TestDatabricksHookToken:
Expand Down