|
17 | 17 | import tempfile
|
18 | 18 | import select
|
19 | 19 |
|
| 20 | +from zulip import RandomExponentialBackoff |
| 21 | + |
20 | 22 | DEFAULT_SITE = "https://api.zulip.com"
|
21 | 23 |
|
22 | 24 | class States:
|
@@ -218,32 +220,41 @@ def maybe_restart_mirroring_script() -> None:
|
218 | 220 | except OSError:
|
219 | 221 | # We don't care whether we failed to cancel subs properly, but we should log it
|
220 | 222 | logger.exception("")
|
221 |
| - while True: |
| 223 | + backoff = RandomExponentialBackoff( |
| 224 | + maximum_retries=3, |
| 225 | + ) |
| 226 | + while backoff.keep_going(): |
222 | 227 | try:
|
223 | 228 | os.execvp(os.path.abspath(__file__), sys.argv)
|
| 229 | + # No need for backoff.succeed, since this can't be reached |
224 | 230 | except Exception:
|
225 | 231 | logger.exception("Error restarting mirroring script; trying again... Traceback:")
|
226 |
| - time.sleep(1) |
| 232 | + backoff.fail() |
| 233 | + raise Exception("Failed to reload too many times, aborting!") |
227 | 234 |
|
228 | 235 | def process_loop(log: Optional[IO[Any]]) -> None:
|
229 | 236 | restart_check_count = 0
|
230 | 237 | last_check_time = time.time()
|
| 238 | + recieve_backoff = RandomExponentialBackoff() |
231 | 239 | while True:
|
232 | 240 | select.select([zephyr._z.getFD()], [], [], 15)
|
233 | 241 | try:
|
| 242 | + process_backoff = RandomExponentialBackoff() |
234 | 243 | # Fetch notices from the queue until it's empty
|
235 | 244 | while True:
|
236 | 245 | notice = zephyr.receive(block=False)
|
| 246 | + recieve_backoff.succeed() |
237 | 247 | if notice is None:
|
238 | 248 | break
|
239 | 249 | try:
|
240 | 250 | process_notice(notice, log)
|
| 251 | + process_backoff.succeed() |
241 | 252 | except Exception:
|
242 | 253 | logger.exception("Error relaying zephyr:")
|
243 |
| - time.sleep(2) |
| 254 | + process_backoff.fail() |
244 | 255 | except Exception:
|
245 | 256 | logger.exception("Error checking for new zephyrs:")
|
246 |
| - time.sleep(1) |
| 257 | + recieve_backoff.fail() |
247 | 258 | continue
|
248 | 259 |
|
249 | 260 | if time.time() - last_check_time > 15:
|
@@ -759,12 +770,13 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
|
def zulip_to_zephyr(options: int) -> None:
    """Relay messages from Zulip to Zephyr, retrying forever on errors.

    Repeatedly hands ``maybe_forward_to_zephyr`` to
    ``zulip_client.call_on_each_message``.  Any exception raised by that
    call is logged and reported to a ``RandomExponentialBackoff`` before
    the next attempt.  The loop has no exit condition, so this function
    does not return normally.

    ``options`` is accepted for the caller's benefit but is not read here.
    """
    logger.info("Starting syncing messages.")
    # NOTE(review): fail() presumably sleeps for a randomized, growing delay
    # (it replaces a fixed one-second sleep), and timeout_success_equivalent
    # presumably resets the failure count after 120s without a failure --
    # confirm against zulip.RandomExponentialBackoff.
    retry_backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
    while True:
        try:
            zulip_client.call_on_each_message(maybe_forward_to_zephyr)
        except Exception:
            logger.exception("Error syncing messages:")
            retry_backoff.fail()
768 | 780 |
|
769 | 781 | def subscribed_to_mail_messages() -> bool:
|
770 | 782 | # In case we have lost our AFS tokens and those won't be able to
|
|
0 commit comments