33
33
from airbyte_cdk .sources .declarative .partition_routers .single_partition_router import (
34
34
SinglePartitionRouter ,
35
35
)
36
- from airbyte_cdk .sources .declarative .stream_slicers import StreamSlicerTestReadDecorator
36
+ from airbyte_cdk .sources .declarative .stream_slicers import (
37
+ StreamSlicer ,
38
+ StreamSlicerTestReadDecorator ,
39
+ )
37
40
from airbyte_cdk .sources .message import NoopMessageRepository
38
41
from unit_tests .sources .declarative .async_job .test_integration import MockAsyncJobRepository
39
42
@@ -83,6 +86,24 @@ def date_time_based_cursor_factory() -> DatetimeBasedCursor:
83
86
)
84
87
85
88
89
+ def create_substream_partition_router ():
90
+ return SubstreamPartitionRouter (
91
+ config = {},
92
+ parameters = {},
93
+ parent_stream_configs = [
94
+ ParentStreamConfig (
95
+ type = "ParentStreamConfig" ,
96
+ parent_key = "id" ,
97
+ partition_field = "id" ,
98
+ stream = DeclarativeStream (
99
+ type = "DeclarativeStream" ,
100
+ retriever = CustomRetriever (type = "CustomRetriever" , class_name = "a_class_name" ),
101
+ ),
102
+ )
103
+ ],
104
+ )
105
+
106
+
86
107
def test_isinstance_global_cursor ():
87
108
first_partition = {"first_partition_key" : "first_partition_value" }
88
109
partition_router = mocked_partition_router ()
@@ -142,21 +163,7 @@ def test_isinstance_global_cursor_aysnc_job_partition_router():
142
163
143
164
144
165
def test_isinstance_substrea_partition_router ():
145
- partition_router = SubstreamPartitionRouter (
146
- config = {},
147
- parameters = {},
148
- parent_stream_configs = [
149
- ParentStreamConfig (
150
- type = "ParentStreamConfig" ,
151
- parent_key = "id" ,
152
- partition_field = "id" ,
153
- stream = DeclarativeStream (
154
- type = "DeclarativeStream" ,
155
- retriever = CustomRetriever (type = "CustomRetriever" , class_name = "a_class_name" ),
156
- ),
157
- )
158
- ],
159
- )
166
+ partition_router = create_substream_partition_router ()
160
167
161
168
wrapped_slicer = StreamSlicerTestReadDecorator (
162
169
wrapped_slicer = partition_router ,
@@ -175,21 +182,7 @@ def test_isinstance_substrea_partition_router():
175
182
176
183
177
184
def test_isinstance_perpartition_with_global_cursor ():
178
- partition_router = SubstreamPartitionRouter (
179
- config = {},
180
- parameters = {},
181
- parent_stream_configs = [
182
- ParentStreamConfig (
183
- type = "ParentStreamConfig" ,
184
- parent_key = "id" ,
185
- partition_field = "id" ,
186
- stream = DeclarativeStream (
187
- type = "DeclarativeStream" ,
188
- retriever = CustomRetriever (type = "CustomRetriever" , class_name = "a_class_name" ),
189
- ),
190
- )
191
- ],
192
- )
185
+ partition_router = create_substream_partition_router ()
193
186
date_time_based_cursor = date_time_based_cursor_factory ()
194
187
195
188
cursor_factory = CursorFactory (date_time_based_cursor_factory )
@@ -221,3 +214,21 @@ def test_isinstance_perpartition_with_global_cursor():
221
214
assert substream_cursor ._global_cursor ._stream_cursor == date_time_based_cursor
222
215
223
216
assert substream_cursor ._get_active_cursor () == wrapped_slicer ._get_active_cursor ()
217
+
218
+
219
+ def test_slice_limiting_functionality ():
220
+ # Create a slicer that returns many slices
221
+ mock_slicer = Mock (spec = StreamSlicer )
222
+ mock_slicer .stream_slices .return_value = [
223
+ StreamSlice (partition = {f"key_{ i } " : f"value_{ i } " }, cursor_slice = {}) for i in range (10 )
224
+ ]
225
+
226
+ # Wrap with decorator limiting to 3 slices
227
+ wrapped_slicer = StreamSlicerTestReadDecorator (
228
+ wrapped_slicer = mock_slicer ,
229
+ maximum_number_of_slices = 3 ,
230
+ )
231
+
232
+ # Verify only 3 slices are returned
233
+ slices = list (wrapped_slicer .stream_slices ())
234
+ assert len (slices ) == 3
0 commit comments