Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte_cdk/manifest_migrations/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ This directory contains the logic and registry for manifest migrations in the Ai
3. **Register the Migration:**
- Open `migrations/registry.yaml`.
- Add an entry under the appropriate version, or create a new version section if needed.
- Version can be: "*", "==6.48.3", "~=1.2", ">=1.0.0,<2.0.0", "6.48.3"
- Version can be: "\*", "==6.48.3", "~=1.2", ">=1.0.0,<2.0.0", "6.48.3"
- Each migration entry should include:
- `name`: The filename (without `.py`)
- `order`: The order in which this migration should be applied for the version
Expand Down
1 change: 0 additions & 1 deletion airbyte_cdk/manifest_server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,3 @@ This requires the `ddtrace` dependency, which is included in the `manifest-serve
# Run with Datadog tracing enabled
DD_ENABLED=true manifest-server start
```

Original file line number Diff line number Diff line change
Expand Up @@ -3746,7 +3746,7 @@ definitions:
properties:
type:
type: string
enum: [ PaginationReset ]
enum: [PaginationReset]
action:
type: string
enum:
Expand All @@ -3763,7 +3763,7 @@ definitions:
properties:
type:
type: string
enum: [ PaginationResetLimits ]
enum: [PaginationResetLimits]
number_of_records:
type: integer
GzipDecoder:
Expand Down
22 changes: 16 additions & 6 deletions airbyte_cdk/sources/utils/schema_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,35 @@ def get_ref_resolver_registry(schema: dict[str, Any]) -> Registry:
)


def _expand_refs(schema: Any, ref_resolver: Resolver) -> None:
def _expand_refs(schema: Any, ref_resolver: Resolver, visited_refs: set[str] | None = None) -> None:
"""Internal function to iterate over schema and replace all occurrences of $ref with their definitions. Recursive.

:param schema: schema that will be patched
:param ref_resolver: resolver to look up references
:param visited_refs: set of already visited reference URLs to detect circular references
"""
if visited_refs is None:
visited_refs = set()

if isinstance(schema, MutableMapping):
if "$ref" in schema:
ref_url = schema.pop("$ref")

if ref_url in visited_refs:
return

visited_refs.add(ref_url)
definition = ref_resolver.lookup(ref_url).contents
_expand_refs(
definition, ref_resolver=ref_resolver
) # expand refs in definitions as well
_expand_refs(definition, ref_resolver=ref_resolver, visited_refs=visited_refs)
schema.update(definition)
visited_refs.remove(ref_url)
else:
for key, value in schema.items():
_expand_refs(value, ref_resolver=ref_resolver)
if key != "definitions":
_expand_refs(value, ref_resolver=ref_resolver, visited_refs=visited_refs)
elif isinstance(schema, List):
for value in schema:
_expand_refs(value, ref_resolver=ref_resolver)
_expand_refs(value, ref_resolver=ref_resolver, visited_refs=visited_refs)


def expand_refs(schema: Any) -> None:
Expand Down
2 changes: 1 addition & 1 deletion cdk-migrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Rationale: Our current interface for CustomIncrementalSync was assuming that the

Migration steps: Ensures that you don't implement `Retriever.state` or relying on the field `SimpleRetriever.cursor`. For more information, see the point above.

Rationale: As mentioned above, the state has been moved outside the realm of the stream responsibilities. Therefore, it does not make sense for the retriever (which is a stream specific concept) to hold state information. This way, a connector developer wanting to implement a CustomRetriever will not have to bother about state management anymore.
Rationale: As mentioned above, the state has been moved outside the realm of the stream responsibilities. Therefore, it does not make sense for the retriever (which is a stream specific concept) to hold state information. This way, a connector developer wanting to implement a CustomRetriever will not have to bother about state management anymore.

### Inheriting from Substream Partition Routing

Expand Down
32 changes: 32 additions & 0 deletions unit_tests/sources/utils/test_schema_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
InternalConfig,
ResourceSchemaLoader,
check_config_against_spec_or_exit,
expand_refs,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

Expand Down Expand Up @@ -209,3 +210,34 @@ def test_shared_schemas_resolves_nested():
def test_internal_config(limit, record_count, expected):
config = InternalConfig(_limit=limit)
assert config.is_limit_reached(record_count) == expected


def test_expand_refs_handles_circular_references():
schema = {
"definitions": {
"A": {"type": "object", "properties": {"b": {"$ref": "#/definitions/B"}}},
"B": {"type": "object", "properties": {"a": {"$ref": "#/definitions/A"}}},
},
"type": "object",
"properties": {"root": {"$ref": "#/definitions/A"}},
}

expand_refs(schema)

assert "root" in schema["properties"]
assert schema["properties"]["root"]["type"] == "object"
assert "definitions" not in schema


def test_expand_refs_expands_simple_refs():
schema = {
"definitions": {"StringType": {"type": "string", "minLength": 1}},
"type": "object",
"properties": {"name": {"$ref": "#/definitions/StringType"}},
}

expand_refs(schema)

assert schema["properties"]["name"]["type"] == "string"
assert schema["properties"]["name"]["minLength"] == 1
assert "definitions" not in schema
Loading