From caec5f23164c54752f92c728a0d3212f420ecf41 Mon Sep 17 00:00:00 2001
From: Ben Church
Date: Fri, 17 May 2024 12:24:48 -0700
Subject: [PATCH] feat(registry): add remove stale partition job (#38165)

## What
Add a job that lets us remove partition keys that no longer exist

## Why
We have > 10,000 partitions, one for every metadata file ever. Likely only 500 of those reference files that exist.

Adding this job should let us clean out the noise.

## Future
If it works I'll add it to a nightly job
---
 .../orchestrator/orchestrator/__init__.py      |   2 +
 .../orchestrator/jobs/registry.py              |  26 +++++
 .../orchestrator/pyproject.toml                |   2 +-
 .../orchestrator/tests/test_partition_jobs.py  | 100 ++++++++++++++++++
 4 files changed, 129 insertions(+), 1 deletion(-)
 create mode 100644 airbyte-ci/connectors/metadata_service/orchestrator/tests/test_partition_jobs.py

diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py
index 799e441080fa..7e41d25b00c8 100644
--- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py
+++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py
@@ -22,6 +22,7 @@
 from orchestrator.jobs.metadata import generate_stale_gcs_latest_metadata_file
 from orchestrator.jobs.registry import (
     add_new_metadata_partitions,
+    remove_stale_metadata_partitions,
     generate_cloud_registry,
     generate_oss_registry,
     generate_registry_entry,
@@ -184,6 +185,7 @@
     generate_registry_entry,
     generate_nightly_reports,
     add_new_metadata_partitions,
+    remove_stale_metadata_partitions,
     generate_stale_gcs_latest_metadata_file,
 ]

diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py
index c24d4513bb0f..148924b711e3 100644
--- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py
+++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py
@@ -24,6 +24,32 @@
 )


+@op(required_resource_keys={"all_metadata_file_blobs"})
+def remove_stale_metadata_partitions_op(context):
+    """
+    This op is responsible for removing stale partitions: any etag partition that no longer matches an existing metadata file.
+    """
+    all_metadata_file_blobs = context.resources.all_metadata_file_blobs
+    partition_name = registry_entry.metadata_partitions_def.name
+
+    all_fresh_etags = [blob.etag for blob in all_metadata_file_blobs]
+
+    all_etag_partitions = context.instance.get_dynamic_partitions(partition_name)
+
+    for stale_etag in [etag for etag in all_etag_partitions if etag not in all_fresh_etags]:
+        context.log.info(f"Removing stale etag: {stale_etag}")
+        context.instance.delete_dynamic_partition(partition_name, stale_etag)
+        context.log.info(f"Removed stale etag: {stale_etag}")
+
+
+@job(tags={"dagster/priority": HIGH_QUEUE_PRIORITY})
+def remove_stale_metadata_partitions():
+    """
+    This job is responsible for removing stale metadata partitions (metadata files or versions of files that no longer exist).
+    """
+    remove_stale_metadata_partitions_op()
+
+
 @op(required_resource_keys={"slack", "all_metadata_file_blobs"})
 def add_new_metadata_partitions_op(context):
     """
diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/pyproject.toml b/airbyte-ci/connectors/metadata_service/orchestrator/pyproject.toml
index 2f7b0bd019de..8f5997fde4c8 100644
--- a/airbyte-ci/connectors/metadata_service/orchestrator/pyproject.toml
+++ b/airbyte-ci/connectors/metadata_service/orchestrator/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "orchestrator"
-version = "0.1.0"
+version = "0.1.1"
 description = ""
 authors = ["Ben Church "]
 readme = "README.md"
diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_partition_jobs.py b/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_partition_jobs.py
new file mode 100644
index 000000000000..0bf9df7903e9
--- /dev/null
+++ b/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_partition_jobs.py
@@ -0,0 +1,100 @@
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+
+from unittest import mock
+
+from dagster import build_op_context
+from google.cloud.storage import Blob
+from orchestrator.assets import registry_entry
+from orchestrator.jobs.registry import add_new_metadata_partitions_op, remove_stale_metadata_partitions_op
+
+
+def test_basic_partition():
+    context = build_op_context()
+    partition_key = "test_partition_key"
+
+    existing_partitions = context.instance.get_dynamic_partitions(partition_key)
+    assert len(existing_partitions) == 0
+    context.instance.add_dynamic_partitions(partition_key, ["partition_1", "partition_2"])
+    existing_partitions = context.instance.get_dynamic_partitions(partition_key)
+    assert len(existing_partitions) == 2
+
+
+def test_metadata_partition_remove():
+    mock_fresh_blob_1 = mock.create_autospec(Blob, instance=True)
+    mock_fresh_blob_1.etag = "fresh_etag_1"
+    mock_fresh_blob_1.name = "fresh_metadata"
+
+    mock_fresh_blob_2 = mock.create_autospec(Blob, instance=True)
+    mock_fresh_blob_2.etag = "fresh_etag_2"
+    mock_fresh_blob_2.name = "fresh_metadata"
+
+    mock_stale_blob = mock.create_autospec(Blob, instance=True)
+    mock_stale_blob.etag = "stale_etag"
+    mock_stale_blob.name = "stale_metadata"
+
+    mock_metadata_file_blobs = [mock_fresh_blob_1, mock_fresh_blob_2]
+
+    resources = {"all_metadata_file_blobs": mock_metadata_file_blobs}
+
+    context = build_op_context(resources=resources)
+
+    partition_key = registry_entry.metadata_partitions_def.name
+
+    existing_partitions = context.instance.get_dynamic_partitions(partition_key)
+    assert len(existing_partitions) == 0
+
+    context.instance.add_dynamic_partitions(partition_key, [mock_fresh_blob_1.etag, mock_stale_blob.etag])
+    existing_partitions = context.instance.get_dynamic_partitions(partition_key)
+    assert len(existing_partitions) == 2
+
+    remove_stale_metadata_partitions_op(context)
+
+    existing_partitions = context.instance.get_dynamic_partitions(partition_key)
+    assert len(existing_partitions) == 1
+    assert mock_stale_blob.etag not in existing_partitions
+
+
+def test_metadata_partition_add():
+    mock_fresh_blob_1 = mock.create_autospec(Blob, instance=True)
+    mock_fresh_blob_1.etag = "fresh_etag_1"
+    mock_fresh_blob_1.name = "fresh_metadata"
+
+    mock_fresh_blob_2 = mock.create_autospec(Blob, instance=True)
+    mock_fresh_blob_2.etag = "fresh_etag_2"
+    mock_fresh_blob_2.name = "fresh_metadata"
+
+    mock_existing_blob = mock.create_autospec(Blob, instance=True)
+    mock_existing_blob.etag = "existing_etag"
+    mock_existing_blob.name = "existing_metadata"
"existing_metadata" + + mock_stale_blob = mock.create_autospec(Blob, instance=True) + mock_stale_blob.etag = "stale_etag" + mock_stale_blob.name = "stale_metadata" + + mock_metadata_file_blobs = [mock_fresh_blob_1, mock_fresh_blob_2] + + mock_slack = mock.MagicMock() + mock_slack.get_client = mock.MagicMock() + chat_postMessage = mock.MagicMock() + mock_slack.get_client.return_value = chat_postMessage + + resources = {"slack": mock_slack, "all_metadata_file_blobs": mock_metadata_file_blobs} + + context = build_op_context(resources=resources) + + partition_key = registry_entry.metadata_partitions_def.name + + existing_partitions = context.instance.get_dynamic_partitions(partition_key) + assert len(existing_partitions) == 0 + + context.instance.add_dynamic_partitions(partition_key, [mock_stale_blob.etag, mock_existing_blob.etag]) + existing_partitions = context.instance.get_dynamic_partitions(partition_key) + assert len(existing_partitions) == 2 + + add_new_metadata_partitions_op(context) + + existing_partitions = context.instance.get_dynamic_partitions(partition_key) + expected_partitions = [mock_fresh_blob_1.etag, mock_fresh_blob_2.etag, mock_existing_blob.etag, mock_stale_blob.etag] + + # assert all expected partitions are in the existing partitions, and no other partitions are present, order does not matter + assert all([etag in existing_partitions for etag in expected_partitions])