Skip to content

Commit

Permalink
feat(registry): add remove stale partition job (#38165)
Browse files Browse the repository at this point in the history
## What
Add a job that lets us remove partition keys whose metadata files no longer exist

## Why
We have > 10,000 partitions, one for every metadata file ever. Likely only 500 of those reference files that exist.

Adding this job should let us clean out the noise.

## Future
If it works I'll add it to a nightly job
  • Loading branch information
bnchrch authored May 17, 2024
1 parent 116b3df commit caec5f2
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from orchestrator.jobs.metadata import generate_stale_gcs_latest_metadata_file
from orchestrator.jobs.registry import (
add_new_metadata_partitions,
remove_stale_metadata_partitions,
generate_cloud_registry,
generate_oss_registry,
generate_registry_entry,
Expand Down Expand Up @@ -184,6 +185,7 @@
generate_registry_entry,
generate_nightly_reports,
add_new_metadata_partitions,
remove_stale_metadata_partitions,
generate_stale_gcs_latest_metadata_file,
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,32 @@
)


@op(required_resource_keys={"all_metadata_file_blobs"})
def remove_stale_metadata_partitions_op(context):
    """
    This op is responsible for removing dynamic partition keys (etags) that no longer match any existing metadata file blob.

    Args:
        context: The op context. The ``all_metadata_file_blobs`` resource is read for the
            etags that are still live, and ``context.instance`` is used to list and delete
            the dynamic partitions.
    """
    all_metadata_file_blobs = context.resources.all_metadata_file_blobs
    partition_name = registry_entry.metadata_partitions_def.name

    # A set keeps each membership check below O(1); with thousands of partitions a list scan adds up.
    all_fresh_etags = {blob.etag for blob in all_metadata_file_blobs}

    all_etag_partitions = context.instance.get_dynamic_partitions(partition_name)

    # Work out the full stale list first so the partitions are not mutated while being filtered.
    stale_etags = [etag for etag in all_etag_partitions if etag not in all_fresh_etags]

    for stale_etag in stale_etags:
        context.log.info(f"Removing stale etag: {stale_etag}")
        context.instance.delete_dynamic_partition(partition_name, stale_etag)
        context.log.info(f"Removed stale etag: {stale_etag}")


@job(tags={"dagster/priority": HIGH_QUEUE_PRIORITY})
def remove_stale_metadata_partitions():
    """
    Job that removes stale metadata partitions, i.e. partitions for metadata files
    (or versions of those files) that no longer exist.

    It runs the single op `remove_stale_metadata_partitions_op`.
    """
    remove_stale_metadata_partitions_op()


@op(required_resource_keys={"slack", "all_metadata_file_blobs"})
def add_new_metadata_partitions_op(context):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "orchestrator"
version = "0.1.0"
version = "0.1.1"
description = ""
authors = ["Ben Church <ben@airbyte.io>"]
readme = "README.md"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from unittest import mock

from dagster import build_op_context
from google.cloud.storage import Blob
from orchestrator.assets import registry_entry
from orchestrator.jobs.registry import add_new_metadata_partitions_op, remove_stale_metadata_partitions_op


def test_basic_partition():
    """Check that dynamic partitions added through an op context's instance can be read back."""
    op_context = build_op_context()
    instance = op_context.instance
    partitions_def_name = "test_partition_key"

    # Nothing has been registered under this name yet.
    partitions_before = instance.get_dynamic_partitions(partitions_def_name)
    assert len(partitions_before) == 0

    instance.add_dynamic_partitions(partitions_def_name, ["partition_1", "partition_2"])

    partitions_after = instance.get_dynamic_partitions(partitions_def_name)
    assert len(partitions_after) == 2


def test_metadata_partition_remove():
    """Check that the remove op deletes a partition whose etag matches no current blob and keeps the rest."""

    def _make_blob(etag, name):
        # Autospec'd Blob carrying only the attributes this test sets.
        blob = mock.create_autospec(Blob, instance=True)
        blob.etag = etag
        blob.name = name
        return blob

    fresh_blob_a = _make_blob("fresh_etag_1", "fresh_metadata")
    fresh_blob_b = _make_blob("fresh_etag_2", "fresh_metadata")
    stale_blob = _make_blob("stale_etag", "stale_metadata")

    # Only the fresh blobs are exposed by the resource; the stale one is deliberately absent.
    op_context = build_op_context(resources={"all_metadata_file_blobs": [fresh_blob_a, fresh_blob_b]})
    instance = op_context.instance
    partitions_def_name = registry_entry.metadata_partitions_def.name

    assert len(instance.get_dynamic_partitions(partitions_def_name)) == 0

    # Seed one partition that is still live and one that is stale.
    instance.add_dynamic_partitions(partitions_def_name, [fresh_blob_a.etag, stale_blob.etag])
    assert len(instance.get_dynamic_partitions(partitions_def_name)) == 2

    remove_stale_metadata_partitions_op(op_context)

    remaining_partitions = instance.get_dynamic_partitions(partitions_def_name)
    assert len(remaining_partitions) == 1
    assert stale_blob.etag not in remaining_partitions


def test_metadata_partition_add():
    """
    Check that the add op registers the etags of fresh blobs as new partitions
    and leaves the partitions that already existed in place.
    """
    mock_fresh_blob_1 = mock.create_autospec(Blob, instance=True)
    mock_fresh_blob_1.etag = "fresh_etag_1"
    mock_fresh_blob_1.name = "fresh_metadata"

    mock_fresh_blob_2 = mock.create_autospec(Blob, instance=True)
    mock_fresh_blob_2.etag = "fresh_etag_2"
    mock_fresh_blob_2.name = "fresh_metadata"

    mock_existing_blob = mock.create_autospec(Blob, instance=True)
    mock_existing_blob.etag = "existing_etag"
    mock_existing_blob.name = "existing_metadata"

    mock_stale_blob = mock.create_autospec(Blob, instance=True)
    mock_stale_blob.etag = "stale_etag"
    mock_stale_blob.name = "stale_metadata"

    # Only the fresh blobs are exposed by the resource.
    mock_metadata_file_blobs = [mock_fresh_blob_1, mock_fresh_blob_2]

    # The slack resource is stubbed out; get_client() hands back a mock client.
    mock_slack = mock.MagicMock()
    mock_slack_client = mock.MagicMock()
    mock_slack.get_client.return_value = mock_slack_client

    resources = {"slack": mock_slack, "all_metadata_file_blobs": mock_metadata_file_blobs}

    context = build_op_context(resources=resources)

    partition_key = registry_entry.metadata_partitions_def.name

    existing_partitions = context.instance.get_dynamic_partitions(partition_key)
    assert len(existing_partitions) == 0

    # Seed partitions that exist before the op runs.
    context.instance.add_dynamic_partitions(partition_key, [mock_stale_blob.etag, mock_existing_blob.etag])
    existing_partitions = context.instance.get_dynamic_partitions(partition_key)
    assert len(existing_partitions) == 2

    add_new_metadata_partitions_op(context)

    existing_partitions = context.instance.get_dynamic_partitions(partition_key)
    expected_partitions = [mock_fresh_blob_1.etag, mock_fresh_blob_2.etag, mock_existing_blob.etag, mock_stale_blob.etag]

    # assert all expected partitions are in the existing partitions, and no other partitions are present, order does not matter
    # Set equality checks both directions; the previous all(...) check only verified the subset direction.
    assert set(existing_partitions) == set(expected_partitions)

0 comments on commit caec5f2

Please sign in to comment.