Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug(registry): fix auto materialize #38094

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ def safe_get_commit_sha(metadata_dict: Union[dict, BaseModel]) -> Optional[str]:
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
)
@sentry.instrument_asset_op
def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFrame) -> Output[Optional[LatestMetadataEntry]]:
def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadataEntry]]:
"""Parse and compute the LatestMetadataEntry for the given metadata file."""
etag = context.partition_key
context.log.info(f"Processing metadata file with etag {etag}")
Expand All @@ -475,6 +475,8 @@ def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFram
if not matching_blob:
raise Exception(f"Could not find blob with etag {etag}")

airbyte_slack_users = HACKS.get_airbyte_slack_users_from_graph(context)

metadata_dict = yaml_blob_to_dict(matching_blob)
user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_dict)
commit_sha = safe_get_commit_sha(metadata_dict)
Expand Down Expand Up @@ -548,16 +550,16 @@ def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFram
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
)
@sentry.instrument_asset_op
def registry_entry(
context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry], airbyte_slack_users: pd.DataFrame
) -> Output[Optional[dict]]:
def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry]) -> Output[Optional[dict]]:
"""
Generate the registry entry files from the given metadata file, and persist it to GCS.
"""
if not metadata_entry:
# if the metadata entry is invalid, return an empty dict
return Output(metadata={"empty_metadata": True}, value=None)

airbyte_slack_users = HACKS.get_airbyte_slack_users_from_graph(context)

user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_entry.metadata_definition)
commit_sha = safe_get_commit_sha(metadata_entry.metadata_definition)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Union
from typing import Optional, Union

import pandas as pd
from dagster import OpExecutionContext
from metadata_service.constants import METADATA_FILE_NAME
from metadata_service.gcs_upload import get_metadata_remote_file_path
from metadata_service.models.generated.ConnectorRegistryDestinationDefinition import ConnectorRegistryDestinationDefinition
Expand Down Expand Up @@ -109,3 +111,29 @@ def sanitize_docker_repo_name_for_dependency_file(docker_repo_name: str) -> str:
"""

return docker_repo_name.replace("airbyte/", "")


def get_airbyte_slack_users_from_graph(context: OpExecutionContext) -> Optional[pd.DataFrame]:
"""
Get the airbyte slack users from the graph.

Important: Directly relates to the airbyte_slack_users asset. Requires the asset to be materialized in the graph.

Problem:
I guess having dynamic partitioned assets that automatically materialize depending on another asset is a bit too much to ask for.

Solution:
Just get the asset from the graph, but dont declare it as a dependency.

Context:
https://airbytehq-team.slack.com/archives/C048P9GADFW/p1715276222825929
"""
try:
from orchestrator import defn

airbyte_slack_users = defn.load_asset_value("airbyte_slack_users", instance=context.instance)
context.log.info(f"Got airbyte slack users from graph: {airbyte_slack_users}")
return airbyte_slack_users
except Exception as e:
context.log.error(f"Failed to get airbyte slack users from graph: {e}")
return None
Loading