Skip to content

Commit 4a18954

Browse files
committed
Add deleting finished semaphores
1 parent c51f840 commit 4a18954

File tree

2 files changed

+96
-7
lines changed

2 files changed

+96
-7
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ def _check_and_update_parent_state(self) -> None:
169169
Pop the leftmost partition state from _partition_parent_state_map only if
170170
*all partitions* up to (and including) that partition key in _semaphore_per_partition
171171
are fully finished (i.e. in _finished_partitions and semaphore._value == 0).
172+
Additionally, delete finished semaphores with a value of 0 to free up memory,
173+
as they are only needed to track errors and completion status.
172174
"""
173175
last_closed_state = None
174176

@@ -178,7 +180,9 @@ def _check_and_update_parent_state(self) -> None:
178180

179181
# Verify ALL partitions from the left up to earliest_key are finished
180182
all_left_finished = True
181-
for p_key, sem in self._semaphore_per_partition.items():
183+
for p_key, sem in list(
184+
self._semaphore_per_partition.items()
185+
): # Use list to allow modification during iteration
182186
# If any earlier partition is still not finished, we must stop
183187
if p_key not in self._finished_partitions or sem._value != 0:
184188
all_left_finished = False
@@ -191,17 +195,26 @@ def _check_and_update_parent_state(self) -> None:
191195
if not all_left_finished:
192196
break
193197

194-
# Otherwise, pop the leftmost entry from parent-state map
198+
# Pop the leftmost entry from parent-state map
195199
_, closed_parent_state = self._partition_parent_state_map.popitem(last=False)
196200
last_closed_state = closed_parent_state
197201

198-
# Update _parent_state if we actually popped at least one partition
202+
# Clean up finished semaphores with value 0 up to and including earliest_key
203+
for p_key in list(self._semaphore_per_partition.keys()):
204+
sem = self._semaphore_per_partition[p_key]
205+
if p_key in self._finished_partitions and sem._value == 0:
206+
del self._semaphore_per_partition[p_key]
207+
logger.debug(f"Deleted finished semaphore for partition {p_key} with value 0")
208+
if p_key == earliest_key:
209+
break
210+
211+
# Update _parent_state if we popped at least one partition
199212
if last_closed_state is not None:
200213
self._parent_state = last_closed_state
201214

202215
def ensure_at_least_one_state_emitted(self) -> None:
203216
"""
204-
The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
217+
The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
205218
called.
206219
"""
207220
if not any(
@@ -238,6 +251,7 @@ def _emit_state_message(self, throttle: bool = True) -> None:
238251
self._message_repository.emit_message(state_message)
239252

240253
def stream_slices(self) -> Iterable[StreamSlice]:
254+
print("stream_slices")
241255
if self._timer.is_running():
242256
raise RuntimeError("stream_slices has been executed more than once.")
243257

@@ -313,9 +327,9 @@ def _ensure_partition_limit(self) -> None:
313327
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
314328
# Try removing finished partitions first
315329
for partition_key in list(self._cursor_per_partition.keys()):
316-
if (
317-
partition_key in self._finished_partitions
318-
and self._semaphore_per_partition[partition_key]._value == 0
330+
if partition_key in self._finished_partitions and (
331+
partition_key not in self._semaphore_per_partition
332+
or self._semaphore_per_partition[partition_key]._value == 0
319333
):
320334
oldest_partition = self._cursor_per_partition.pop(
321335
partition_key

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3159,6 +3159,7 @@ def test_given_unfinished_first_parent_partition_no_parent_state_update():
31593159
}
31603160
assert mock_cursor_1.stream_slices.call_count == 1 # Called once for each partition
31613161
assert mock_cursor_2.stream_slices.call_count == 1 # Called once for each partition
3162+
assert len(cursor._semaphore_per_partition) == 2
31623163

31633164

31643165
def test_given_unfinished_last_parent_partition_with_partial_parent_state_update():
@@ -3243,6 +3244,7 @@ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update
32433244
}
32443245
assert mock_cursor_1.stream_slices.call_count == 1 # Called once for each partition
32453246
assert mock_cursor_2.stream_slices.call_count == 1 # Called once for each partition
3247+
assert len(cursor._semaphore_per_partition) == 1
32463248

32473249

32483250
def test_given_all_partitions_finished_when_close_partition_then_final_state_emitted():
@@ -3317,6 +3319,7 @@ def test_given_all_partitions_finished_when_close_partition_then_final_state_emi
33173319
assert final_state["lookback_window"] == 1
33183320
assert cursor._message_repository.emit_message.call_count == 2
33193321
assert mock_cursor.stream_slices.call_count == 2 # Called once for each partition
3322+
assert len(cursor._semaphore_per_partition) == 1
33203323

33213324

33223325
def test_given_partition_limit_exceeded_when_close_partition_then_switch_to_global_cursor():
@@ -3377,3 +3380,75 @@ def test_given_partition_limit_exceeded_when_close_partition_then_switch_to_glob
33773380
assert "lookback_window" in final_state
33783381
assert len(cursor._cursor_per_partition) <= cursor.DEFAULT_MAX_PARTITIONS_NUMBER
33793382
assert mock_cursor.stream_slices.call_count == 3 # Called once for each partition
3383+
3384+
3385+
def test_semaphore_cleanup():
3386+
# Create two mock cursors with different states for each partition
3387+
mock_cursor_1 = MagicMock()
3388+
mock_cursor_1.stream_slices.return_value = iter(
3389+
[
3390+
{"slice1": "data1"},
3391+
{"slice2": "data1"}, # First partition slices
3392+
]
3393+
)
3394+
mock_cursor_1.state = {"updated_at": "2024-01-02T00:00:00Z"} # State for partition "1"
3395+
3396+
mock_cursor_2 = MagicMock()
3397+
mock_cursor_2.stream_slices.return_value = iter(
3398+
[
3399+
{"slice2": "data2"},
3400+
{"slice2": "data2"}, # Second partition slices
3401+
]
3402+
)
3403+
mock_cursor_2.state = {"updated_at": "2024-01-03T00:00:00Z"} # State for partition "2"
3404+
3405+
# Configure cursor factory to return different mock cursors based on partition
3406+
cursor_factory_mock = MagicMock()
3407+
cursor_factory_mock.create.side_effect = [mock_cursor_1, mock_cursor_2]
3408+
3409+
cursor = ConcurrentPerPartitionCursor(
3410+
cursor_factory=cursor_factory_mock,
3411+
partition_router=MagicMock(),
3412+
stream_name="test_stream",
3413+
stream_namespace=None,
3414+
stream_state={},
3415+
message_repository=MagicMock(),
3416+
connector_state_manager=MagicMock(),
3417+
connector_state_converter=MagicMock(),
3418+
cursor_field=CursorField(cursor_field_key="updated_at"),
3419+
)
3420+
3421+
# Simulate partitions with unique parent states
3422+
slices = [
3423+
StreamSlice(partition={"id": "1"}, cursor_slice={}),
3424+
StreamSlice(partition={"id": "2"}, cursor_slice={}),
3425+
]
3426+
cursor._partition_router.stream_slices.return_value = iter(slices)
3427+
# Simulate unique parent states for each partition
3428+
cursor._partition_router.get_stream_state.side_effect = [
3429+
{"parent": {"state": "state1"}}, # Parent state for partition "1"
3430+
{"parent": {"state": "state2"}}, # Parent state for partition "2"
3431+
]
3432+
3433+
# Generate slices to populate semaphores and parent states
3434+
generated_slices = list(
3435+
cursor.stream_slices()
3436+
) # Populate _semaphore_per_partition and _partition_parent_state_map
3437+
3438+
# Verify initial state
3439+
assert len(cursor._semaphore_per_partition) == 2
3440+
assert len(cursor._partition_parent_state_map) == 2
3441+
assert cursor._partition_parent_state_map['{"id":"1"}'] == {"parent": {"state": "state1"}}
3442+
assert cursor._partition_parent_state_map['{"id":"2"}'] == {"parent": {"state": "state2"}}
3443+
3444+
# Close partitions to acquire semaphores (value back to 0)
3445+
for s in generated_slices:
3446+
cursor.close_partition(DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), s))
3447+
3448+
# Check state after closing partitions
3449+
assert len(cursor._finished_partitions) == 2
3450+
assert len(cursor._semaphore_per_partition) == 0
3451+
assert '{"id":"1"}' not in cursor._semaphore_per_partition
3452+
assert '{"id":"2"}' not in cursor._semaphore_per_partition
3453+
assert len(cursor._partition_parent_state_map) == 0 # All parent states should be popped
3454+
assert cursor._parent_state == {"parent": {"state": "state2"}} # Last parent state

0 commit comments

Comments
 (0)