Skip to content

feat: enable custom validation strategies #610

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4204,7 +4204,13 @@ definitions:
items:
type: object
examples:
- [{"name": "test stream", "$parameters": {"entity": "test entity"}, "primary_key": "test key"}]
- [
{
"name": "test stream",
"$parameters": { "entity": "test entity" },
"primary_key": "test key",
},
]
ParametrizedComponentsResolver:
type: object
title: Parametrized Components Resolver
Expand Down Expand Up @@ -4355,6 +4361,7 @@ definitions:
description: The condition that the specified config value will be evaluated against
anyOf:
- "$ref": "#/definitions/ValidateAdheresToSchema"
- "$ref": "#/definitions/CustomValidationStrategy"
PredicateValidator:
title: Predicate Validator
description: Validator that applies a validation strategy to a specified value.
Expand Down Expand Up @@ -4389,6 +4396,7 @@ definitions:
description: The validation strategy to apply to the value.
anyOf:
- "$ref": "#/definitions/ValidateAdheresToSchema"
- "$ref": "#/definitions/CustomValidationStrategy"
ValidateAdheresToSchema:
title: Validate Adheres To Schema
description: Validates that a user-provided schema adheres to a specified JSON schema.
Expand Down Expand Up @@ -4442,6 +4450,25 @@ definitions:
required:
- name
- age
# Schema for a validation strategy whose logic lives in a user-provided class.
# Both `type` and `class_name` are required; any further keys are accepted
# because `additionalProperties` is true at the object level.
CustomValidationStrategy:
  title: Custom Validation Strategy
  description: Custom validation strategy that allows for custom validation logic.
  type: object
  additionalProperties: true
  required:
    - type
    - class_name
  properties:
    # Discriminator: the only accepted value is the component name itself.
    type:
      type: string
      enum: [CustomValidationStrategy]
    class_name:
      title: Class Name
      description: Fully-qualified name of the class that will be implementing the custom validation strategy. Has to be a sub class of ValidationStrategy. The format is `source_<name>.<package>.<class_name>`.
      type: string
      # NOTE(review): `additionalProperties` is an object keyword in JSON Schema,
      # so it presumably has no effect on this string property. Confirm before removing.
      additionalProperties: true
      examples:
        - "source_declarative_manifest.components.MyCustomValidationStrategy"
ConfigRemapField:
title: Remap Field
description: Transformation that remaps a field's value to another value based on a static map.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -1586,6 +1584,19 @@ class ValidateAdheresToSchema(BaseModel):
)


class CustomValidationStrategy(BaseModel):
    # Declarative model for a validation strategy implemented by a
    # user-provided class (referenced through `class_name`).

    class Config:
        # Keys beyond the declared fields are accepted rather than rejected.
        extra = Extra.allow

    # Discriminator: must be exactly "CustomValidationStrategy".
    type: Literal["CustomValidationStrategy"]

    # Required dotted path to the implementing class.
    class_name: str = Field(
        ...,
        title="Class Name",
        description=(
            "Fully-qualified name of the class that will be implementing the custom validation strategy. "
            "Has to be a sub class of ValidationStrategy. "
            "The format is `source_<name>.<package>.<class_name>`."
        ),
        examples=["source_declarative_manifest.components.MyCustomValidationStrategy"],
    )


class ConfigRemapField(BaseModel):
type: Literal["ConfigRemapField"]
map: Union[Dict[str, Any], str] = Field(
Expand Down Expand Up @@ -1767,30 +1778,23 @@ class DatetimeBasedCursor(BaseModel):
examples=["created_at", "{{ config['record_cursor'] }}"],
title="Cursor Field",
)
datetime_format: str = Field(
...,
description="The datetime format used to format the datetime values that are sent in outgoing requests to the API. Use placeholders starting with \"%\" to describe the format the API is using. The following placeholders are available:\n * **%s**: Epoch unix timestamp - `1686218963`\n * **%s_as_float**: Epoch unix timestamp in seconds as float with microsecond precision - `1686218963.123456`\n * **%ms**: Epoch unix timestamp (milliseconds) - `1686218963123`\n * **%a**: Weekday (abbreviated) - `Sun`\n * **%A**: Weekday (full) - `Sunday`\n * **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)\n * **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`\n * **%b**: Month (abbreviated) - `Jan`\n * **%B**: Month (full) - `January`\n * **%m**: Month (zero-padded) - `01`, `02`, ..., `12`\n * **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`\n * **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`\n * **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`\n * **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`\n * **%p**: AM/PM indicator\n * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`\n * **%S**: Second (zero-padded) - `00`, `01`, ..., `59`\n * **%f**: Microsecond (zero-padded to 6 digits) - `000000`\n * **%_ms**: Millisecond (zero-padded to 3 digits) - `000`\n * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00`\n * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`\n * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`\n * **%U**: Week number of the year (starting Sunday) - `00`, ..., `53`\n * **%W**: Week number of the year (starting Monday) - `00`, ..., `53`\n * **%c**: Date and time - `Tue Aug 16 21:30:00 1988`\n * **%x**: Date standard format - `08/16/1988`\n * **%X**: Time standard format - `21:30:00`\n * **%%**: Literal '%' character\n\n Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en/US. 
For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).\n",
examples=["%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%d", "%s", "%ms", "%s_as_float"],
title="Outgoing Datetime Format",
cursor_datetime_formats: Optional[List[str]] = Field(
None,
description="The possible formats for the cursor field, in order of preference. The first format that matches the cursor field value will be used to parse it. If not provided, the `datetime_format` will be used.",
title="Cursor Datetime Formats",
)
start_datetime: Union[str, MinMaxDatetime] = Field(
start_datetime: Union[MinMaxDatetime, str] = Field(
...,
description="The datetime that determines the earliest record that should be synced.",
examples=["2020-01-1T00:00:00Z", "{{ config['start_time'] }}"],
title="Start Datetime",
)
cursor_datetime_formats: Optional[List[str]] = Field(
None,
description="The possible formats for the cursor field, in order of preference. The first format that matches the cursor field value will be used to parse it. If not provided, the `datetime_format` will be used.",
title="Cursor Datetime Formats",
)
cursor_granularity: Optional[str] = Field(
start_time_option: Optional[RequestOption] = Field(
None,
description="Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.",
examples=["PT1S"],
title="Cursor Granularity",
description="Optionally configures how the start datetime will be sent in requests to the source API.",
title="Inject Start Time Into Outgoing HTTP Request",
)
end_datetime: Optional[Union[str, MinMaxDatetime]] = Field(
end_datetime: Optional[Union[MinMaxDatetime, str]] = Field(
None,
description="The datetime that determines the last record that should be synced. If not provided, `{{ now_utc() }}` will be used.",
examples=["2021-01-1T00:00:00Z", "{{ now_utc() }}", "{{ day_delta(-1) }}"],
Expand All @@ -1801,6 +1805,18 @@ class DatetimeBasedCursor(BaseModel):
description="Optionally configures how the end datetime will be sent in requests to the source API.",
title="Inject End Time Into Outgoing HTTP Request",
)
datetime_format: str = Field(
...,
description="The datetime format used to format the datetime values that are sent in outgoing requests to the API. Use placeholders starting with \"%\" to describe the format the API is using. The following placeholders are available:\n * **%s**: Epoch unix timestamp - `1686218963`\n * **%s_as_float**: Epoch unix timestamp in seconds as float with microsecond precision - `1686218963.123456`\n * **%ms**: Epoch unix timestamp (milliseconds) - `1686218963123`\n * **%a**: Weekday (abbreviated) - `Sun`\n * **%A**: Weekday (full) - `Sunday`\n * **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)\n * **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`\n * **%b**: Month (abbreviated) - `Jan`\n * **%B**: Month (full) - `January`\n * **%m**: Month (zero-padded) - `01`, `02`, ..., `12`\n * **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`\n * **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`\n * **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`\n * **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`\n * **%p**: AM/PM indicator\n * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`\n * **%S**: Second (zero-padded) - `00`, `01`, ..., `59`\n * **%f**: Microsecond (zero-padded to 6 digits) - `000000`\n * **%_ms**: Millisecond (zero-padded to 3 digits) - `000`\n * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00`\n * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`\n * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`\n * **%U**: Week number of the year (starting Sunday) - `00`, ..., `53`\n * **%W**: Week number of the year (starting Monday) - `00`, ..., `53`\n * **%c**: Date and time - `Tue Aug 16 21:30:00 1988`\n * **%x**: Date standard format - `08/16/1988`\n * **%X**: Time standard format - `21:30:00`\n * **%%**: Literal '%' character\n\n Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en/US. 
For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).\n",
examples=["%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%d", "%s", "%ms", "%s_as_float"],
title="Outgoing Datetime Format",
)
cursor_granularity: Optional[str] = Field(
None,
description="Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.",
examples=["PT1S"],
title="Cursor Granularity",
)
is_data_feed: Optional[bool] = Field(
None,
description="A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. Given this, the CDK needs to know when to stop paginating and this field will generate a stop condition for pagination.",
Expand Down Expand Up @@ -1839,11 +1855,6 @@ class DatetimeBasedCursor(BaseModel):
examples=["starting_time"],
title="Partition Field Start",
)
start_time_option: Optional[RequestOption] = Field(
None,
description="Optionally configures how the start datetime will be sent in requests to the source API.",
title="Inject Start Time Into Outgoing HTTP Request",
)
step: Optional[str] = Field(
None,
description="The size of the time window (ISO8601 duration). Given this field is provided, `cursor_granularity` needs to be provided as well.",
Expand Down Expand Up @@ -1908,10 +1919,10 @@ class DefaultErrorHandler(BaseModel):
List[
Union[
ConstantBackoffStrategy,
CustomBackoffStrategy,
ExponentialBackoffStrategy,
WaitTimeFromHeader,
WaitUntilTimeFromHeader,
CustomBackoffStrategy,
]
]
] = Field(
Expand Down Expand Up @@ -2030,7 +2041,7 @@ class DpathValidator(BaseModel):
],
title="Field Path",
)
validation_strategy: ValidateAdheresToSchema = Field(
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field(
...,
description="The condition that the specified config value will be evaluated against",
title="Validation Strategy",
Expand All @@ -2050,7 +2061,7 @@ class PredicateValidator(BaseModel):
],
title="Value",
)
validation_strategy: ValidateAdheresToSchema = Field(
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field(
...,
description="The validation strategy to apply to the value.",
title="Validation Strategy",
Expand Down Expand Up @@ -2285,12 +2296,12 @@ class Config:
ApiKeyAuthenticator,
BasicHttpAuthenticator,
BearerAuthenticator,
CustomAuthenticator,
OAuthAuthenticator,
JwtAuthenticator,
SessionTokenAuthenticator,
NoAuth,
LegacySessionTokenAuthenticator,
CustomAuthenticator,
NoAuth,
],
] = Field(
...,
Expand Down Expand Up @@ -2374,7 +2385,6 @@ class Config:
InlineSchemaLoader,
DynamicSchemaLoader,
JsonFileSchemaLoader,
CustomSchemaLoader,
List[
Union[
InlineSchemaLoader,
Expand All @@ -2383,6 +2393,7 @@ class Config:
CustomSchemaLoader,
]
],
CustomSchemaLoader,
]
] = Field(
None,
Expand All @@ -2393,13 +2404,13 @@ class Config:
List[
Union[
AddFields,
CustomTransformation,
RemoveFields,
KeysToLower,
KeysToSnakeCase,
FlattenFields,
DpathFlattenFields,
KeysReplace,
CustomTransformation,
]
]
] = Field(
Expand Down Expand Up @@ -2631,7 +2642,7 @@ class HttpRequester(BaseModelWithDeprecations):

class DynamicSchemaLoader(BaseModel):
type: Literal["DynamicSchemaLoader"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
Expand All @@ -2645,13 +2656,13 @@ class DynamicSchemaLoader(BaseModel):
List[
Union[
AddFields,
CustomTransformation,
RemoveFields,
KeysToLower,
KeysToSnakeCase,
FlattenFields,
DpathFlattenFields,
KeysReplace,
CustomTransformation,
]
]
] = Field(
Expand Down Expand Up @@ -2895,7 +2906,7 @@ class AsyncRetriever(BaseModel):
] = Field(
None,
description="Component decoding the response so records can be extracted.",
title="Decoder",
title="HTTP Response Format",
)
download_decoder: Optional[
Union[
Expand All @@ -2911,7 +2922,7 @@ class AsyncRetriever(BaseModel):
] = Field(
None,
description="Component decoding the download response so records can be extracted.",
title="Download Decoder",
title="Download HTTP Response Format",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

Expand All @@ -2935,7 +2946,7 @@ class GroupingPartitionRouter(BaseModel):
title="Group Size",
)
underlying_partition_router: Union[
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
ListPartitionRouter, SubstreamPartitionRouter, CustomPartitionRouter
] = Field(
...,
description="The partition router whose output will be grouped. This can be any valid partition router component.",
Expand All @@ -2951,7 +2962,7 @@ class GroupingPartitionRouter(BaseModel):

class HttpComponentsResolver(BaseModel):
type: Literal["HttpComponentsResolver"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomTransformation as CustomTransformationModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomValidationStrategy as CustomValidationStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DatetimeBasedCursor as DatetimeBasedCursorModel,
)
Expand Down Expand Up @@ -683,6 +686,7 @@ def _init_mappings(self) -> None:
CustomPaginationStrategyModel: self.create_custom_component,
CustomPartitionRouterModel: self.create_custom_component,
CustomTransformationModel: self.create_custom_component,
CustomValidationStrategyModel: self.create_custom_component,
DatetimeBasedCursorModel: self.create_datetime_based_cursor,
DeclarativeStreamModel: self.create_declarative_stream,
DefaultErrorHandlerModel: self.create_default_error_handler,
Expand Down
Loading