1414# lower to leave space for extra data that will be added later, eg. "sentAt".
1515BATCH_SIZE_LIMIT = 475000
1616
17+ # Default duration limits (12 hours in seconds)
18+ DEFAULT_MAX_TOTAL_BACKOFF_DURATION = 43200
19+ DEFAULT_MAX_RATE_LIMIT_DURATION = 43200
20+
1721
1822class FatalError (Exception ):
1923 def __init__ (self , message ):
@@ -30,7 +34,9 @@ class Consumer(Thread):
3034
3135 def __init__ (self , queue , write_key , upload_size = 100 , host = None ,
3236 on_error = None , upload_interval = 0.5 , gzip = False , retries = 1000 ,
33- timeout = 15 , proxies = None , oauth_manager = None ):
37+ timeout = 15 , proxies = None , oauth_manager = None ,
38+ max_total_backoff_duration = DEFAULT_MAX_TOTAL_BACKOFF_DURATION ,
39+ max_rate_limit_duration = DEFAULT_MAX_RATE_LIMIT_DURATION ):
3440 """Create a consumer thread."""
3541 Thread .__init__ (self )
3642 # Make consumer a daemon thread so that it doesn't block program exit
@@ -51,6 +57,12 @@ def __init__(self, queue, write_key, upload_size=100, host=None,
5157 self .timeout = timeout
5258 self .proxies = proxies
5359 self .oauth_manager = oauth_manager
60+ self .max_total_backoff_duration = max_total_backoff_duration
61+ self .max_rate_limit_duration = max_rate_limit_duration
62+
63+ # Rate-limit state
64+ self .rate_limited_until = None
65+ self .rate_limit_start_time = None
5466
5567 def run (self ):
5668 """Runs the consumer."""
@@ -64,16 +76,77 @@ def pause(self):
6476 """Pause the consumer."""
6577 self .running = False
6678
79+ def set_rate_limit_state (self , response ):
80+ """Set rate-limit state from a 429 response with a valid Retry-After header."""
81+ retry_after = parse_retry_after (response ) if response else None
82+ if retry_after :
83+ self .rate_limited_until = time .time () + retry_after
84+ if self .rate_limit_start_time is None :
85+ self .rate_limit_start_time = time .time ()
86+
87+ def clear_rate_limit_state (self ):
88+ """Clear rate-limit state after successful request or duration exceeded."""
89+ self .rate_limited_until = None
90+ self .rate_limit_start_time = None
91+
6792 def upload (self ):
6893 """Upload the next batch of items, return whether successful."""
6994 success = False
7095 batch = self .next ()
7196 if len (batch ) == 0 :
7297 return False
7398
99+ # Check rate-limit state before attempting upload
100+ if self .rate_limited_until is not None :
101+ now = time .time ()
102+
103+ # Check if maxRateLimitDuration has been exceeded
104+ if (self .rate_limit_start_time is not None and
105+ now - self .rate_limit_start_time > self .max_rate_limit_duration ):
106+ self .log .error (
107+ 'Rate limit duration exceeded (%ds). Clearing rate-limit state and dropping batch.' ,
108+ self .max_rate_limit_duration
109+ )
110+ self .clear_rate_limit_state ()
111+ # Drop the batch by marking items as done
112+ if self .on_error :
113+ self .on_error (
114+ Exception ('Rate limit duration exceeded, batch dropped' ),
115+ batch
116+ )
117+ for _ in batch :
118+ self .queue .task_done ()
119+ return False
120+
121+ # Still rate-limited; wait until the rate limit expires
122+ wait_time = self .rate_limited_until - now
123+ if wait_time > 0 :
124+ self .log .debug (
125+ 'Rate-limited. Waiting %.2fs before next upload attempt.' ,
126+ wait_time
127+ )
128+ time .sleep (wait_time )
129+
74130 try :
75131 self .request (batch )
132+ # Success — clear rate-limit state
133+ self .clear_rate_limit_state ()
76134 success = True
135+ except APIError as e :
136+ if e .status == 429 :
137+ # 429: rate-limit state already set by request(). Re-queue batch.
138+ self .log .debug ('429 received. Re-queuing batch and halting upload iteration.' )
139+ for item in batch :
140+ try :
141+ self .queue .put (item , block = False )
142+ except Exception :
143+ pass # Queue full, item lost
144+ success = False
145+ else :
146+ self .log .error ('error uploading: %s' , e )
147+ success = False
148+ if self .on_error :
149+ self .on_error (e , batch )
77150 except Exception as e :
78151 self .log .error ('error uploading: %s' , e )
79152 success = False
@@ -128,18 +201,19 @@ def is_retryable_status(status):
128201 Retryable 4xx: 408, 410, 429, 460
129202 Non-retryable 4xx: 400, 401, 403, 404, 413, 422, and all other 4xx
130203 Retryable 5xx: All except 501, 505
204+ - 511 is only retryable when OauthManager is configured
131205 Non-retryable 5xx: 501, 505
132206 """
133207 if 400 <= status < 500 :
134208 return status in (408 , 410 , 429 , 460 )
135209 elif 500 <= status < 600 :
136- return status not in (501 , 505 )
210+ if status in (501 , 505 ):
211+ return False
212+ if status == 511 :
213+ return self .oauth_manager is not None
214+ return True
137215 return False
138216
139- def should_use_retry_after (status ):
140- """Check if status code should respect Retry-After header"""
141- return status in (408 , 429 , 503 )
142-
143217 def calculate_backoff_delay (attempt ):
144218 """
145219 Calculate exponential backoff delay with jitter.
@@ -153,11 +227,11 @@ def calculate_backoff_delay(attempt):
153227
154228 total_attempts = 0
155229 backoff_attempts = 0
156- max_backoff_attempts = self .retries + 1
157- # Prevent infinite retry loops even with Retry-After
158- max_total_attempts = max_backoff_attempts * 10
230+ first_failure_time = None
159231
160232 while True :
233+ total_attempts += 1
234+
161235 try :
162236 # Make the request with current retry count
163237 response = post (
@@ -168,7 +242,7 @@ def calculate_backoff_delay(attempt):
168242 batch = batch ,
169243 proxies = self .proxies ,
170244 oauth_manager = self .oauth_manager ,
171- retry_count = total_attempts
245+ retry_count = total_attempts - 1
172246 )
173247 # Success
174248 return response
@@ -179,35 +253,35 @@ def calculate_backoff_delay(attempt):
179253 raise
180254
181255 except APIError as e :
182- total_attempts += 1
256+ # 429 with valid Retry-After: set rate-limit state and raise
257+ # to caller (pipeline blocking). Without Retry-After, fall
258+ # through to counted backoff like any other retryable error.
259+ if e .status == 429 :
260+ retry_after = parse_retry_after (e .response ) if e .response else None
261+ if retry_after is not None :
262+ self .set_rate_limit_state (e .response )
263+ raise
183264
184- # Prevent infinite retry loops
185- if total_attempts >= max_total_attempts :
265+ # Check if status is retryable
266+ if not is_retryable_status ( e . status ) :
186267 self .log .error (
187- f"Maximum total attempts ( { max_total_attempts } ) reached after { total_attempts } attempts. Final error : { e } "
268+ f"Non-retryable error { e . status } after { total_attempts } attempts: { e } "
188269 )
189270 raise
190271
191- # Check if we should use Retry-After header
192- if should_use_retry_after (e .status ) and e .response :
193- retry_after = parse_retry_after (e .response )
194- if retry_after :
195- self .log .debug (
196- f"Retry-After header present: waiting { retry_after } s (attempt { total_attempts } )"
197- )
198- time .sleep (retry_after )
199- continue # Does not count against backoff budget
200-
201- # Check if status is retryable
202- if not is_retryable_status (e .status ):
272+ # Transient error -- per-batch backoff
273+ if first_failure_time is None :
274+ first_failure_time = time .time ()
275+ if time .time () - first_failure_time > self .max_total_backoff_duration :
203276 self .log .error (
204- f"Non-retryable error { e .status } after { total_attempts } attempts: { e } "
277+ f"Max total backoff duration ({ self .max_total_backoff_duration } s) exceeded "
278+ f"after { total_attempts } attempts. Final error: { e } "
205279 )
206280 raise
207281
208282 # Count this against backoff attempts
209283 backoff_attempts += 1
210- if backoff_attempts >= max_backoff_attempts :
284+ if backoff_attempts >= self . retries + 1 :
211285 self .log .error (
212286 f"All { self .retries } retries exhausted after { total_attempts } total attempts. Final error: { e } "
213287 )
@@ -224,17 +298,18 @@ def calculate_backoff_delay(attempt):
224298
225299 except Exception as e :
226300 # Network errors or other exceptions - retry with backoff
227- total_attempts += 1
228- backoff_attempts += 1
229-
230- # Prevent infinite retry loops
231- if total_attempts >= max_total_attempts :
301+ if first_failure_time is None :
302+ first_failure_time = time .time ()
303+ if time .time () - first_failure_time > self .max_total_backoff_duration :
232304 self .log .error (
233- f"Maximum total attempts ({ max_total_attempts } ) reached after { total_attempts } attempts. Final error: { e } "
305+ f"Max total backoff duration ({ self .max_total_backoff_duration } s) exceeded "
306+ f"after { total_attempts } attempts. Final error: { e } "
234307 )
235308 raise
236309
237- if backoff_attempts >= max_backoff_attempts :
310+ backoff_attempts += 1
311+
312+ if backoff_attempts >= self .retries + 1 :
238313 self .log .error (
239314 f"All { self .retries } retries exhausted after { total_attempts } total attempts. Final error: { e } "
240315 )
0 commit comments