Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dagster registry generation and persist #25260

Merged
merged 22 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add json sanitized dict helper
  • Loading branch information
bnchrch committed Apr 20, 2023
commit 3e77028c3b0a1c960574968a1b4962c8fb134cf8
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ def oss_registry_diff(oss_registry_from_metadata: ConnectorRegistryV0, legacy_os
"""
oss_registry_from_metadata_dict = json.loads(oss_registry_from_metadata.json())
legacy_oss_registry_dict = json.loads(legacy_oss_registry.json())
# import pdb; pdb.set_trace()
return diff_registries(legacy_oss_registry_dict, oss_registry_from_metadata_dict).to_dict()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from orchestrator.models.metadata import MetadataDefinition
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
from orchestrator.utils.object_helpers import deep_copy_params
from orchestrator.utils.object_helpers import deep_copy_params, to_json_sanitized_dict

from dagster_gcp.gcs.file_manager import GCSFileManager, GCSFileHandle

Expand Down Expand Up @@ -43,11 +43,9 @@ def apply_overrides_from_registry(metadata_data: dict, override_registry_key: st
Returns:
dict: The metadata data field with the overrides applied.
"""
# import pdb; pdb.set_trace()
override_registry = metadata_data["registries"][override_registry_key]
del override_registry["enabled"]

# TODO find a nicer way to handle this
# remove any None values from the override registry
override_registry = {k: v for k, v in override_registry.items() if v is not None}

Expand Down Expand Up @@ -138,13 +136,6 @@ def construct_registry_from_metadata(legacy_registry_derived_metadata_definition

return {"sources": registry_sources, "destinations": registry_destinations}

class UUIDEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, UUID):
# if the obj is uuid, we simply return the value of uuid
return obj.hex
return json.JSONEncoder.default(self, obj)

def construct_registry_with_spec_from_registry(registry: dict, cached_specs: OutputDataFrame) -> dict:
entries = [("source", entry) for entry in registry["sources"]] + [("destinations", entry) for entry in registry["destinations"]]

Expand Down Expand Up @@ -212,7 +203,6 @@ def oss_registry_from_metadata(context: OpExecutionContext, metadata_definitions
"""
registry_name = "oss"
registry_directory_manager = context.resources.registry_directory_manager
# import pdb; pdb.set_trace()

from_metadata = construct_registry_from_metadata(metadata_definitions, registry_name)
registry_dict = construct_registry_with_spec_from_registry(from_metadata, cached_specs)
Expand All @@ -228,28 +218,28 @@ def oss_registry_from_metadata(context: OpExecutionContext, metadata_definitions

@asset(group_name=GROUP_NAME)
def cloud_sources_dataframe(cloud_registry_from_metadata: ConnectorRegistryV0) -> OutputDataFrame:
cloud_registry_from_metadata_dict = json.loads(cloud_registry_from_metadata.json())
cloud_registry_from_metadata_dict = to_json_sanitized_dict(cloud_registry_from_metadata)
sources = cloud_registry_from_metadata_dict["sources"]
return output_dataframe(pd.DataFrame(sources))


@asset(group_name=GROUP_NAME)
def oss_sources_dataframe(oss_registry_from_metadata: ConnectorRegistryV0) -> OutputDataFrame:
oss_registry_from_metadata_dict = json.loads(oss_registry_from_metadata.json())
oss_registry_from_metadata_dict = to_json_sanitized_dict(oss_registry_from_metadata)
sources = oss_registry_from_metadata_dict["sources"]
return output_dataframe(pd.DataFrame(sources))


@asset(group_name=GROUP_NAME)
def cloud_destinations_dataframe(cloud_registry_from_metadata: ConnectorRegistryV0) -> OutputDataFrame:
cloud_registry_from_metadata_dict = json.loads(cloud_registry_from_metadata.json())
cloud_registry_from_metadata_dict = to_json_sanitized_dict(cloud_registry_from_metadata)
destinations = cloud_registry_from_metadata_dict["destinations"]
return output_dataframe(pd.DataFrame(destinations))


@asset(group_name=GROUP_NAME)
def oss_destinations_dataframe(oss_registry_from_metadata: ConnectorRegistryV0) -> OutputDataFrame:
oss_registry_from_metadata_dict = json.loads(oss_registry_from_metadata.json())
oss_registry_from_metadata_dict = to_json_sanitized_dict(oss_registry_from_metadata)
destinations = oss_registry_from_metadata_dict["destinations"]
return output_dataframe(pd.DataFrame(destinations))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
REGISTRIES_FOLDER = "registry"
REGISTRIES_FOLDER = "registries/v0"
REPORT_FOLDER = "generated_reports"

CONNECTOR_REPO_NAME = "airbytehq/airbyte"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import mergedeep
import json
from deepdiff import DeepDiff
from typing import TypeVar
from pydantic import BaseModel
import copy

T = TypeVar("T")
Expand Down Expand Up @@ -28,3 +30,16 @@ def f(*args, **kwargs):
return to_call(*copy.deepcopy(args), **copy.deepcopy(kwargs))

return f

def to_json_sanitized_dict(pydantic_model_obj: BaseModel) -> dict:
"""A helper function to convert a pydantic model to a sanitized dict.

Without this pydantic dictionary may contain values that are not JSON serializable.

Args:
pydantic_model_obj (BaseModel): a pydantic model

Returns:
dict: a sanitized dictionary
"""
return json.loads(pydantic_model_obj.json())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this is something that should be exposed at the model level as a class method. I guess it's hard to move it there because models are generated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, however I could turn it into a mixin. Let me know if you think thats a better choice. Im on the fence