Skip to content

zephyr: Use exponential backoffs in retry loops. #611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions zulip/integrations/zephyr/zephyr_mirror_backend.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

from typing import Any, Dict, IO, List, Optional, Set, Text, Tuple, cast
from typing import Any, Dict, IO, List, NoReturn, Optional, Set, Text, Tuple, cast
from types import FrameType

import sys
Expand All @@ -17,6 +17,8 @@
import tempfile
import select

from zulip import RandomExponentialBackoff

DEFAULT_SITE = "https://api.zulip.com"

class States:
Expand Down Expand Up @@ -218,32 +220,41 @@ def maybe_restart_mirroring_script() -> None:
except OSError:
# We don't care whether we failed to cancel subs properly, but we should log it
logger.exception("")
while True:
backoff = RandomExponentialBackoff(
maximum_retries=3,
)
while backoff.keep_going():
try:
os.execvp(os.path.abspath(__file__), sys.argv)
# No need for backoff.succeed, since this can't be reached
except Exception:
logger.exception("Error restarting mirroring script; trying again... Traceback:")
time.sleep(1)
backoff.fail()
raise Exception("Failed to reload too many times, aborting!")

def process_loop(log: Optional[IO[Any]]) -> None:
def process_loop(log: Optional[IO[Any]]) -> NoReturn:
restart_check_count = 0
last_check_time = time.time()
recieve_backoff = RandomExponentialBackoff()
while True:
select.select([zephyr._z.getFD()], [], [], 15)
try:
process_backoff = RandomExponentialBackoff()
# Fetch notices from the queue until its empty
while True:
notice = zephyr.receive(block=False)
recieve_backoff.succeed()
if notice is None:
break
try:
process_notice(notice, log)
process_backoff.succeed()
except Exception:
logger.exception("Error relaying zephyr:")
time.sleep(2)
process_backoff.fail()
except Exception:
logger.exception("Error checking for new zephyrs:")
time.sleep(1)
recieve_backoff.fail()
continue

if time.time() - last_check_time > 15:
Expand Down Expand Up @@ -756,15 +767,16 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
# whole process
logger.exception("Error forwarding message:")

def zulip_to_zephyr(options: int) -> None:
def zulip_to_zephyr(options: int) -> NoReturn:
# Sync messages from zulip to zephyr
logger.info("Starting syncing messages.")
backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
while True:
try:
zulip_client.call_on_each_message(maybe_forward_to_zephyr)
except Exception:
logger.exception("Error syncing messages:")
time.sleep(1)
backoff.fail()

def subscribed_to_mail_messages() -> bool:
# In case we have lost our AFS tokens and those won't be able to
Expand Down Expand Up @@ -1134,7 +1146,6 @@ def die_gracefully(signal: int, frame: FrameType) -> None:
# Run the zulip => zephyr mirror in the child
configure_logger(logger, "zulip=>zephyr")
zulip_to_zephyr(options)
sys.exit(0)
else:
child_pid = None
CURRENT_STATE = States.ZephyrToZulip
Expand Down
22 changes: 21 additions & 1 deletion zulip/zulip/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,27 @@
API_VERSTRING = "v1/"

class CountingBackoff:
def __init__(self, maximum_retries: int = 10, timeout_success_equivalent: Optional[float] = None, delay_cap: float = 90.0) -> None:
def __init__(
self,
maximum_retries: int = 10,
timeout_success_equivalent: Optional[float] = None,
delay_cap: float = 90.0,
) -> None:
"""Sets up a retry-backoff object. Example usage:
backoff = zulip.CountingBackoff()
while backoff.keep_going():
try:
something()
backoff.succeed()
except Exception:
backoff.fail()

timeout_success_equivalent is used in cases where 'success' is
never possible to determine automatically; it sets the
threshold in seconds before the next keep_going/fail, above
which the last run is treated like it was a success.

"""
self.number_of_retries = 0
self.maximum_retries = maximum_retries
self.timeout_success_equivalent = timeout_success_equivalent
Expand Down