Skip to content

fix: (CDK) (Manifest) - Add Manifest Normalization module (reduce commonalities + handle schema $refs) #447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 37 commits into from
Apr 25, 2025

Conversation

bazarnov
Copy link
Contributor

@bazarnov bazarnov commented Mar 26, 2025

What

Resolves:

How

Normalization of Declarative Manifest:

  • Added the should_normalize_manifest function to check if the normalization is needed, which is then applied in the ManifestDeclarativeSource class.
  • Introduced a ManifestNormalizer to deduplicate and restructure the manifest, extracting commonly used definitions into a definitions.linked section.

Declarative Component Schema:

  • Added the linkable property to schema definitions for shared components like url_base and authenticator, as of the current state of the Form-Based UI project (MVP)

Error Handling:

  • Introduced a new ManifestNormalizationException for handling issues during normalization such as circular references.

Unit Tests Enhancements:

  • Added new test cases for validation and normalization of manifests.

These changes collectively enhance the functionality, reliability, and maintainability of the connector builder and declarative manifest handling in the Airbyte Python CDK.

User Impact

  • No impact is expected, this is not a breaking change.
  • the normalization execution is hidden under the normalize_manifest: bool = False by default, to not to have any regressions, before we're ready to use it within the UI (/resolve should be having normalize: bool flag to set the normalization to True)

Related PRs

  1. Current PR
  2. Add Manifest Migrations - fix: (PoC) (do not merge) (CDK) (Manifest) - Migrate manifest fields  #463

Summary by CodeRabbit

Summary by CodeRabbit

  • New Features

    • Introduced manifest normalization to reduce redundancy by deduplicating repeated schema definitions in declarative source manifests.
    • Added conditional control to enable or disable manifest normalization based on configuration.
    • Added a new exception type for manifest normalization errors.
  • Documentation

    • Improved descriptions for schema fields to enhance clarity and consistency.
  • Style

    • Reformatted code and test argument lists for better readability and consistency.
  • Tests

    • Added unit tests to verify manifest normalization and deduplication logic.
    • Introduced fixtures for declarative source manifests and their normalized forms.

@lmossman
Copy link
Contributor

lmossman commented Mar 28, 2025

@bazarnov as discussed on our call, here are some examples of input YAML manifest and expected output:

First input
version: 6.41.5

type: DeclarativeSource

check:
  type: CheckStream
  stream_names:
    - pokemon

definitions:
  streams:
    pokemon:
      type: DeclarativeStream
      name: pokemon
      retriever:
        type: SimpleRetriever
        decoder:
          type: JsonDecoder
        requester:
          $ref: "#/definitions/base_requester"
          path: pokemon
          http_method: GET
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: []
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: "#/schemas/pokemon"
    location:
      type: DeclarativeStream
      name: location
      retriever:
        type: SimpleRetriever
        decoder:
          type: JsonDecoder
        requester:
          $ref: "#/definitions/base_requester"
          path: location
          http_method: GET
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: []
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: "#/schemas/location"
  base_requester:
    type: HttpRequester
    url_base: https://pokeapi.co/api/v2/

streams:
  - $ref: "#/definitions/streams/pokemon"
  - $ref: "#/definitions/streams/location"

spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required: []
    properties: {}
    additionalProperties: true

metadata:
  assist: {}
  testedStreams:
    pokemon:
      hasRecords: true
      streamHash: 6f6304126c7d27b92d6f753eaaaea074a205e096
      hasResponse: true
      primaryKeysAreUnique: true
      primaryKeysArePresent: true
      responsesAreSuccessful: true
    location:
      streamHash: null
  autoImportSchema:
    pokemon: false
    location: true

schemas:
  pokemon:
    type: object
    $schema: http://json-schema.org/schema#
    properties:
      url:
        type: string
      name:
        type: string
    additionalProperties: true
  location:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
First output
{
    "manifest": {
        "spec": {
            "type": "Spec",
            "connection_specification": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "required": [],
                "properties": {},
                "additionalProperties": true
            }
        },
        "type": "DeclarativeSource",
        "check": {
            "type": "CheckStream",
            "stream_names": [
                "pokemon"
            ]
        },
        "schemas": {
            "pokemon": {
                "type": "object",
                "$schema": "http://json-schema.org/schema#",
                "properties": {
                    "url": {
                        "type": "string"
                    },
                    "name": {
                        "type": "string"
                    }
                },
                "additionalProperties": true
            },
            "location": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            }
        },
        "streams": [
            {
                "name": "pokemon",
                "type": "DeclarativeStream",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "type": "HttpRequester",
                        "url_base": {
                          "$ref": "#/definitions/shared/HttpRequester/url_base"
                        }
                        "path": "pokemon",
                        "http_method": "GET"
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "$ref": "#/schemas/pokemon"
                    }
                }
            },
            {
                "name": "location",
                "type": "DeclarativeStream",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "type": "HttpRequester",
                        "url_base": {
                          "$ref": "#/definitions/shared/HttpRequester/url_base"
                        }
                        "path": "location",
                        "http_method": "GET"
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "$ref": "#/schemas/pokemon"
                    }
                }
            }
        ],
        "version": "6.41.5",
        "metadata": {
            "assist": {},
            "testedStreams": {
                "pokemon": {
                    "hasRecords": true,
                    "streamHash": "6f6304126c7d27b92d6f753eaaaea074a205e096",
                    "hasResponse": true,
                    "primaryKeysAreUnique": true,
                    "primaryKeysArePresent": true,
                    "responsesAreSuccessful": true
                },
                "location": {
                    "streamHash": null
                }
            },
            "autoImportSchema": {
                "pokemon": false,
                "location": true
            }
        },
        "definitions": {
          "shared": {
            "HttpRequester": {
              "url_base": "https://pokeapi.co/api/v2/"
            }
          }
        }
    }
}

Second input (in this case there are more occurrences of "someotherapi" than "pokeapi", so "someotherapi" is the one that gets deduped and "pokeapi" is left in a duplicated state)
version: 6.41.5

type: DeclarativeSource

check:
  type: CheckStream
  stream_names:
    - pokemon

streams:
  - type: DeclarativeStream
    name: pokemon
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: pokemon
        url_base: https://pokeapi.co/api/v2/
        http_method: GET
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/pokemon"
  - type: DeclarativeStream
    name: location
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: location
        url_base: https://pokeapi.co/api/v2/
        http_method: GET
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/location"
  - type: DeclarativeStream
    name: somethingElse
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: location
        url_base: https://someotherapi.com
        http_method: GET
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/somethingElse"
  - type: DeclarativeStream
    name: somethingElse2
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: location
        url_base: https://someotherapi.com
        http_method: GET
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/somethingElse2"
  - type: DeclarativeStream
    name: somethingElse3
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: location
        url_base: https://someotherapi.com
        http_method: GET
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/somethingElse3"

spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required: []
    properties: {}
    additionalProperties: true

metadata:
  assist: {}
  testedStreams:
    pokemon:
      hasRecords: true
      streamHash: 6f6304126c7d27b92d6f753eaaaea074a205e096
      hasResponse: true
      primaryKeysAreUnique: true
      primaryKeysArePresent: true
      responsesAreSuccessful: true
    location:
      streamHash: null
    somethingElse:
      streamHash: fc475d6b671a83c102b6f1b51bad18512f22455f
  autoImportSchema:
    pokemon: false
    location: true

schemas:
  pokemon:
    type: object
    $schema: http://json-schema.org/schema#
    properties:
      url:
        type: string
      name:
        type: string
    additionalProperties: true
  location:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
  somethingElse:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
  somethingElse2:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
  somethingElse3:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
Second output
{
    "manifest": {
        "spec": {
            "type": "Spec",
            "connection_specification": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "required": [],
                "properties": {},
                "additionalProperties": true
            }
        },
        "type": "DeclarativeSource",
        "check": {
            "type": "CheckStream",
            "stream_names": [
                "pokemon"
            ]
        },
        "schemas": {
            "pokemon": {
                "type": "object",
                "$schema": "http://json-schema.org/schema#",
                "properties": {
                    "url": {
                        "type": "string"
                    },
                    "name": {
                        "type": "string"
                    }
                },
                "additionalProperties": true
            },
            "location": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            },
            "somethingElse": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            },
            "somethingElse2": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            },
            "somethingElse3": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            }
        },
        "streams": [
            {
                "name": "pokemon",
                "type": "DeclarativeStream",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "path": "pokemon",
                        "type": "HttpRequester",
                        "url_base": "https://pokeapi.co/api/v2/",
                        "http_method": "GET"
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "$ref": "#/schemas/pokemon"
                    }
                }
            },
            {
                "name": "location",
                "type": "DeclarativeStream",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "path": "location",
                        "type": "HttpRequester",
                        "url_base": "https://pokeapi.co/api/v2/",
                        "http_method": "GET"
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "$ref": "#/schemas/location"
                    }
                }
            },
            {
                "name": "somethingElse",
                "type": "DeclarativeStream",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "path": "location",
                        "type": "HttpRequester",
                        "url_base": {
                            "$ref": "definitions/shared/HttpRequester/url_base"
                        },
                        "http_method": "GET"
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "$ref": "#/schemas/somethingElse"
                    }
                }
            },
            {
                "name": "somethingElse2",
                "type": "DeclarativeStream",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "path": "location",
                        "type": "HttpRequester",
                        "url_base": {
                            "$ref": "definitions/shared/HttpRequester/url_base"
                        },
                        "http_method": "GET"
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "$ref": "#/schemas/somethingElse2"
                    }
                }
            },
            {
                "name": "somethingElse3",
                "type": "DeclarativeStream",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "path": "location",
                        "type": "HttpRequester",
                        "url_base": {
                            "$ref": "definitions/shared/HttpRequester/url_base"
                        },
                        "http_method": "GET"
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "$ref": "#/schemas/somethingElse3"
                    }
                }
            }
        ],
        "version": "6.41.5",
        "metadata": {
            "assist": {},
            "testedStreams": {
                "pokemon": {
                    "hasRecords": true,
                    "streamHash": "6f6304126c7d27b92d6f753eaaaea074a205e096",
                    "hasResponse": true,
                    "primaryKeysAreUnique": true,
                    "primaryKeysArePresent": true,
                    "responsesAreSuccessful": true
                },
                "location": {
                    "streamHash": null
                },
                "somethingElse": {
                    "streamHash": "fc475d6b671a83c102b6f1b51bad18512f22455f"
                }
            },
            "autoImportSchema": {
                "pokemon": false,
                "location": true
            }
        }
        "definitions": {
            "shared": {
                "HttpRequester": {
                    url_base: "https://someotherapi.com"
                }
            }
        }
    }
}

And here is an example of if the shareable fields are already being shared:

Input
version: 6.41.5

type: DeclarativeSource

check:
  type: CheckStream
  stream_names:
    - pokemon

definitions: 
  shared:
    HttpRequester:
      url_base: https://pokeapi.co/api/v2/

streams:
  - type: DeclarativeStream
    name: pokemon
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: pokemon
        url_base:
          $ref: "#/definitions/shared/HttpRequester/url_base"
        http_method: GET
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/pokemon"
  - type: DeclarativeStream
    name: location
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: location
        url_base: https://someotherapi.com
        http_method: GET
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/location"
  - type: DeclarativeStream
    name: somethingElse
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: location
        url_base: https://someotherapi.com
        http_method: GET
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/somethingElse"

spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required: []
    properties: {}
    additionalProperties: true

metadata:
  assist: {}
  testedStreams:
    pokemon:
      hasRecords: true
      streamHash: 6f6304126c7d27b92d6f753eaaaea074a205e096
      hasResponse: true
      primaryKeysAreUnique: true
      primaryKeysArePresent: true
      responsesAreSuccessful: true
    location:
      streamHash: null
    somethingElse:
      streamHash: fc475d6b671a83c102b6f1b51bad18512f22455f
  autoImportSchema:
    pokemon: false
    location: true

schemas:
  pokemon:
    type: object
    $schema: http://json-schema.org/schema#
    properties:
      url:
        type: string
      name:
        type: string
    additionalProperties: true
  location:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
  somethingElse:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true

You can see in the output, we don't try to deduplicate that field in that case, since it is already pointing to shared/HttpRequester/url_base in one of the streams:

Output
{
    "manifest": {
        "spec": {
            "type": "Spec",
            "connection_specification": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "required": [],
                "properties": {},
                "additionalProperties": true
            }
        },
        "type": "DeclarativeSource",
        "check": {
            "type": "CheckStream",
            "stream_names": [
                "pokemon"
            ]
        },
        "schemas": {
            "pokemon": {
                "type": "object",
                "$schema": "http://json-schema.org/schema#",
                "properties": {
                    "url": {
                        "type": "string"
                    },
                    "name": {
                        "type": "string"
                    }
                },
                "additionalProperties": true
            },
            "location": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            },
            "somethingElse": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            }
        },
        "streams": [
            {
                "name": "pokemon",
                "type": "DeclarativeStream",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "path": "pokemon",
                        "type": "HttpRequester",
                        "url_base": {
                          "$ref": "#/definitions/shared/HttpRequester/url_base"
                        }
                        "http_method": "GET"
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "$ref": "#/schemas/pokemon"
                    }
                }
            },
            {
                "name": "location",
                "type": "DeclarativeStream",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "path": "location",
                        "type": "HttpRequester",
                        "url_base": "https://someotherapi.com",
                        "http_method": "GET"
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "$ref": "#/schemas/location"
                    }
                }
            },
            {
                "name": "somethingElse",
                "type": "DeclarativeStream",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "path": "location",
                        "type": "HttpRequester",
                        "url_base": "https://someotherapi.com",
                        "http_method": "GET"
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "$ref": "#/schemas/somethingElse"
                    }
                }
            }
        ],
        "version": "6.41.5",
        "metadata": {
            "assist": {},
            "testedStreams": {
                "pokemon": {
                    "hasRecords": true,
                    "streamHash": "6f6304126c7d27b92d6f753eaaaea074a205e096",
                    "hasResponse": true,
                    "primaryKeysAreUnique": true,
                    "primaryKeysArePresent": true,
                    "responsesAreSuccessful": true
                },
                "location": {
                    "streamHash": null
                },
                "somethingElse": {
                    "streamHash": "fc475d6b671a83c102b6f1b51bad18512f22455f"
                }
            },
            "autoImportSchema": {
                "pokemon": false,
                "location": true
            }
        },
        "definitions": {
            "shared": {
                "HttpRequester": {
                    "url_base": "https://pokeapi.co/api/v2/"
                }
            }
        }
    }
}

@bazarnov
Copy link
Contributor Author

bazarnov commented Mar 31, 2025

@lmossman The PR has been updated according to this comment.

IMPORTANT:

  • this code enables the Manifest Optimization ( deduplication + schemas referencing) for the given manifest only for Connector Builder.

@bazarnov bazarnov marked this pull request as ready for review April 17, 2025 13:57
Copy link
Contributor

coderabbitai bot commented Apr 17, 2025

📝 Walkthrough

Walkthrough

This update introduces manifest normalization to the declarative source pipeline. A new ManifestNormalizer class is added to deduplicate and centralize repeated schema components in manifests, replacing them with references. The normalization process is conditionally enabled based on a configuration flag, which is checked via a new helper function and passed to the ManifestDeclarativeSource constructor. Additional changes include schema metadata enhancements, improved code formatting, new exception handling for normalization errors, and comprehensive unit tests to verify normalization behavior. Several type annotations and docstrings are also refined for clarity and consistency.

Changes

File(s) Summary
airbyte_cdk/connector_builder/connector_builder_handler.py Adds should_normalize_manifest function to check config flag; updates create_source to pass normalization flag to ManifestDeclarativeSource.
airbyte_cdk/sources/declarative/manifest_declarative_source.py, airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py, airbyte_cdk/sources/declarative/parsers/custom_exceptions.py Adds manifest normalization logic via new ManifestNormalizer class, integrates normalization into ManifestDeclarativeSource (with new normalize_manifest parameter), centralizes schema loading, and introduces a custom exception for normalization errors.
airbyte_cdk/sources/declarative/declarative_component_schema.yaml, airbyte_cdk/sources/declarative/models/declarative_component_schema.py Updates schema and model descriptions for clarity; marks certain properties as linkable to support normalization.
airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py, airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py Refines type annotations in method signatures for improved specificity.
unit_tests/sources/declarative/parsers/conftest.py Adds pytest fixtures for declarative source manifests and their normalized forms to support new unit tests.
unit_tests/sources/declarative/parsers/test_manifest_normalizer.py Adds new unit tests to verify manifest normalization and deduplication logic.
unit_tests/connector_builder/test_connector_builder_handler.py, unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py Refactors argument formatting and dictionary style for consistency and readability in tests.
unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml Adds a new declarative source manifest YAML file used as a test fixture for normalization tests.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Handler as connector_builder_handler.py
    participant Source as ManifestDeclarativeSource
    participant Normalizer as ManifestNormalizer

    User->>Handler: Provide config (may include __should_normalize)
    Handler->>Handler: should_normalize_manifest(config)
    Handler->>Source: create_source(config, normalize_manifest)
    Source->>Source: Load declarative schema
    alt normalize_manifest is True
        Source->>Normalizer: normalize(manifest, schema)
        Normalizer-->>Source: Normalized manifest
    else normalize_manifest is False
        Source->>Source: Use manifest as-is
    end
    Source-->>Handler: Source instance ready
Loading

Possibly related PRs

Suggested labels

enhancement

Suggested reviewers

  • lmossman
  • darynaishchenko

Would you like me to suggest adding any other reviewers or labels, or does this set look just right to you? Wdyt?

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (7)
airbyte_cdk/sources/declarative/parsers/custom_exceptions.py (1)

24-30: Good addition of a specialized exception for manifest normalization!

This new exception class ManifestNormalizationException will help differentiate normalization errors from other types of exceptions, making error handling more precise. The docstring mentions "circular reference" though - should it be more specific to deduplication failures instead? Wdyt?

-    """
-    Raised when a circular reference is detected in a manifest.
-    """
+    """
+    Raised when manifest normalization fails, such as during deduplication.
+    """
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2187-2195: Minor grammar tweak on the path description?
The updated description reads “The Path the specific API endpoint that this stream represents…”—could we make it “The path of the specific API endpoint that this stream represents” for extra clarity? wdyt?

unit_tests/sources/declarative/parsers/conftest.py (1)

10-173: Fixture duplication & maintenance burden – consider trimming test data?

The fixture manifest_with_multiple_url_base() embeds five almost‑identical stream definitions differing only by name/path and the requester reference. That makes the file bulky and hard to scan.
Would it be lighter to generate these streams programmatically inside the fixture (e.g. via a small helper) so future changes touch fewer lines, wdyt?

airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)

123-132: Fail‑open fallback may mask bugs

If ManifestNormalizer raises, the code silently keeps the un‑normalized manifest. Could we at least log the exception at DEBUG so issues do not go unnoticed, wdyt?

unit_tests/sources/declarative/parsers/test_manifest_normalizer.py (1)

6-14: Tests depend on a “private” helper

Importing _get_declarative_component_schema() ties the test to a non‑public API.
Would exposing a tiny public helper (or loading the YAML inside the test) make the contract clearer and avoid name‑mangling underscore imports, wdyt?

airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py (2)

72-80: Typos in class docstring & method name

Small polish point: appliying, normilize() and the method _deduplicate_minifest look misspelled.
Renaming the method to _deduplicate_manifest (and updating the call site) plus fixing the docstring would aid readability, wdyt?


361-363: Hashing via json.dumps – any concern about ordering of lists?

json.dumps(sort_keys=True) guarantees key order for dicts but not element order for lists, so two logically identical lists with different ordering won’t be deduped.
Is that acceptable for your use‑case, or should we normalise lists (e.g., sort when elements are primitives) before hashing, wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 24cbc51 and c403a0e.

📒 Files selected for processing (12)
  • airbyte_cdk/connector_builder/connector_builder_handler.py (1 hunks)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3 hunks)
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py (12 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2 hunks)
  • airbyte_cdk/sources/declarative/parsers/custom_exceptions.py (1 hunks)
  • airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py (2 hunks)
  • airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py (1 hunks)
  • airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py (2 hunks)
  • unit_tests/connector_builder/test_connector_builder_handler.py (3 hunks)
  • unit_tests/sources/declarative/parsers/conftest.py (1 hunks)
  • unit_tests/sources/declarative/parsers/test_manifest_normalizer.py (1 hunks)
  • unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Check: 'source-shopify' (skip=false)
🔇 Additional comments (15)
unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py (1)

154-157: Nice code formatting improvement!

This change improves readability by breaking the dictionary definition across multiple lines instead of having it on a single line. It makes the structure easier to scan visually while maintaining the same functionality.

unit_tests/connector_builder/test_connector_builder_handler.py (3)

500-506: Good formatting improvement!

The arguments to handle_request are now more readable, with each argument on its own line with proper indentation. This is consistent with Python coding best practices for readability.


518-524: Consistent formatting applied here too, looks good!

This matches the formatting pattern applied to the other handle_request call, maintaining consistency throughout the codebase.


928-934: Consistent formatting applied here as well!

The same multi-line argument formatting is applied to this handle_request call, completing the consistent pattern across the file.

airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py (2)

6-6: Good type import addition!

Adding the Dict import from typing to support the updated return type annotation.


102-102: More precise return type annotation!

Changed the return type from Mapping[str, Any] to Dict[str, Any], which is more specific and accurately reflects what the method actually returns. This provides better type information for static analysis tools and developers.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2177-2186: Align url_base description with schema YAML
Great job adding the definite article to the url_base description so it now matches the declarative schema in the YAML—this consistency will help both users and the normalizer pick up the right metadata. wdyt?

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)

1872-1874: Added linkable property to url_base - looks great!

The addition of linkable: true here is a key part of the manifest normalization feature, marking the URL base as a candidate for deduplication and reference-based sharing across streams. The title and description improvements also enhance clarity - nice touch adding "The" for more consistent documentation style, wdyt?


1891-1893: Improved description formatting

Similar style improvement adding "The" to the path description for consistency. Good attention to detail in maintaining a consistent documentation style throughout the schema.


1908-1909: Added linkable property to authenticator - great!

Adding the linkable: true attribute to the authenticator property follows the same pattern as for url_base, allowing the new normalization module to identify and deduplicate shared authenticator configurations. This will help reduce duplication in manifests where multiple streams use the same authentication method.

airbyte_cdk/connector_builder/connector_builder_handler.py (2)

59-66: Well-implemented check for manifest normalization flag

This new function cleanly encapsulates the logic for determining whether manifest normalization should be applied. The docstring is clear and descriptive, and the implementation follows good practices by providing a default value (False) if the flag is not present.


74-74: Correctly integrated normalization flag with source creation

The function call to should_normalize_manifest(config) is appropriately placed to pass the normalization flag to the ManifestDeclarativeSource constructor. This integrates the new normalization feature while maintaining backward compatibility by defaulting to False.

airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py (2)

7-7: Improved import organization

The import statement now includes Dict explicitly, which is needed for the return type annotation update below. Good practice to include all required types in the imports.


98-98: More specific return type annotation

Changing the return type from Mapping[str, Any] to Dict[str, Any] provides more specificity about the actual return value. This matches what the method actually returns (a dict instance), making the type hints more accurate. This kind of precision in type annotations improves type checking and IDE support.

airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py (1)

146-164: _prepare_definitions deletes every non‑linked entry

Purging definitions.* except linked works for the current UI flow, but could silently drop user‑defined reusable objects (e.g. auth configs) that are still referenced elsewhere.
Should we restrict the pruning to keys we know are safe to remove (e.g. streams, requester_*) or at least gate it behind an explicit flag, wdyt?

@lmossman
Copy link
Contributor

@bazarnov I tried testing this locally by running my local airbyte-platform-internal repo on my FE PR's branch, pointing to this PR's CDK branch.

When I opened the Builder in YAML mode and put in this manifest:

manifest.yaml
version: 6.44.0

type: DeclarativeSource

check:
  type: CheckStream
  stream_names:
    - pokemon

definitions:
  streams:
    berries:
      type: DeclarativeStream
      name: berries
      retriever:
        type: SimpleRetriever
        decoder:
          type: JsonDecoder
        requester:
          $ref: "#/definitions/base_requester"
          path: berries
          http_method: GET
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: []
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: "#/schemas/berries"
    pokemon:
      type: DeclarativeStream
      name: pokemon
      retriever:
        type: SimpleRetriever
        decoder:
          type: JsonDecoder
        requester:
          $ref: "#/definitions/base_requester"
          path: pokemon
          http_method: GET
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: []
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: "#/schemas/pokemon"
    location:
      type: DeclarativeStream
      name: location
      retriever:
        type: SimpleRetriever
        decoder:
          type: JsonDecoder
        requester:
          $ref: "#/definitions/base_requester"
          path: location
          http_method: GET
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: []
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: "#/schemas/location"
  base_requester:
    type: HttpRequester
    url_base: https://pokeapi.co/api/v2/
    authenticator:
      type: ApiKeyAuthenticator
      api_token: "{{ config[\"api_key\"] }}"
      inject_into:
        type: RequestOption
        field_name: API_KEY
        inject_into: header

streams:
  - $ref: "#/definitions/streams/pokemon"
  - $ref: "#/definitions/streams/location"
  - $ref: "#/definitions/streams/berries"

spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required:
      - api_key
    properties:
      api_key:
        type: string
        order: 0
        title: API Key
        airbyte_secret: true
    additionalProperties: true

metadata:
  assist: {}
  testedStreams:
    berries:
      streamHash: null
    pokemon:
      streamHash: null
    location:
      streamHash: null
  autoImportSchema:
    berries: true
    pokemon: true
    location: true

schemas:
  berries:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
  pokemon:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
  location:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true

Then I got back this error from the resolve call:

Traceback (most recent call last):
  File "/Users/lakemossman/code/airbyte-python-cdk/airbyte_cdk/sources/declarative/manifest_declarative_source.py", line 330, in _validate_source
    validate(self._source_config, self._declarative_component_schema)
  File "/Users/lakemossman/code/airbyte-python-cdk/.venv/lib/python3.10/site-packages/jsonschema/validators.py", line 1121, in validate
    raise error
jsonschema.exceptions.ValidationError: {'$ref': '#/definitions/linked/HttpRequester/authenticator'} is not valid under any of the given schemas

Failed validating 'anyOf' in schema[1]['properties']['authenticator']:
    {'anyOf': [{'$ref': '#/definitions/ApiKeyAuthenticator'},
               {'$ref': '#/definitions/BasicHttpAuthenticator'},
               {'$ref': '#/definitions/BearerAuthenticator'},
               {'$ref': '#/definitions/CustomAuthenticator'},
               {'$ref': '#/definitions/OAuthAuthenticator'},
               {'$ref': '#/definitions/JwtAuthenticator'},
               {'$ref': '#/definitions/NoAuth'},
               {'$ref': '#/definitions/SessionTokenAuthenticator'},
               {'$ref': '#/definitions/LegacySessionTokenAuthenticator'},
               {'$ref': '#/definitions/SelectiveAuthenticator'}],
     'description': 'Authentication method to use for requests sent to the '
                    'API.',
     'linkable': True,
     'title': 'Authenticator'}

On instance['authenticator']:
    {'$ref': '#/definitions/linked/HttpRequester/authenticator'}

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/lakemossman/code/airbyte-python-cdk/airbyte_cdk/connector_builder/main.py", line 104, in <module>
    print(handle_request(sys.argv[1:]))
  File "/Users/lakemossman/code/airbyte-python-cdk/airbyte_cdk/connector_builder/main.py", line 94, in handle_request
    source = create_source(config, limits)
  File "/Users/lakemossman/code/airbyte-python-cdk/airbyte_cdk/connector_builder/connector_builder_handler.py", line 70, in create_source
    return ManifestDeclarativeSource(
  File "/Users/lakemossman/code/airbyte-python-cdk/airbyte_cdk/sources/declarative/manifest_declarative_source.py", line 150, in __init__
    self._validate_source()
  File "/Users/lakemossman/code/airbyte-python-cdk/airbyte_cdk/sources/declarative/manifest_declarative_source.py", line 332, in _validate_source
    raise ValidationError(
jsonschema.exceptions.ValidationError: Validation against json schema defined in declarative_component_schema.yaml schema failed

So it seems like the CDK is trying to validate the manifest again after you have deduplicated the linkable fields, which causes that validation to fail since the manifest contains $refs at that point.

I think the behavior I expected is that the /resolve endpoint would do the following operations in this order:

  1. Preprocess the manifest as normal (resolve $refs, set default types, resolve $parameters, etc)
  2. Validate the preprocessed manifest against the declarative_component_schema.yaml
  3. Then apply the normalization/deduplication logic to produce the linked definitions and $refs pointing to them
  4. Return that result

With this approach, we shouldn't ever try to validate the result of deduplicating the manifest against the json schema, as we will always expect that to fail.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)

65-76: Reuse the parsed schema via lru_cache (same suggestion as last time 👍)

The helper reloads & parses the YAML on every ManifestDeclarativeSource instantiation. Given that the schema is immutable at runtime, would you like to decorate _get_declarative_component_schema() with @functools.lru_cache(maxsize=1) (and import functools) so the IO happens only once, spreading the cost across all sources, wdyt?

While you are touching the helper, you could also switch from the more verbose yaml.load(..., Loader=yaml.SafeLoader) to the dedicated yaml.safe_load(...) helper which conveys the intent more clearly.

Example patch:

-from copy import deepcopy
+from copy import deepcopy
+from functools import lru_cache
 ...
-import yaml
+import yaml
 ...
-def _get_declarative_component_schema() -> Dict[str, Any]:
+@lru_cache(maxsize=1)
+def _get_declarative_component_schema() -> Dict[str, Any]:
 ...
-            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
+            declarative_component_schema = yaml.safe_load(raw_component_schema)
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (2)

94-108: Parameter typing – bool rather than Optional[bool]?

normalize_manifest has a default value of False, which means the argument can never be None in practice. Would it be clearer (and help static analysers) to type‑hint it simply as bool = False and drop the Optional wrapper in both the signature and the attribute definition, wdyt?

-        normalize_manifest: Optional[bool] = False,
+        normalize_manifest: bool = False,
 ...
-        self._should_normalize = normalize_manifest
+        self._should_normalize: bool = normalize_manifest

148-161: Docstring says “Returns None” but the function returns a manifest

_preprocess_manifest() returns propagated_manifest, yet the docstring claims it returns None. Updating the docstring (and possibly the return‑type hint) would avoid confusion for future readers. Something like:

-        Returns:
-            None
+        Returns:
+            Dict[str, Any]: the fully resolved and propagated manifest

wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 38f7da6 and 7d71f4b.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py (11 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)

@lmossman
Copy link
Contributor

@bazarnov that original issue from my comment above is now fixed.

However, I have encountered another issue here, with this manifest put into YAML mode of Builder UI pointed at this branch:

manifest.yaml
version: 6.44.0

type: DeclarativeSource

check:
  type: CheckStream
  stream_names:
    - pokemon

definitions:
  linked:
    HttpRequester:
      url_base: https://pokeapi.co/api/v1/

streams:
  - type: DeclarativeStream
    name: pokemon
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: pokemon
        url_base:
          $ref: "#/definitions/linked/HttpRequester/url_base"
        http_method: GET
        authenticator:
          type: ApiKeyAuthenticator
          api_token: "{{ config[\"api_key\"] }}"
          inject_into:
            type: RequestOption
            field_name: API_KEY
            inject_into: header
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/pokemon"
  - type: DeclarativeStream
    name: trainers
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: pokemon
        url_base:
          $ref: "#/definitions/linked/HttpRequester/url_base"
        http_method: GET
        authenticator:
          type: ApiKeyAuthenticator
          api_token: "{{ config[\"api_key\"] }}"
          inject_into:
            type: RequestOption
            field_name: API_KEY
            inject_into: header
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/pokemon"
  - type: DeclarativeStream
    name: items
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: pokemon
        url_base: https://pokeapi.co/api/v2/
        http_method: GET
        authenticator:
          type: ApiKeyAuthenticator
          api_token: "{{ config[\"api_key\"] }}"
          inject_into:
            type: RequestOption
            field_name: API_KEY
            inject_into: header
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/pokemon"
  - type: DeclarativeStream
    name: location
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: location
        url_base: https://pokeapi.co/api/v2/
        http_method: GET
        authenticator:
          type: ApiKeyAuthenticator
          api_token: "{{ config[\"api_key\"] }}"
          inject_into:
            type: RequestOption
            field_name: API_KEY
            inject_into: header
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/location"
  - type: DeclarativeStream
    name: berries
    retriever:
      type: SimpleRetriever
      decoder:
        type: JsonDecoder
      requester:
        type: HttpRequester
        path: berries
        url_base: https://pokeapi.co/api/v2/
        http_method: GET
        authenticator:
          type: ApiKeyAuthenticator
          api_token: "{{ config[\"api_key\"] }}"
          inject_into:
            type: RequestOption
            field_name: API_KEY
            inject_into: header
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        $ref: "#/schemas/berries"

spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required:
      - api_key
    properties:
      api_key:
        type: string
        order: 0
        title: API Key
        airbyte_secret: true
    additionalProperties: true

metadata:
  assist: {}
  testedStreams:
    berries:
      streamHash: null
    pokemon:
      streamHash: null
    location:
      streamHash: null
    trainers:
      streamHash: ca4ee51a2aaa2a53b9c0b91881a84ad621da575f
    items:
      streamHash: 12e624ecf47c6357c74c27d6a65c72e437b1534a
  autoImportSchema:
    berries: true
    pokemon: true
    location: true

schemas:
  berries:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
  pokemon:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
  location:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true

Notice how in this manifest:

  • The first 2 streams are already pointing to the linked/HttpRequester/url_base, which has the url ending in /v1/
  • The other 3 streams don't have $refs for url_base, but have the same url_base value ending in /v2/
  • All 5 streams have the same authenticator value but are not using $refs for them.

For this manifest, I would expect this branch's /resolve endpoint to return this response:

Expected response
{
    "manifest": {
        "version": "6.44.0",
        "type": "DeclarativeSource",
        "check": {
            "type": "CheckStream",
            "stream_names": [
                "pokemon"
            ]
        },
        "definitions": {
            "linked": {
                "HttpRequester": {
                    "url_base": "https://pokeapi.co/api/v1/",
                    "authenticator": {
                        "type": "ApiKeyAuthenticator",
                        "api_token": "{{ config[\"api_key\"] }}",
                        "inject_into": {
                            "type": "RequestOption",
                            "field_name": "API_KEY",
                            "inject_into": "header"
                        }
                    }
                }
            }
        },
        "streams": [
            {
                "type": "DeclarativeStream",
                "name": "pokemon",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "type": "HttpRequester",
                        "path": "pokemon",
                        "url_base": {
                            "$ref": "#/definitions/linked/url_base"
                        }
                        "http_method": "GET",
                        "authenticator": {
                            "$ref": "#/definitions/linked/HttpRequester/authenticator"
                        }
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "type": "object",
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "properties": {},
                        "additionalProperties": true
                    }
                }
            },
            {
                "type": "DeclarativeStream",
                "name": "trainers",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "type": "HttpRequester",
                        "path": "pokemon",
                        "url_base": {
                            "$ref": "#/definitions/linked/url_base"
                        }
                        "http_method": "GET",
                        "authenticator": {
                            "$ref": "#/definitions/linked/HttpRequester/authenticator"
                        }
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "type": "object",
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "properties": {},
                        "additionalProperties": true
                    }
                }
            },
            {
                "type": "DeclarativeStream",
                "name": "items",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "type": "HttpRequester",
                        "path": "pokemon",
                        "url_base": "https://pokeapi.co/api/v2/",
                        "http_method": "GET",
                        "authenticator": {
                            "$ref": "#/definitions/linked/HttpRequester/authenticator"
                        }
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "type": "object",
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "properties": {},
                        "additionalProperties": true
                    }
                }
            },
            {
                "type": "DeclarativeStream",
                "name": "location",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "type": "HttpRequester",
                        "path": "location",
                        "url_base": "https://pokeapi.co/api/v2/",
                        "http_method": "GET",
                        "authenticator": {
                            "$ref": "#/definitions/linked/HttpRequester/authenticator"
                        }
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "type": "object",
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "properties": {},
                        "additionalProperties": true
                    }
                }
            },
            {
                "type": "DeclarativeStream",
                "name": "berries",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "type": "HttpRequester",
                        "path": "berries",
                        "url_base": "https://pokeapi.co/api/v2/",
                        "http_method": "GET",
                        "authenticator": {
                            "$ref": "#/definitions/linked/HttpRequester/authenticator"
                        }
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "type": "object",
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "properties": {},
                        "additionalProperties": true
                    }
                }
            }
        ],
        "spec": {
            "type": "Spec",
            "connection_specification": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "required": [
                    "api_key"
                ],
                "properties": {
                    "api_key": {
                        "type": "string",
                        "order": 0,
                        "title": "API Key",
                        "airbyte_secret": true
                    }
                },
                "additionalProperties": true
            }
        },
        "metadata": {
            "assist": {},
            "testedStreams": {
                "berries": {
                    "streamHash": null
                },
                "pokemon": {
                    "streamHash": null
                },
                "location": {
                    "streamHash": null
                },
                "trainers": {
                    "streamHash": "ca4ee51a2aaa2a53b9c0b91881a84ad621da575f"
                },
                "items": {
                    "streamHash": "12e624ecf47c6357c74c27d6a65c72e437b1534a"
                }
            },
            "autoImportSchema": {
                "berries": true,
                "pokemon": true,
                "location": true
            }
        },
        "schemas": {
            "berries": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            },
            "pokemon": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            },
            "location": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            }
        }
    }
}

where:

  • The first 2 streams are pointing to the linked url_base with a $ref as they started out
  • The other 3 streams don't use a $ref for their url_base value
    (even though they had the most common value, the first two streams were already pointing to linked so that's what should remain linked for this field specifically)
  • All 5 streams are pointing to a linked authenticator since they all had the same value

But what I actually received was this:

Actual response
{
    "manifest": {
        "version": "6.44.0",
        "type": "DeclarativeSource",
        "check": {
            "type": "CheckStream",
            "stream_names": [
                "pokemon"
            ]
        },
        "definitions": {
            "linked": {
                "HttpRequester": {
                    "url_base": "https://pokeapi.co/api/v1/"
                }
            }
        },
        "streams": [
            {
                "type": "DeclarativeStream",
                "name": "pokemon",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "type": "HttpRequester",
                        "path": "pokemon",
                        "url_base": "https://pokeapi.co/api/v1/",
                        "http_method": "GET",
                        "authenticator": {
                            "type": "ApiKeyAuthenticator",
                            "api_token": "{{ config[\"api_key\"] }}",
                            "inject_into": {
                                "type": "RequestOption",
                                "field_name": "API_KEY",
                                "inject_into": "header"
                            }
                        }
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "type": "object",
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "properties": {},
                        "additionalProperties": true
                    }
                }
            },
            {
                "type": "DeclarativeStream",
                "name": "trainers",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "type": "HttpRequester",
                        "path": "pokemon",
                        "url_base": "https://pokeapi.co/api/v1/",
                        "http_method": "GET",
                        "authenticator": {
                            "type": "ApiKeyAuthenticator",
                            "api_token": "{{ config[\"api_key\"] }}",
                            "inject_into": {
                                "type": "RequestOption",
                                "field_name": "API_KEY",
                                "inject_into": "header"
                            }
                        }
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "type": "object",
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "properties": {},
                        "additionalProperties": true
                    }
                }
            },
            {
                "type": "DeclarativeStream",
                "name": "items",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "type": "HttpRequester",
                        "path": "pokemon",
                        "url_base": "https://pokeapi.co/api/v2/",
                        "http_method": "GET",
                        "authenticator": {
                            "type": "ApiKeyAuthenticator",
                            "api_token": "{{ config[\"api_key\"] }}",
                            "inject_into": {
                                "type": "RequestOption",
                                "field_name": "API_KEY",
                                "inject_into": "header"
                            }
                        }
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "type": "object",
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "properties": {},
                        "additionalProperties": true
                    }
                }
            },
            {
                "type": "DeclarativeStream",
                "name": "location",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "type": "HttpRequester",
                        "path": "location",
                        "url_base": "https://pokeapi.co/api/v2/",
                        "http_method": "GET",
                        "authenticator": {
                            "type": "ApiKeyAuthenticator",
                            "api_token": "{{ config[\"api_key\"] }}",
                            "inject_into": {
                                "type": "RequestOption",
                                "field_name": "API_KEY",
                                "inject_into": "header"
                            }
                        }
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "type": "object",
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "properties": {},
                        "additionalProperties": true
                    }
                }
            },
            {
                "type": "DeclarativeStream",
                "name": "berries",
                "retriever": {
                    "type": "SimpleRetriever",
                    "decoder": {
                        "type": "JsonDecoder"
                    },
                    "requester": {
                        "type": "HttpRequester",
                        "path": "berries",
                        "url_base": "https://pokeapi.co/api/v2/",
                        "http_method": "GET",
                        "authenticator": {
                            "type": "ApiKeyAuthenticator",
                            "api_token": "{{ config[\"api_key\"] }}",
                            "inject_into": {
                                "type": "RequestOption",
                                "field_name": "API_KEY",
                                "inject_into": "header"
                            }
                        }
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {
                            "type": "DpathExtractor",
                            "field_path": []
                        }
                    }
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "type": "object",
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "properties": {},
                        "additionalProperties": true
                    }
                }
            }
        ],
        "spec": {
            "type": "Spec",
            "connection_specification": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "required": [
                    "api_key"
                ],
                "properties": {
                    "api_key": {
                        "type": "string",
                        "order": 0,
                        "title": "API Key",
                        "airbyte_secret": true
                    }
                },
                "additionalProperties": true
            }
        },
        "metadata": {
            "assist": {},
            "testedStreams": {
                "berries": {
                    "streamHash": null
                },
                "pokemon": {
                    "streamHash": null
                },
                "location": {
                    "streamHash": null
                },
                "trainers": {
                    "streamHash": "ca4ee51a2aaa2a53b9c0b91881a84ad621da575f"
                },
                "items": {
                    "streamHash": "12e624ecf47c6357c74c27d6a65c72e437b1534a"
                }
            },
            "autoImportSchema": {
                "berries": true,
                "pokemon": true,
                "location": true
            }
        },
        "schemas": {
            "berries": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            },
            "pokemon": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            },
            "location": {
                "type": "object",
                "$schema": "http://json-schema.org/draft-07/schema#",
                "properties": {},
                "additionalProperties": true
            }
        }
    }
}

These are the things wrong with the actual response:

  • There is correctly the /v1/ url_base in linked, but the first 2 streams aren't pointing to it with a $ref
  • There is no authenticator in linked, and no streams are using $refs for either of these fields

@bazarnov
Copy link
Contributor Author

@lmossman The input manifest is incorrect:

The schemas section is missing the trainers and the items schemas declared, thus it falls back to the original input and skips applying any modifications. The schemas sections should look like this, for the given input manifest:

schemas:
  berries:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
  pokemon:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
  location:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
  trainers:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true
  items:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    properties: {}
    additionalProperties: true

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py (2)

312-321: _collect assumes parent objects always have "type"

_get_occurance_samples accesses obj["type"]; if a duplicate primitive sits under an object lacking that field, a KeyError bubbles up and aborts normalization.
Would adding obj.get("type", "Unknown") (or skipping such occurrences) make the process more robust, wdyt?

-        return (
-            obj["type"],
+        return (
+            obj.get("type", "Unknown"),

432-436: 🛠️ Refactor suggestion

Consider error handling for key access in _get_occurance_samples

This function assumes that obj will always have a "type" key, but that might not be the case for all objects. Consider using obj.get("type", "Unknown") to handle cases where the key is missing.

        return (
-            obj["type"],
+            obj.get("type", "Unknown"),
            path[-1],
            value,
        )  # Return the component's name as the last part of its path
🧹 Nitpick comments (8)
airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py (5)

111-111: Is there a typo in this method name?

The method name _deduplicate_minifest() appears to contain a typo - I think you meant "manifest" instead of "minifest", wdyt?

-            self._deduplicate_minifest()
+            self._deduplicate_manifest()

134-146: Method name should match its usage

This method definition contains the same typo as its invocation.

-    def _deduplicate_minifest(self) -> None:
+    def _deduplicate_manifest(self) -> None:

117-118: Consider adding actual debug logging

The TODO comment indicates debug logging should be enabled, but no logging implementation exists yet. Consider adding proper debug logging using Python's logging module to help with troubleshooting.

+import logging
+
+# At the beginning of the class
+    def __init__(
+        self,
+        resolved_manifest: ManifestType,
+        declarative_schema: DefinitionsType,
+    ) -> None:
+        self._logger = logging.getLogger("airbyte.sources.declarative.parsers.manifest_normalizer")
+        ...

# In the normalize method
-            # if any error occurs, we just return the original manifest.
-            # TODO: enable debug logging
+            # if any error occurs, we just return the original manifest
+            self._logger.debug("Manifest normalization failed", exc_info=True)

400-418: Parameter in docstring doesn't match method signature

The docstring refers to a definitions parameter that doesn't exist in the method signature. Consider updating the docstring to match the actual method parameters.

        """
        Get the value of a linked definition by its key.

        Args:
            key: The key to check
-            definitions: The definitions dictionary with definitions
+            type_key: The type key in linked definitions

        Returns:
            The value of the linked definition
        """

18-19: Typo in class docstring

There's a small typo in the class docstring - "appliying" should be "applying".

    """
-    This class is responsible for normalizing the manifest by appliying processing such as:
+    This class is responsible for normalizing the manifest by applying processing such as:
     - removing duplicated definitions
     - replacing them with references.
unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml (3)

212-212: Add newline at end of file

Static analysis detected that there's no newline character at the end of the file. It's a standard best practice to end files with a newline character.

      location_type:
        type: string
+
🧰 Tools
🪛 YAMLlint (1.35.1)

[error] 212-212: no new line character at the end of file

(new-line-at-end-of-file)


28-34: Could authenticator also be in linked definitions?

I notice that all streams use the same ApiKeyAuthenticator configuration. Have you considered moving this to the linked definitions as well to reduce duplication? The normalizer should handle this, but it might be clearer to define it explicitly if it's intentionally shared.


198-205: Missing additionalProperties in pokemon schema

The pokemon schema doesn't explicitly define whether additional properties are allowed, while the other schemas do (additionalProperties: true). For consistency, would it make sense to add this?

      pokemon_type:
        type: integer
+    additionalProperties: true
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7d71f4b and 304235c.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py (1 hunks)
  • unit_tests/sources/declarative/parsers/conftest.py (1 hunks)
  • unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml (1 hunks)
  • unit_tests/sources/declarative/parsers/test_manifest_normalizer.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • unit_tests/sources/declarative/parsers/conftest.py
  • unit_tests/sources/declarative/parsers/test_manifest_normalizer.py
🧰 Additional context used
🪛 YAMLlint (1.35.1)
unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml

[error] 212-212: no new line character at the end of file

(new-line-at-end-of-file)

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: SDM Docker Image Build
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py (3)

89-90: LGTM: Proper initialization of fields

The initialization properly makes a deep copy of the resolved manifest, initializes class variables, and retrieves the linkable tags from the schema. This ensures your class doesn't accidentally modify the input manifests.


110-115: LGTM: Error handling in normalize() method

The normalize method has proper error handling that falls back to the original manifest if any exceptions occur during normalization. This is a good defensive programming practice that ensures the system remains robust even if normalization fails.


317-320: LGTM: Proper handling of nested dictionaries

The recursive traversal in _collect correctly processes nested dictionaries first and then checks for linkable keys. This approach ensures that the entire object structure is evaluated properly.

unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml (2)

10-14: LGTM: Good use of linked definitions

The manifest properly defines linked definitions for the HttpRequester with a shared url_base. This structure will work well with the manifest normalizer to encourage reuse across streams.


17-43: LGTM: Well-structured stream definition

The pokemon stream is well-structured with proper configuration of the retriever, decoder, requester, authenticator, and record selector. The schema is also properly referenced using $ref.

@bazarnov
Copy link
Contributor Author

Schemas are now extracted for each stream and set under the schemas.<stream_name>, which takes off the responsibility of doing the same from the UI perspective, once the /resolve endpoint is called.

@lmossman
Copy link
Contributor

The behavior looks good to me now!

Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚢

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
unit_tests/sources/declarative/parsers/test_manifest_normalizer.py (4)

16-16: Should this resolver be a fixture instead of a global variable?

The resolver is defined at the module level. Would it make more sense to define it as a pytest fixture to better control its lifecycle and make the tests more isolated? wdyt?


19-33: Test function looks good with clear purpose!

The test is well-structured with descriptive naming and a helpful docstring. It follows a clear pattern of:

  1. Getting the schema
  2. Preprocessing the manifest
  3. Normalizing the manifest
  4. Asserting equality with the expected result

One small suggestion: would adding type hints for the function parameters improve readability? wdyt?

def test_when_multiple_url_base_are_resolved_and_most_frequent_is_shared(
-    manifest_with_multiple_url_base,
-    expected_manifest_with_multiple_url_base_normalized,
+    manifest_with_multiple_url_base: dict,
+    expected_manifest_with_multiple_url_base_normalized: dict,
) -> None:

51-73: The third test covers a complex case but could use more clarity

The test is well-structured and covers an important case. The docstring provides good context but the term "abnormal schemas" might not be immediately clear to readers unfamiliar with the codebase.

Based on previous review comments, I understand this refers to cases where multiple streams reference the same schema. Would adding a brief explanation about this in the docstring be helpful for future maintainers? wdyt?

def test_with_linked_definitions_url_base_authenticator_when_multiple_streams_reference_the_same_schema(
    manifest_with_linked_definitions_url_base_authenticator_abnormal_schemas,
    expected_manifest_with_linked_definitions_url_base_authenticator_normalized,
) -> None:
    """
    This test is to check that the manifest is normalized when the `url_base` and the `authenticator` is linked
    between the definitions and the `url_base` is present in the manifest.
    The `authenticator` is not a normal schema, but a reference to another schema.

    The test also verifies the `stream.schema_loader.schema` is properly extracted to
    the `schemas.<stream_name>`.
+
+    "Abnormal schemas" refers to a case where not all streams have their own unique schema definition.
+    In this test, 5 streams are declared but only 3 schemas are defined (3 streams re-use the same schema,
+    while 2 other streams use 2 different schemas).
    """

1-74: Consider adding a module-level docstring

This file would benefit from a module-level docstring explaining its purpose and relation to the manifest normalization feature. This would help future developers understand the test suite's overall purpose.

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

+"""
+Unit tests for the ManifestNormalizer, which deduplicates and restructures declarative manifests
+by extracting commonly used definitions into a `definitions.linked` section.
+"""

from airbyte_cdk.sources.declarative.manifest_declarative_source import (
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 348aaae and 2c8d164.

📒 Files selected for processing (1)
  • unit_tests/sources/declarative/parsers/test_manifest_normalizer.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (3)
unit_tests/sources/declarative/parsers/test_manifest_normalizer.py (3)

1-14: Imports and setup look good!

The imports are well-organized and the module is properly structured. The copyright header is present and correctly formatted.


35-49: Second test follows good practices!

This test follows the same pattern as the first one with clear naming and documentation. The same type hint suggestion applies here as well.


1-74: Missing tests for error cases

The PR mentions handling circular references and errors during normalization via a new ManifestNormalizationException. Should there be tests for those error cases to ensure the exception is properly raised when circular references are detected? wdyt?

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)

181-192: Should we wrap normalization errors to keep exception surfacing consistent?

This block directly calls ManifestNormalizer.normalize(), allowing a ManifestNormalizationException to bubble up. A previous review suggested catching it and re-raising a ValidationError (or AirbyteTracedException) so callers see a familiar error type. That suggestion still applies here. Would you like to adopt that wrapper for a smoother DX, wdyt?

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (2)

148-171: Docstring return type & description seem out of sync with implementation

The docstring states that _preprocess_manifest “modifies the input manifest in place” and “Returns: None”, yet the function actually returns a new Dict[str, Any] instance (propagated_manifest). Could we update the docstring (or the implementation) so the contract is crystal-clear, wdyt?


94-108: Minor typing nit — do we need Optional for a boolean flag that defaults to False?

normalize_manifest: Optional[bool] = False effectively narrows to bool because None is indistinguishable from False downstream. Switching to a plain bool keeps the signature simpler (and static checkers slightly happier). Something like:

-        normalize_manifest: Optional[bool] = False,
+        normalize_manifest: bool = False,

No behaviour change, just a little clarity—what do you think?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2c8d164 and 8d7be4e.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3 hunks)
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py (11 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)

65-81: Could we memo-ize the schema load to avoid repeated I/O, wdyt?

_get_declarative_component_schema() is executed for every ManifestDeclarativeSource instantiation, which means the YAML file is re-read and parsed each time. Considering the schema is immutable at runtime, wrapping the function with @functools.lru_cache(maxsize=1) (or promoting it to a module-level constant) would give us free caching while keeping the public surface untouched.

+from functools import lru_cache
...
- def _get_declarative_component_schema() -> Dict[str, Any]:
+@lru_cache(maxsize=1)
+def _get_declarative_component_schema() -> Dict[str, Any]:

This micro-optimization removes unnecessary disk I/O and YAML parsing without changing call sites.
[ suggest_optional_refactor ]

@bazarnov bazarnov merged commit 5a1644d into main Apr 25, 2025
27 checks passed
@bazarnov bazarnov deleted the baz/cdk/extract-common-manifest-parts branch April 25, 2025 13:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants