2 changes: 2 additions & 0 deletions data_rentgen/db/repositories/dataset.py
@@ -144,6 +144,8 @@ async def update(self, existing: Dataset, new: DatasetDTO) -> Dataset:
for tag_value_dto in new.tag_values
],
)
        # unlike job tags, OL integrations may provide only a subset of a dataset's tags,
        # so we only insert new ones and never delete the old ones
return existing

async def paginate(
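The comment above describes an insert-only policy for dataset tags. Below is a minimal sketch of how such an upsert can look, assuming the association table is named `DatasetTagValue` with `(dataset_id, tag_value_id)` columns and mirroring the `JobTagValue` pattern further down; these names and the import path are assumptions, not taken from this diff:

```python
from sqlalchemy import bindparam
from sqlalchemy.dialects.postgresql import insert

from data_rentgen.db.models import DatasetTagValue  # assumed import path, not part of this diff

# Insert-or-ignore upsert for the dataset<->tag-value association table.
insert_dataset_tag_value_query = (
    insert(DatasetTagValue)
    .values(
        dataset_id=bindparam("dataset_id"),
        tag_value_id=bindparam("tag_value_id"),
    )
    # OL integrations may report only a subset of a dataset's tags per event,
    # so duplicates are ignored and existing rows are never deleted here.
    .on_conflict_do_nothing(index_elements=["dataset_id", "tag_value_id"])
)

# Executed with one parameter dict per tag value, e.g.:
# await session.execute(
#     insert_dataset_tag_value_query,
#     [{"dataset_id": existing.id, "tag_value_id": tv.id} for tv in new.tag_values],
# )
```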
16 changes: 15 additions & 1 deletion data_rentgen/db/repositories/job.py
@@ -15,6 +15,7 @@
asc,
bindparam,
cast,
delete,
desc,
distinct,
func,
@@ -83,6 +84,10 @@
)
.on_conflict_do_nothing(index_elements=["job_id", "tag_value_id"])
)
delete_tag_value_query = delete(JobTagValue).where(
JobTagValue.c.job_id == bindparam("job_id"),
~(JobTagValue.c.tag_value_id == any_(bindparam("tag_value_ids"))),
)


class JobRepository(Repository[Job]):
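For reference, the new `delete_tag_value_query` is a reusable prepared DELETE. A sketch of how to inspect the SQL it roughly compiles to on PostgreSQL (the exact rendering and the underlying table name depend on the model definitions, which are not part of this diff):

```python
from sqlalchemy.dialects import postgresql

# Compile the statement against the PostgreSQL dialect to see the generated SQL.
print(delete_tag_value_query.compile(dialect=postgresql.dialect()))
# Roughly (assuming JobTagValue maps to a job_tag_value table):
#   DELETE FROM job_tag_value
#   WHERE job_tag_value.job_id = %(job_id)s
#   AND NOT (job_tag_value.tag_value_id = ANY (%(tag_value_ids)s))
```

That is, for a given `job_id`, every link whose `tag_value_id` is not in the array passed at execution time is removed, while links still reported by the latest run are kept.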
Expand Down Expand Up @@ -232,7 +237,8 @@ async def update(self, existing: Job, new: JobDTO) -> Job:
)

if not new.tag_values:
# in cases when jobs have no tag values we can avoid INSERT statements
            # when a job has no tag values, we can avoid INSERT statements.
            # also, parent jobs may have no tag values, so we skip updating them.
return existing

# Lock to prevent inserting the same rows from multiple workers
@@ -247,6 +253,14 @@ async def update(self, existing: Job, new: JobDTO) -> Job:
for tag_value_dto in new.tag_values
],
)

        # To avoid accumulating stale tag values
        # (e.g. upgrading the Airflow/Spark/OL/etc. version would otherwise keep both the old and the new version tags),
        # we keep only the tags reported by the most recent job run.
await self._session.execute(
delete_tag_value_query,
{"job_id": existing.id, "tag_value_ids": [tag_value_dto.id for tag_value_dto in new.tag_values]},
)
return existing

async def list_by_ids(self, job_ids: Collection[int]) -> list[Job]:
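Taken together, the insert-or-ignore statement and the new DELETE give job tags "latest run wins" semantics. A hypothetical walkthrough under assumed names (the name of the insert construct, the helper function, and the ids below are illustrative only, not taken from this diff):

```python
# Suppose the job is currently linked to tag_value_id=10 (e.g. spark.version=3.4)
# and the latest run reports only tag_value_id=11 (e.g. spark.version=3.5).
async def sync_job_tag_values(session, job_id: int, new_tag_value_ids: list[int]) -> None:
    # Step 1: link the tags reported by the latest run; duplicates are ignored.
    await session.execute(
        insert_job_tag_value_query,  # assumed name of the ON CONFLICT DO NOTHING insert above
        [{"job_id": job_id, "tag_value_id": tv_id} for tv_id in new_tag_value_ids],
    )
    # Step 2: drop links the latest run no longer reports,
    # so the stale spark.version=3.4 link disappears.
    await session.execute(
        delete_tag_value_query,
        {"job_id": job_id, "tag_value_ids": new_tag_value_ids},
    )

# Usage: await sync_job_tag_values(session, job_id=1, new_tag_value_ids=[11])
```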