Skip to content

Commit a28102f

Browse files
MichaelGHSegclaude
andcommitted
Fix Retry-After: 0 handling and 429 re-queue guard
Handle Retry-After: 0 correctly by checking 'is not None' instead of truthiness. Prevent silent batch re-queue on 429 without Retry-After by gating the upload() re-queue path on rate_limited_until being set. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6ed8ef6 commit a28102f

File tree

2 files changed

+55
-2
lines changed

2 files changed

+55
-2
lines changed

segment/analytics/consumer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def pause(self):
7979
def set_rate_limit_state(self, response):
8080
"""Set rate-limit state from a 429 response with a valid Retry-After header."""
8181
retry_after = parse_retry_after(response) if response else None
82-
if retry_after:
82+
if retry_after is not None:
8383
self.rate_limited_until = time.time() + retry_after
8484
if self.rate_limit_start_time is None:
8585
self.rate_limit_start_time = time.time()
@@ -133,7 +133,7 @@ def upload(self):
133133
self.clear_rate_limit_state()
134134
success = True
135135
except APIError as e:
136-
if e.status == 429:
136+
if e.status == 429 and self.rate_limited_until is not None:
137137
# 429: rate-limit state already set by request(). Re-queue batch.
138138
self.log.debug('429 received. Re-queuing batch and halting upload iteration.')
139139
for item in batch:

segment/analytics/test/test_consumer.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,59 @@ def on_error(e, batch):
741741
self.assertIsNotNone(consumer.rate_limited_until)
742742
self.assertIsNotNone(consumer.rate_limit_start_time)
743743

744+
def test_429_without_retry_after_does_not_requeue_batch(self):
745+
"""429 without Retry-After is treated as normal failure in upload() and is not re-queued"""
746+
q = Queue()
747+
consumer = Consumer(q, 'testsecret', retries=0)
748+
track = {'type': 'track', 'event': 'python event', 'userId': 'userId'}
749+
q.put(track)
750+
751+
def mock_post_fn(*args, **kwargs):
752+
error = APIError(429, 'rate_limit', 'Too Many Requests')
753+
error.response = mock.Mock()
754+
error.response.headers = {}
755+
raise error
756+
757+
on_error_called = []
758+
759+
def on_error(e, batch):
760+
on_error_called.append((e, batch))
761+
762+
consumer.on_error = on_error
763+
764+
with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn):
765+
with mock.patch('time.sleep'):
766+
result = consumer.upload()
767+
768+
self.assertFalse(result)
769+
self.assertEqual(len(on_error_called), 1)
770+
self.assertIsNone(consumer.rate_limited_until)
771+
self.assertEqual(q.qsize(), 0)
772+
773+
def test_retry_after_zero_sets_rate_limit_state(self):
774+
"""429 with Retry-After: 0 still sets rate-limit state for consistent pipeline handling"""
775+
consumer = Consumer(None, 'testsecret', retries=1)
776+
track = {'type': 'track', 'event': 'python event', 'userId': 'userId'}
777+
778+
def mock_post_fn(*args, **kwargs):
779+
response = mock.Mock()
780+
response.headers = {'Retry-After': '0'}
781+
error = APIError(429, 'rate_limit', 'Too Many Requests')
782+
error.response = response
783+
raise error
784+
785+
before = time.time()
786+
with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn):
787+
with self.assertRaises(APIError) as ctx:
788+
consumer.request([track])
789+
self.assertEqual(ctx.exception.status, 429)
790+
after = time.time()
791+
792+
self.assertIsNotNone(consumer.rate_limited_until)
793+
self.assertIsNotNone(consumer.rate_limit_start_time)
794+
self.assertGreaterEqual(consumer.rate_limited_until, before)
795+
self.assertLessEqual(consumer.rate_limited_until, after + 0.1)
796+
744797
def test_t19_max_total_backoff_duration(self):
745798
"""T19: Gives up after maxTotalBackoffDuration elapsed"""
746799
consumer = Consumer(None, 'testsecret', retries=1000,

0 commit comments

Comments
 (0)