Skip to content

Commit

Permalink
qa-checks: check metadata version matches dockerfile version (airbyte…
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored May 4, 2023
1 parent 93d89a8 commit 7310494
Show file tree
Hide file tree
Showing 370 changed files with 2,167 additions and 1,908 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
from typing import Tuple

import yaml
import base64
import hashlib

from google.cloud import storage
from google.oauth2 import service_account

from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import ConnectorMetadataDefinitionV0
from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER
from metadata_service.validators.metadata_validator import validate_metadata_images_in_dockerhub
Expand All @@ -24,6 +28,15 @@ def get_metadata_file_path(dockerRepository: str, version: str) -> str:
return f"{METADATA_FOLDER}/{dockerRepository}/{version}/{METADATA_FILE_NAME}"


def compute_gcs_md5(file_name):
    """Return the MD5 digest of the file at *file_name*, base64-encoded.

    The file is read in 4 KiB chunks so arbitrarily large files can be hashed
    without loading them into memory. The base64 text form matches the
    ``md5_hash`` attribute of GCS blobs, so the result can be compared
    directly against a remote blob's hash (see upload_metadata_to_gcs).
    """
    digest = hashlib.md5()
    with open(file_name, "rb") as handle:
        while chunk := handle.read(4096):
            digest.update(chunk)
    return base64.b64encode(digest.digest()).decode("utf8")


def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, service_account_file_path: Path) -> Tuple[bool, str]:
"""Upload a metadata file to a GCS bucket.
Expand Down Expand Up @@ -55,9 +68,30 @@ def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, service_a

version_blob = bucket.blob(version_path)
latest_blob = bucket.blob(latest_path)
if not version_blob.exists():

# reload the blobs to get the md5_hash
if version_blob.exists():
version_blob.reload()
if latest_blob.exists():
latest_blob.reload()

metadata_file_md5_hash = compute_gcs_md5(metadata_file_path)
version_blob_md5_hash = version_blob.md5_hash if version_blob.exists() else None
latest_blob_md5_hash = latest_blob.md5_hash if latest_blob.exists() else None

print(f"Local Metadata md5_hash: {metadata_file_md5_hash}")
print(f"Current Version blob md5_hash: {version_blob_md5_hash}")
print(f"Latest blob md5_hash: {latest_blob_md5_hash}")

# upload if md5_hash is different
if metadata_file_md5_hash != version_blob_md5_hash:
print(f"Uploading {metadata_file_path} to {version_path}...")
version_blob.upload_from_filename(str(metadata_file_path))
uploaded = True
if version_blob.etag != latest_blob.etag:

if metadata_file_md5_hash != latest_blob_md5_hash:
print(f"Uploading {metadata_file_path} to {latest_path}...")
latest_blob.upload_from_filename(str(metadata_file_path))
uploaded = True

return uploaded, version_blob.id
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

from datetime import date
from typing import List, Optional
from uuid import UUID

Expand Down Expand Up @@ -132,6 +133,17 @@ class Data(BaseModel):
license: str
supportUrl: AnyUrl
githubIssueLabel: str
maxSecondsBetweenMessages: Optional[int] = Field(
None,
description="Number of seconds allowed between 2 airbyte protocol messages. The source will timeout if this delay is reach",
)
releaseDate: Optional[date] = Field(
None,
description="The date when this connector was first released, in yyyy-mm-dd format.",
)
protocolVersion: Optional[str] = Field(
None, description="the Airbyte Protocol version supported by the connector"
)
connectorSubtype: Literal[
"api", "database", "file", "custom", "message_queue", "unknown"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ properties:
format: uri
githubIssueLabel:
type: string
maxSecondsBetweenMessages:
description: Maximum delay between 2 airbyte protocol messages, in second. The source will timeout if this delay is reached
type: integer
releaseDate:
description: The date when this connector was first released, in yyyy-mm-dd format.
type: string
format: date
protocolVersion:
type: string
description: the Airbyte Protocol version supported by the connector
connectorSubtype:
type: string
enum:
Expand Down
52 changes: 37 additions & 15 deletions airbyte-ci/connectors/metadata_service/lib/tests/test_gcs_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,34 @@


@pytest.mark.parametrize(
"version_blob_exists, version_blob_etag, latest_blob_etag",
"version_blob_md5_hash, latest_blob_md5_hash, local_file_md5_hash",
[
pytest.param(False, "same_etag", "different_etag", id="Version blob does not exists: Version and latest blob should be uploaded."),
pytest.param(None, "same_md5_hash", "same_md5_hash", id="Version blob does not exist: Version blob should be uploaded."),
pytest.param("same_md5_hash", None, "same_md5_hash", id="Latest blob does not exist: Latest blob should be uploaded."),
pytest.param(None, None, "same_md5_hash", id="Latest blob and Version blob does not exist: both should be uploaded."),
pytest.param(
False,
"same_etag",
"same_etag",
id="Version blob does not exists but etags are equal: Version blob should be uploaded but latest should not.",
"different_md5_hash", "same_md5_hash", "same_md5_hash", id="Version blob does not match: Version blob should be uploaded."
),
pytest.param(True, "same_etag", "same_etag", id="Version exists and etags are equal: no upload should happen."),
pytest.param(
True, "same_etag", "different_etag", id="Version exists but latest etag is different: latest blob should be uploaded."
"same_md5_hash",
"same_md5_hash",
"same_md5_hash",
id="Version blob and Latest blob match: no upload should happen.",
),
pytest.param(
"same_md5_hash", "different_md5_hash", "same_md5_hash", id="Latest blob does not match: Latest blob should be uploaded."
),
pytest.param(
"same_md5_hash",
"same_md5_hash",
"different_md5_hash",
id="Latest blob and Version blob does not match: both should be uploaded.",
),
],
)
def test_upload_metadata_to_gcs_valid_metadata(mocker, valid_metadata_yaml_files, version_blob_exists, version_blob_etag, latest_blob_etag):
def test_upload_metadata_to_gcs_valid_metadata(
mocker, valid_metadata_yaml_files, version_blob_md5_hash, latest_blob_md5_hash, local_file_md5_hash
):
metadata_file_path = pathlib.Path(valid_metadata_yaml_files[0])
metadata = ConnectorMetadataDefinitionV0.parse_obj(yaml.safe_load(metadata_file_path.read_text()))
expected_version_key = f"metadata/{metadata.data.dockerRepository}/{metadata.data.dockerImageTag}/{METADATA_FILE_NAME}"
Expand All @@ -36,14 +48,18 @@ def test_upload_metadata_to_gcs_valid_metadata(mocker, valid_metadata_yaml_files
mock_credentials = mocker.Mock()
mock_storage_client = mocker.Mock()

mock_version_blob = mocker.Mock(exists=mocker.Mock(return_value=version_blob_exists), etag=version_blob_etag)
mock_latest_blob = mocker.Mock(etag=latest_blob_etag)
latest_blob_exists = latest_blob_md5_hash is not None
version_blob_exists = version_blob_md5_hash is not None

mock_version_blob = mocker.Mock(exists=mocker.Mock(return_value=version_blob_exists), md5_hash=version_blob_md5_hash)
mock_latest_blob = mocker.Mock(exists=mocker.Mock(return_value=latest_blob_exists), md5_hash=latest_blob_md5_hash)
mock_bucket = mock_storage_client.bucket.return_value
mock_bucket.blob.side_effect = [mock_version_blob, mock_latest_blob]

mocker.patch.object(gcs_upload.service_account.Credentials, "from_service_account_file", mocker.Mock(return_value=mock_credentials))
mocker.patch.object(gcs_upload.storage, "Client", mocker.Mock(return_value=mock_storage_client))
mocker.patch.object(gcs_upload, "validate_metadata_images_in_dockerhub", mocker.Mock(return_value=(True, None)))
mocker.patch.object(gcs_upload, "compute_gcs_md5", mocker.Mock(return_value=local_file_md5_hash))

# Call function under tests

Expand All @@ -63,12 +79,18 @@ def test_upload_metadata_to_gcs_valid_metadata(mocker, valid_metadata_yaml_files
if not version_blob_exists:
mock_version_blob.upload_from_filename.assert_called_with(str(metadata_file_path))
assert uploaded
else:
mock_version_blob.upload_from_filename.assert_not_called()
assert not uploaded

if version_blob_etag != latest_blob_etag:
if not latest_blob_exists:
mock_latest_blob.upload_from_filename.assert_called_with(str(metadata_file_path))
assert uploaded

if version_blob_md5_hash != local_file_md5_hash:
mock_version_blob.upload_from_filename.assert_called_with(str(metadata_file_path))
assert uploaded

if latest_blob_md5_hash != local_file_md5_hash:
mock_latest_blob.upload_from_filename.assert_called_with(str(metadata_file_path))
assert uploaded


def test_upload_metadata_to_gcs_invalid_metadata(invalid_metadata_yaml_files):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,35 +234,23 @@ def persist_metadata_definitions(context: OpExecutionContext, overrode_metadata_


@asset(group_name=GROUP_NAME)
def cloud_registry_diff(latest_cloud_registry: ConnectorRegistryV0, legacy_cloud_registry: ConnectorRegistryV0) -> str:
    """
    Compares the latest cloud registry with the legacy cloud registry.

    Both registries are round-tripped through their pydantic ``.json()``
    serialization so the diff operates on plain dicts.

    Returns:
        str: JSON-serialized diff of the two registries.
    """
    latest_cloud_registry_dict = json.loads(latest_cloud_registry.json())
    legacy_cloud_registry_dict = json.loads(legacy_cloud_registry.json())
    return diff_registries(legacy_cloud_registry_dict, latest_cloud_registry_dict).to_json()


@asset(group_name=GROUP_NAME)
def oss_registry_diff(latest_oss_registry: ConnectorRegistryV0, legacy_oss_registry: ConnectorRegistryV0) -> str:
    """
    Compares the latest OSS registry with the legacy OSS registry.

    Both registries are round-tripped through their pydantic ``.json()``
    serialization so the diff operates on plain dicts.

    Returns:
        str: JSON-serialized diff of the two registries.
    """
    latest_oss_registry_dict = json.loads(latest_oss_registry.json())
    legacy_oss_registry_dict = json.loads(legacy_oss_registry.json())
    return diff_registries(legacy_oss_registry_dict, latest_oss_registry_dict).to_json()


@asset(required_resource_keys={"latest_metadata_file_blobs"}, group_name=GROUP_NAME)
Expand All @@ -275,28 +263,24 @@ def metadata_directory_report(context: OpExecutionContext):


@asset(required_resource_keys={"registry_report_directory_manager"}, group_name=GROUP_NAME)
def oss_registry_diff_report(context: OpExecutionContext, oss_registry_diff: str):
    """Persist the OSS registry diff (a JSON string) to the report directory and surface its links as asset metadata."""
    registry_report_directory_manager = context.resources.registry_report_directory_manager
    file_handle = registry_report_directory_manager.write_data(oss_registry_diff.encode(), ext="json", key="dev/oss_registry_diff_report")

    metadata = {
        "gcs_path": MetadataValue.url(file_handle.gcs_path),
        "link": MetadataValue.url(file_handle.public_url),
    }
    return Output(metadata=metadata, value=file_handle)


@asset(required_resource_keys={"registry_report_directory_manager"}, group_name=GROUP_NAME)
def cloud_registry_diff_report(context: OpExecutionContext, cloud_registry_diff: str):
    """Persist the cloud registry diff (a JSON string) to the report directory and surface its links as asset metadata."""
    registry_report_directory_manager = context.resources.registry_report_directory_manager
    file_handle = registry_report_directory_manager.write_data(
        cloud_registry_diff.encode(), ext="json", key="dev/cloud_registry_diff_report"
    )

    metadata = {
        "gcs_path": MetadataValue.url(file_handle.gcs_path),
        "link": MetadataValue.url(file_handle.public_url),
    }
    return Output(metadata=metadata, value=file_handle)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import copy
import json
from typing import List
from pydash.objects import get

import pandas as pd
from dagster import asset, OpExecutionContext, MetadataValue, Output
Expand Down Expand Up @@ -103,7 +105,7 @@ def metadata_to_registry_entry(metadata_definition: dict, connector_type: str, o


def is_metadata_registry_enabled(metadata_definition: dict, registry_name: str) -> bool:
    """Return whether the given registry (e.g. "oss" or "cloud") is enabled in the metadata.

    Uses pydash ``get`` so a missing ``data.registries.<name>.enabled`` key path
    falls back to False instead of raising KeyError.
    """
    return get(metadata_definition, f"data.registries.{registry_name}.enabled", False)


def is_metadata_connector_type(metadata_definition: dict, connector_type: str) -> bool:
Expand Down Expand Up @@ -173,7 +175,8 @@ def persist_registry_to_json(
OutputDataFrame: The registry directory manager.
"""
registry_file_name = f"{registry_name}_registry"
registry_json = registry.json()
registry_json = registry.json(exclude_none=True)

file_handle = registry_directory_manager.write_data(registry_json.encode("utf-8"), ext="json", key=registry_file_name)
return file_handle

Expand Down Expand Up @@ -208,7 +211,7 @@ def generate_and_persist_registry(
return Output(metadata=metadata, value=registry_model)


# New Registry
# New Registry Generation


@asset(required_resource_keys={"registry_directory_manager"}, group_name=GROUP_NAME)
Expand Down Expand Up @@ -273,3 +276,32 @@ def oss_destinations_dataframe(oss_registry_from_metadata: ConnectorRegistryV0)
oss_registry_from_metadata_dict = to_json_sanitized_dict(oss_registry_from_metadata)
destinations = oss_registry_from_metadata_dict["destinations"]
return output_dataframe(pd.DataFrame(destinations))


# Registry from JSON


@asset(required_resource_keys={"latest_cloud_registry_gcs_blob"}, group_name=GROUP_NAME)
def latest_cloud_registry(latest_cloud_registry_dict: dict) -> ConnectorRegistryV0:
    """Parse the downloaded latest cloud registry dict into a ConnectorRegistryV0 model."""
    return ConnectorRegistryV0.parse_obj(latest_cloud_registry_dict)


@asset(required_resource_keys={"latest_oss_registry_gcs_blob"}, group_name=GROUP_NAME)
def latest_oss_registry(latest_oss_registry_dict: dict) -> ConnectorRegistryV0:
    """Parse the downloaded latest OSS registry dict into a ConnectorRegistryV0 model."""
    return ConnectorRegistryV0.parse_obj(latest_oss_registry_dict)


@asset(required_resource_keys={"latest_cloud_registry_gcs_blob"}, group_name=GROUP_NAME)
def latest_cloud_registry_dict(context: OpExecutionContext) -> dict:
    """Download the latest cloud registry JSON from its GCS blob and return it as a dict."""
    registry_blob = context.resources.latest_cloud_registry_gcs_blob
    raw_json = registry_blob.download_as_string().decode("utf-8")
    return json.loads(raw_json)


@asset(required_resource_keys={"latest_oss_registry_gcs_blob"}, group_name=GROUP_NAME)
def latest_oss_registry_dict(context: OpExecutionContext) -> dict:
    """Download the latest OSS registry JSON from its GCS blob and return it as a dict."""
    registry_blob = context.resources.latest_oss_registry_gcs_blob
    raw_json = registry_blob.download_as_string().decode("utf-8")
    return json.loads(raw_json)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
).upstream()
generate_registry = define_asset_job(name="generate_registry", selection=registries_inclusive)

# Select the main registry report plus both registry-diff reports, and all of
# their upstream assets, for the report-generation job.
registry_reports_inclusive = AssetSelection.keys(
    "connector_registry_report", "oss_registry_diff_report", "cloud_registry_diff_report"
).upstream()
generate_registry_reports = define_asset_job(name="generate_registry_reports", selection=registry_reports_inclusive)

metadata_inclusive = AssetSelection.keys("persist_metadata_definitions").upstream()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
data:
  allowedHosts:
    hosts:
      - TODO # Please change to the hostname of the source.
  registries:
    oss:
      enabled: false
  connectorSubtype: database
  connectorType: destination
  definitionId: {{generateDefinitionId}}
  dockerImageTag: 0.1.0
  dockerRepository: airbyte/destination-{{dashCase name}}
  githubIssueLabel: destination-{{dashCase name}}
  icon: {{dashCase name}}.svg
  license: MIT
  name: {{capitalCase name}}
  releaseDate: TODO
  releaseStage: alpha
  supportUrl: https://docs.airbyte.com/integrations/destinations/{{dashCase name}}
metadataSpecVersion: "1.0"
Loading

0 comments on commit 7310494

Please sign in to comment.