|
14 | 14 | # lower to leave space for extra data that will be added later, eg. "sentAt". |
15 | 15 | BATCH_SIZE_LIMIT = 475000 |
16 | 16 |
|
| 17 | + |
17 | 18 | class FatalError(Exception): |
18 | 19 | def __init__(self, message): |
19 | 20 | self.message = message |
| 21 | + |
20 | 22 | def __str__(self): |
21 | 23 | msg = "[Segment] {0})" |
22 | 24 | return msg.format(self.message) |
@@ -81,7 +83,7 @@ def upload(self): |
81 | 83 | # mark items as acknowledged from queue |
82 | 84 | for _ in batch: |
83 | 85 | self.queue.task_done() |
84 | | - return success |
| 86 | + return success |
85 | 87 |
|
86 | 88 | def next(self): |
87 | 89 | """Return the next batch of items to upload.""" |
@@ -132,14 +134,26 @@ def fatal_exception(exc): |
132 | 134 | # retry on all other errors (eg. network) |
133 | 135 | return False |
134 | 136 |
|
| 137 | + attempt_count = 0 |
| 138 | + |
135 | 139 | @backoff.on_exception( |
136 | 140 | backoff.expo, |
137 | 141 | Exception, |
138 | 142 | max_tries=self.retries + 1, |
139 | | - giveup=fatal_exception) |
| 143 | + giveup=fatal_exception, |
| 144 | + on_backoff=lambda details: self.log.debug( |
| 145 | + f"Retry attempt {details['tries']}/{self.retries + 1} after {details['elapsed']:.2f}s" |
| 146 | + )) |
140 | 147 | def send_request(): |
141 | | - post(self.write_key, self.host, gzip=self.gzip, |
142 | | - timeout=self.timeout, batch=batch, proxies=self.proxies, |
143 | | - oauth_manager=self.oauth_manager) |
| 148 | + nonlocal attempt_count |
| 149 | + attempt_count += 1 |
| 150 | + try: |
| 151 | + return post(self.write_key, self.host, gzip=self.gzip, |
| 152 | + timeout=self.timeout, batch=batch, proxies=self.proxies, |
| 153 | + oauth_manager=self.oauth_manager) |
| 154 | + except Exception as e: |
| 155 | + if attempt_count > self.retries: |
| 156 | + self.log.error(f"All {self.retries} retries exhausted. Final error: {e}") |
| 157 | + raise |
144 | 158 |
|
145 | 159 | send_request() |
0 commit comments