Skip to content

Commit c353761

Browse files
authored
fix(airbyte-cdk): Fix Record Filter Validation in ConcurrentDeclarativeSource (#45)
1 parent 4d16e1c commit c353761

File tree

1 file changed

+6
-0
lines changed

1 file changed

+6
-0
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
1717
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
1818
from airbyte_cdk.sources.declarative.extractors import RecordSelector
19+
from airbyte_cdk.sources.declarative.extractors.record_filter import (
20+
ClientSideIncrementalRecordFilterDecorator,
21+
)
1922
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
2023
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
2124
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
@@ -291,6 +294,9 @@ def _stream_supports_concurrent_partition_processing(
291294
if isinstance(record_selector, RecordSelector):
292295
if (
293296
record_selector.record_filter
297+
and not isinstance(
298+
record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
299+
)
294300
and "stream_state" in record_selector.record_filter.condition
295301
):
296302
self.logger.warning(

0 commit comments

Comments
 (0)