Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 84 additions & 1 deletion data_rentgen/db/repositories/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
union,
)
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import aliased, selectinload

from data_rentgen.db.models import Address, Job, JobLastRun, JobTagValue, Location, TagValue
from data_rentgen.db.repositories.base import Repository
Expand Down Expand Up @@ -87,6 +87,61 @@
~(JobTagValue.c.tag_value_id == any_(bindparam("tag_value_ids"))),
)

# Self-join aliases for the job hierarchy: a "child" Job row references its
# direct parent through Job.parent_job_id.
child_job = aliased(Job, name="child")
parent_job = aliased(Job, name="parent")

# Anchor part of the ancestors CTE: direct (child -> parent) edges for the
# jobs supplied via the "job_ids" bind parameter (expanded by any_() into an
# `= ANY(...)` clause at execution time).
ancestors_by_job_base_part = (
    select(
        child_job.id.label("child_job_id"),
        parent_job.id.label("parent_job_id"),
    )
    .select_from(child_job)
    .join(parent_job, child_job.parent_job_id == parent_job.id)
    .where(
        child_job.id == any_(bindparam("job_ids")),
    )
)
ancestors_by_job_cte = ancestors_by_job_base_part.cte("ancestors_by_job", recursive=True)

# Recursive part: walk one level up the hierarchy by treating every parent
# discovered so far as the next child to resolve.
ancestors_by_job_recursive_part = (
    select(
        child_job.id.label("child_job_id"),
        parent_job.id.label("parent_job_id"),
    )
    .select_from(child_job)
    .join(parent_job, child_job.parent_job_id == parent_job.id)
    .where(
        child_job.id == ancestors_by_job_cte.c.parent_job_id,
    )
)
# union() (not union_all) deduplicates rows, which also guarantees recursion
# terminates even if the same edge is reachable via multiple paths.
ancestors_by_job_cte = ancestors_by_job_cte.union(ancestors_by_job_recursive_part)

# Same construction in the opposite direction: start from the requested jobs
# as parents and walk down to their children.
descendants_by_job_base_part = (
    select(
        parent_job.id.label("parent_job_id"),
        child_job.id.label("child_job_id"),
    )
    .select_from(parent_job)
    .join(child_job, child_job.parent_job_id == parent_job.id)
    .where(
        parent_job.id == any_(bindparam("job_ids")),
    )
)
descendants_by_job_cte = descendants_by_job_base_part.cte("descendants_by_job", recursive=True)

# Recursive part: every child found so far becomes the next parent to expand.
descendants_by_job_recursive_part = (
    select(
        parent_job.id.label("parent_job_id"),
        child_job.id.label("child_job_id"),
    )
    .select_from(parent_job)
    .join(child_job, child_job.parent_job_id == parent_job.id)
    .where(
        parent_job.id == descendants_by_job_cte.c.child_job_id,
    )
)
descendants_by_job_cte = descendants_by_job_cte.union(descendants_by_job_recursive_part)


class JobRepository(Repository[Job]):
async def paginate(
Expand Down Expand Up @@ -275,3 +330,31 @@ async def get_stats_by_location_ids(self, location_ids: Collection[int]) -> dict

query_result = await self._session.execute(get_stats_query, {"location_ids": list(location_ids)})
return {row.location_id: row for row in query_result.all()}

async def list_ancestor_relations(self, job_ids: Collection[int]):
    """Return (parent_job_id, child_job_id) rows for every ancestor edge
    reachable upward from *job_ids* via the recursive ancestors CTE.

    An empty input short-circuits to an empty list without touching the DB.
    """
    if not job_ids:
        return []
    query = select(
        ancestors_by_job_cte.c.parent_job_id,
        ancestors_by_job_cte.c.child_job_id,
    )
    params = {"job_ids": list(job_ids)}
    rows = await self._session.execute(query, params)
    return list(rows.fetchall())

async def list_descendant_relations(self, job_ids: Collection[int]):
    """Return (parent_job_id, child_job_id) rows for every job reachable
    downward from *job_ids* via the recursive descendants CTE.

    Mirrors ``list_ancestor_relations``: an empty input short-circuits to an
    empty list instead of issuing a pointless database roundtrip.
    """
    # Guard added for consistency with list_ancestor_relations.
    if not job_ids:
        return []
    stmt = select(
        descendants_by_job_cte.c.parent_job_id,
        descendants_by_job_cte.c.child_job_id,
    )
    result = await self._session.execute(
        stmt,
        {
            "job_ids": list(job_ids),
        },
    )
    return list(result.fetchall())
57 changes: 48 additions & 9 deletions data_rentgen/db/repositories/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,10 @@
child_run = aliased(Run, name="child")
parent_run = aliased(Run, name="parent")

parents_by_run_base_part = (
ancestors_by_run_base_part = (
select(
child_run.id.label("child_run_id"),
child_run.job_id.label("child_job_id"),
parent_run.id.label("parent_run_id"),
parent_run.job_id.label("parent_job_id"),
)
.select_from(child_run)
.join(parent_run, child_run.parent_run_id == parent_run.id)
Expand All @@ -103,22 +101,48 @@
child_run.created_at <= bindparam("until"),
)
)
ancestors_by_run_cte = parents_by_run_base_part.cte("parents_by_run", recursive=True)
ancestors_by_run_cte = ancestors_by_run_base_part.cte("ancestors_by_run", recursive=True)

parents_by_run_recursive_part = (
ancestors_by_run_recursive_part = (
select(
child_run.id.label("child_run_id"),
child_run.job_id.label("child_job_id"),
parent_run.id.label("parent_run_id"),
parent_run.job_id.label("parent_job_id"),
)
.select_from(child_run)
.join(parent_run, child_run.parent_run_id == parent_run.id)
.where(
child_run.id == ancestors_by_run_cte.c.parent_run_id,
)
)
ancestors_by_run_cte = ancestors_by_run_cte.union(parents_by_run_recursive_part)
ancestors_by_run_cte = ancestors_by_run_cte.union(ancestors_by_run_recursive_part)

# Anchor part of the descendants CTE: direct (parent -> child) run edges for
# the runs supplied via the "run_ids" bind parameter. The created_at window
# ("since"/"until") restricts which anchor rows are scanned.
descendants_by_run_base_part = (
    select(
        parent_run.id.label("parent_run_id"),
        child_run.id.label("child_run_id"),
    )
    .select_from(parent_run)
    .join(child_run, child_run.parent_run_id == parent_run.id)
    .where(
        parent_run.id == any_(bindparam("run_ids")),
        parent_run.created_at >= bindparam("since"),
        parent_run.created_at <= bindparam("until"),
    )
)
descendants_by_run_cte = descendants_by_run_base_part.cte("descendants_by_run", recursive=True)

# Recursive part: every child run found so far becomes the next parent to
# expand. NOTE(review): the created_at filter applies only to the anchor
# rows, so recursive expansion is not time-bounded — confirm this is intended.
descendants_by_run_recursive_part = (
    select(
        parent_run.id.label("parent_run_id"),
        child_run.id.label("child_run_id"),
    )
    .select_from(parent_run)
    .join(child_run, child_run.parent_run_id == parent_run.id)
    .where(
        parent_run.id == descendants_by_run_cte.c.child_run_id,
    )
)
# union() (not union_all) deduplicates rows, guaranteeing termination.
descendants_by_run_cte = descendants_by_run_cte.union(descendants_by_run_recursive_part)


class RunRepository(Repository[Run]):
Expand Down Expand Up @@ -378,7 +402,7 @@ async def create_or_update_bulk(self, runs: list[RunDTO]) -> None:
],
)

async def list_runs_ancestor_relations(self, run_ids: Collection[UUID]):
async def list_ancestor_relations(self, run_ids: Collection[UUID]):
if not run_ids:
return []
stmt = select(
Expand All @@ -394,3 +418,18 @@ async def list_runs_ancestor_relations(self, run_ids: Collection[UUID]):
},
)
return list(result.fetchall())

async def list_descendant_relations(self, run_ids: Collection[UUID]):
    """Return (parent_run_id, child_run_id) rows for every run reachable
    downward from *run_ids* via the recursive descendants CTE.

    The "since"/"until" bounds are derived from the UUIDv7 timestamps of the
    smallest and largest requested ids, matching ``list_ancestor_relations``.
    """
    # Guard: min()/max() below raise ValueError on an empty collection, and
    # the sibling list_ancestor_relations already short-circuits this case.
    if not run_ids:
        return []
    stmt = select(
        descendants_by_run_cte.c.parent_run_id,
        descendants_by_run_cte.c.child_run_id,
    )
    result = await self._session.execute(
        stmt,
        {
            "since": extract_timestamp_from_uuid(min(run_ids)),
            "until": extract_timestamp_from_uuid(max(run_ids)),
            "run_ids": list(run_ids),
        },
    )
    return list(result.fetchall())
26 changes: 24 additions & 2 deletions data_rentgen/server/services/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class LineageServiceResult:
column_lineage: dict[tuple[int, int], list[ColumnLineageRow]] = field(default_factory=dict)
io_dataset_relations: dict[tuple[int, int], IODatasetRelationRow] = field(default_factory=dict)
run_ancestor_relations: set[tuple[UUID, UUID]] = field(default_factory=set)
job_ancestor_relations: set[tuple[int, int]] = field(default_factory=set)

def merge(self, other: "LineageServiceResult") -> "LineageServiceResult":
self.jobs.update(other.jobs)
Expand All @@ -52,6 +53,7 @@ def merge(self, other: "LineageServiceResult") -> "LineageServiceResult":
self.column_lineage.update(other.column_lineage)
self.io_dataset_relations.update(other.io_dataset_relations)
self.run_ancestor_relations.update(other.run_ancestor_relations)
self.job_ancestor_relations.update(other.job_ancestor_relations)
return self


Expand Down Expand Up @@ -83,7 +85,7 @@ class LineageService:
def __init__(self, uow: Annotated[UnitOfWork, Depends()]):
self._uow = uow

async def get_lineage_by_jobs( # noqa: C901, PLR0912
async def get_lineage_by_jobs( # noqa: C901, PLR0912, PLR0915
self,
start_node_ids: Collection[int],
direction: LineageDirectionV1,
Expand Down Expand Up @@ -115,6 +117,13 @@ async def get_lineage_by_jobs( # noqa: C901, PLR0912

# Always include all requested jobs.
jobs_by_id = {job.id: job for job in jobs}
if level == 0:
relations = await self._uow.job.list_descendant_relations(start_node_ids)
child_jobs_ids = {c_id for _, c_id in relations}
child_jobs = await self._uow.job.list_by_ids(child_jobs_ids)
jobs.extend(child_jobs)
child_jobs_by_id = {job.id: job for job in child_jobs}
jobs_by_id |= child_jobs_by_id

inputs: list[InputRow] = []
outputs: list[OutputRow] = []
Expand Down Expand Up @@ -190,6 +199,8 @@ async def get_lineage_by_jobs( # noqa: C901, PLR0912
for output in outputs
},
)
if level == 0:
result.job_ancestor_relations.update({tuple(r) for r in relations})

upstream_dataset_ids = {input_.dataset_id for input_ in inputs} - ids_to_skip.datasets
downstream_dataset_ids = {output.dataset_id for output in outputs} - ids_to_skip.datasets
Expand Down Expand Up @@ -234,6 +245,7 @@ async def get_lineage_by_jobs( # noqa: C901, PLR0912
(dataset_symlink.from_dataset_id, dataset_symlink.to_dataset_id): dataset_symlink
for dataset_symlink in dataset_symlinks
}

if level == 0:
result.merge(await self._populate_parents(result.runs.keys(), granularity=granularity))
if include_column_lineage:
Expand Down Expand Up @@ -294,6 +306,14 @@ async def get_lineage_by_runs( # noqa: C901, PLR0915, PLR0912

# Always include all requested runs.
runs_by_id = {run.id: run for run in runs}
# Include child runs
if level == 0:
relations = await self._uow.run.list_descendant_relations(start_node_ids)
child_runs_ids = {c_id for _, c_id in relations}
child_runs = await self._uow.run.list_by_ids(child_runs_ids)
runs.extend(child_runs)
child_runs_by_id = {run.id: run for run in child_runs}
runs_by_id |= child_runs_by_id

inputs: list[InputRow] = []
outputs: list[OutputRow] = []
Expand Down Expand Up @@ -392,6 +412,8 @@ async def get_lineage_by_runs( # noqa: C901, PLR0915, PLR0912
for output in outputs
},
)
if level == 0:
result.run_ancestor_relations.update({tuple(r) for r in relations})

upstream_dataset_ids = {input_.dataset_id for input_ in inputs} - ids_to_skip.datasets
downstream_dataset_ids = {output.dataset_id for output in outputs} - ids_to_skip.datasets
Expand Down Expand Up @@ -1314,7 +1336,7 @@ async def _populate_parents(
return LineageServiceResult()
match granularity:
case "RUN" | "OPERATION":
relations = await self._uow.run.list_runs_ancestor_relations(run_ids)
relations = await self._uow.run.list_ancestor_relations(run_ids)
parents_run_ids = {p_id for p_id, _ in relations}
runs = await self._uow.run.list_by_ids(parents_run_ids)
runs_by_id = {run.id: run for run in runs}
Expand Down
16 changes: 14 additions & 2 deletions data_rentgen/server/utils/lineage_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ def build_lineage_response(lineage: LineageServiceResult) -> LineageResponseV1:
),
relations=LineageRelationsResponseV1(
parents=_get_run_parent_relations(lineage.runs) + _get_operation_parent_relations(lineage.operations),
ancestors=_get_runs_ancestor_chain(lineage.run_ancestor_relations),
ancestors=_get_runs_hierarchy_chain(lineage.run_ancestor_relations)
+ _get_jobs_hierarchy_chain(lineage.job_ancestor_relations),
symlinks=_get_symlink_relations(lineage.dataset_symlinks),
inputs=_get_input_relations(lineage.inputs),
outputs=_get_output_relations(lineage.outputs),
Expand Down Expand Up @@ -333,7 +334,7 @@ def _get_datasets_with_dataset_granularity(
return datasets


def _get_runs_ancestor_chain(runs_relations: set[tuple[UUID, UUID]]) -> list[LineageParentRelationV1]:
def _get_runs_hierarchy_chain(runs_relations: set[tuple[UUID, UUID]]) -> list[LineageParentRelationV1]:
parents = []
for parent_run_id, run_id in runs_relations:
relation = LineageParentRelationV1(
Expand All @@ -342,3 +343,14 @@ def _get_runs_ancestor_chain(runs_relations: set[tuple[UUID, UUID]]) -> list[Lin
)
parents.append(relation)
return sorted(parents, key=lambda x: (x.from_.id, x.to.id))


def _get_jobs_hierarchy_chain(jobs_relations: set[tuple[int, int]]) -> list[LineageParentRelationV1]:
    """Convert (parent_job_id, child_job_id) pairs into parent relations,
    sorted by the stringified (from, to) ids."""
    relations = [
        LineageParentRelationV1(
            from_=LineageEntityV1(kind=LineageEntityKindV1.JOB, id=str(parent_id)),
            to=LineageEntityV1(kind=LineageEntityKindV1.JOB, id=str(child_id)),
        )
        for parent_id, child_id in jobs_relations
    ]
    relations.sort(key=lambda rel: (rel.from_.id, rel.to.id))
    return relations
4 changes: 4 additions & 0 deletions docs/changelog/next_release/399.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Add child runs and jobs to lineage responses. All new relations are returned in the same ``ancestors`` field.



4 changes: 2 additions & 2 deletions tests/test_server/fixtures/factories/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1367,13 +1367,13 @@ async def lineage_with_parent_run_relations(
async_session,
location_id=airflow_location.id,
job_type_id=airflow_task_job_type.id,
job_kwargs={"name": "airflow_task_name"},
job_kwargs={"name": "airflow_task_name", "parent_job_id": airflow_dag.id},
)
spark_application = await create_job(
async_session,
location_id=spark_location.id,
job_type_id=spark_application_job_type.id,
job_kwargs={"name": "spark_application_name"},
job_kwargs={"name": "spark_application_name", "parent_job_id": airflow_task.id},
)
lineage.jobs = [airflow_dag, airflow_task, spark_application]

Expand Down
46 changes: 46 additions & 0 deletions tests/test_server/test_lineage/test_job_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from tests.test_server.utils.convert_to_json import (
datasets_to_json,
inputs_to_json,
jobs_ancestors_to_json,
jobs_to_json,
outputs_to_json,
run_parents_to_json,
Expand Down Expand Up @@ -1161,3 +1162,48 @@ async def test_get_job_lineage_with_granularity_run_and_ancestor_relations(
"operations": {},
},
}


async def test_get_job_lineage_with_descendant_relations(
    test_client: AsyncClient,
    async_session: AsyncSession,
    lineage_with_parent_run_relations: LineageResult,
    mocked_user: MockedUser,
):
    """Requesting lineage for a job should also include its descendant jobs,
    reported in the ``relations.ancestors`` field of the response."""
    lineage = lineage_with_parent_run_relations
    run = lineage.runs[0]
    # Start the lineage query from the job that owns the first run.
    job = next(job for job in lineage.jobs if run.job_id == job.id)
    since = run.created_at

    jobs = await enrich_jobs(lineage.jobs, async_session)
    datasets = await enrich_datasets(lineage.datasets, async_session)

    response = await test_client.get(
        "v1/jobs/lineage",
        headers={"Authorization": f"Bearer {mocked_user.access_token}"},
        params={
            "since": since.isoformat(),
            "start_node_id": job.id,
        },
    )

    assert response.status_code == HTTPStatus.OK, response.json()
    # All jobs (the start node plus its descendants) are expected both as
    # nodes and as ancestor-chain relations.
    assert response.json() == {
        "relations": {
            "parents": [],
            "ancestors": jobs_ancestors_to_json(jobs),
            "symlinks": [],
            "inputs": [
                *inputs_to_json(merge_io_by_jobs(lineage.inputs), granularity="JOB"),
            ],
            "outputs": [],
            "direct_column_lineage": [],
            "indirect_column_lineage": [],
        },
        "nodes": {
            "datasets": datasets_to_json(datasets),
            "jobs": jobs_to_json(jobs),
            "runs": {},
            "operations": {},
        },
    }
Loading