Skip to content

fix(concurrent perpartition cursor): Increase throttle time to 10 minutes #542

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ def ensure_at_least_one_state_emitted(self) -> None:

def _throttle_state_message(self) -> Optional[float]:
"""
Throttles the state message emission to once every 60 seconds.
Throttles the state message emission to once every 600 seconds.
"""
current_time = time.time()
if current_time - self._last_emission_time <= 60:
if current_time - self._last_emission_time <= 600:
return None
return current_time

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3265,8 +3265,8 @@ def test_incremental_substream_request_options_provider(

def test_state_throttling(mocker):
"""
Verifies that _emit_state_message does not emit a new state if less than 60s
have passed since last emission, and does emit once 60s or more have passed.
Verifies that _emit_state_message does not emit a new state if less than 600s
have passed since last emission, and does emit once 600s or more have passed.
"""
cursor = ConcurrentPerPartitionCursor(
cursor_factory=MagicMock(),
Expand All @@ -3288,20 +3288,20 @@ def test_state_throttling(mocker):

mock_time = mocker.patch("time.time")

# First attempt: only 10 seconds passed => NO emission
mock_time.return_value = 10
# First attempt: only 100 seconds passed => NO emission
mock_time.return_value = 100
cursor._emit_state_message()
mock_connector_manager.update_state_for_stream.assert_not_called()
mock_repo.emit_message.assert_not_called()

# Second attempt: 30 seconds passed => still NO emission
mock_time.return_value = 30
# Second attempt: 300 seconds passed => still NO emission
mock_time.return_value = 300
cursor._emit_state_message()
mock_connector_manager.update_state_for_stream.assert_not_called()
mock_repo.emit_message.assert_not_called()

# Advance time: 70 seconds => exceed 60s => MUST emit
mock_time.return_value = 70
# Advance time: 700 seconds => exceed 600s => MUST emit
mock_time.return_value = 700
cursor._emit_state_message()
mock_connector_manager.update_state_for_stream.assert_called_once()
mock_repo.emit_message.assert_called_once()
Expand Down
Loading