Skip to content

Commit

Permalink
adding new feature to retry on Aerospike runtime exceptions Issue#202 (
Browse files Browse the repository at this point in the history
…faust-streaming#203)

* adding new feature to retry on Aerospike runtime exceptions Issue#202

* fix linting

* fix exception handling

* fix exception handling

* fix unit tests
  • Loading branch information
patkivikram authored Oct 21, 2021
1 parent 808312b commit 868d7a4
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 48 deletions.
113 changes: 65 additions & 48 deletions faust/stores/aerospike.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import typing
from typing import Any, Dict, Iterator, Optional, Tuple, Union

Expand Down Expand Up @@ -26,8 +27,8 @@ class Client: # noqa
import aerospike.exception.RecordNotFound
else:

class RecordNotFound: # noqa
"""Dummy Client."""
class RecordNotFound(Exception): # noqa
"""Dummy Exception."""


aerospike_client: Client = None
Expand Down Expand Up @@ -88,8 +89,9 @@ def get_aerospike_client(aerospike_config: Dict[Any, Any]) -> Client:

def _get(self, key: bytes) -> Optional[bytes]:
key = (self.namespace, self.table_name, key)
fun = self.client.get
try:
(key, meta, bins) = self.client.get(key=key)
(key, meta, bins) = self.aerospike_fun_call_with_retry(fun=fun, key=key)
if bins:
return bins[self.BIN_KEY]
return None
Expand All @@ -100,17 +102,15 @@ def _get(self, key: bytes) -> Optional[bytes]:
self.log.error(
f"Error in set for table {self.table_name} exception {ex} key {key}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _set(self, key: bytes, value: Optional[bytes]) -> None:
try:
fun = self.client.put
key = (self.namespace, self.table_name, key)
vt = {self.BIN_KEY: value}
self.client.put(
self.aerospike_fun_call_with_retry(
fun=fun,
key=key,
bins=vt,
meta={"ttl": self.ttl},
Expand All @@ -119,55 +119,51 @@ def _set(self, key: bytes, value: Optional[bytes]) -> None:
"key": aerospike.POLICY_KEY_SEND,
},
)

except Exception as ex:
self.log.error(
f"Error in set for table {self.table_name} exception {ex} key {key}"
f"FaustAerospikeException Error in set for "
f"table {self.table_name} exception {ex} key {key}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _del(self, key: bytes) -> None:
try:
key = (self.namespace, self.table_name, key)
self.client.remove(key=key)
self.aerospike_fun_call_with_retry(fun=self.client.remove, key=key)
except aerospike.exception.RecordNotFound as ex:
self.log.warning(
self.log.debug(
f"Error in delete for table {self.table_name} exception {ex} key {key}"
)
except Exception as ex:
self.log.error(
f"Error in delete for table {self.table_name} exception {ex} key {key}"
f"FaustAerospikeException Error in delete for "
f"table {self.table_name} exception {ex} key {key}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _iterkeys(self) -> Iterator[bytes]:
try:
scan: aerospike.Scan = self.client.scan(
namespace=self.namespace, set=self.table_name
fun = self.client.scan

scan: aerospike.Scan = self.aerospike_fun_call_with_retry(
fun=fun, namespace=self.namespace, set=self.table_name
)
for result in scan.results():
yield result[0][2]
except Exception as ex:
self.log.error(
f"Error in _iterkeys for table {self.table_name} exception {ex}"
f"FaustAerospikeException Error in _iterkeys "
f"for table {self.table_name} exception {ex}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _itervalues(self) -> Iterator[bytes]:
try:
scan: aerospike.Scan = self.client.scan(
namespace=self.namespace, set=self.table_name
fun = self.client.scan

scan: aerospike.Scan = self.aerospike_fun_call_with_retry(
fun=fun, namespace=self.namespace, set=self.table_name
)
for result in scan.results():
(key, meta, bins) = result
Expand All @@ -177,19 +173,17 @@ def _itervalues(self) -> Iterator[bytes]:
yield None
except Exception as ex:
self.log.error(
f"Error in _itervalues for table {self.table_name} exception {ex}"
f"FaustAerospikeException Error "
f"in _itervalues for table {self.table_name}"
f" exception {ex}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]:
try:

scan: aerospike.Scan = self.client.scan(
namespace=self.namespace, set=self.table_name
fun = self.client.scan
scan: aerospike.Scan = self.aerospike_fun_call_with_retry(
fun=fun, namespace=self.namespace, set=self.table_name
)
for result in scan.results():
(key_data, meta, bins) = result
Expand All @@ -200,12 +194,9 @@ def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]:
yield key, bins
except Exception as ex:
self.log.error(
f"Error in _iteritems for table {self.table_name} exception {ex}"
f"FaustAerospikeException Error in _iteritems "
f"for table {self.table_name} exception {ex}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _size(self) -> int:
Expand All @@ -215,7 +206,9 @@ def _contains(self, key: bytes) -> bool:
try:
if self.app.conf.store_check_exists:
key = (self.namespace, self.table_name, key)
(key, meta) = self.client.exists(key=key)
(key, meta) = self.aerospike_fun_call_with_retry(
fun=self.client.exists, key=key
)
if meta:
return True
else:
Expand All @@ -224,13 +217,10 @@ def _contains(self, key: bytes) -> bool:
return True
except Exception as ex:
self.log.error(
f"Error in _contains for table {self.table_name} exception "
f"FaustAerospikeException Error in _contains for table "
f"{self.table_name} exception "
f"{ex} key {key}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _clear(self) -> None:
Expand All @@ -245,3 +235,30 @@ def persisted_offset(self, tp: TP) -> Optional[int]:
This always returns :const:`None` when using the aerospike store.
"""
return None

def aerospike_fun_call_with_retry(self, fun, *args, **kwargs):
f_tries = self.app.conf.aerospike_retries_on_exception
f_delay = self.app.conf.aerospike_sleep_seconds_between_retries_on_exception
while f_tries > 1:
try:
return fun(*args, **kwargs)
except aerospike.exception.RecordNotFound as ex:
raise ex
except Exception:
time.sleep(f_delay)
f_tries -= 1
try:
return fun(*args, **kwargs)
except aerospike.exception.RecordNotFound as ex:
raise ex
except Exception as ex:
self.log.error(
f"FaustAerospikeException Error in aerospike "
f"operation for table {self.table_name} "
f"exception {ex} after retries"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex
27 changes: 27 additions & 0 deletions faust/types/settings/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,33 @@ def crash_app_on_aerospike_exception(self) -> bool:
client has to catch the Error and implement a dead letter queue
"""

@sections.Stream.setting(
params.Int,
version_introduced="0.6.10",
env_name="AEROSPIKE_RETRIES_ON_EXCEPTION",
default=60,
)
def aerospike_retries_on_exception(self) -> bool:
"""Number of retries to aerospike on a runtime error from the aerospike client.
Set this to the number of retries using the aerospike client on a runtime
Exception thrown by the client
"""

@sections.Stream.setting(
params.Int,
version_introduced="0.6.10",
env_name="AEROSPIKE_SLEEP_SECONDS_BETWEEN_RETRIES_ON_EXCEPTION",
default=1,
)
def aerospike_sleep_seconds_between_retries_on_exception(self) -> bool:
"""Seconds to sleep between retries to aerospike on a runtime error from
the aerospike client.
Set this to the sleep in seconds between retries using the aerospike
client on a runtime Exception thrown by the client
"""

@sections.RPC.setting(
params.Bool,
env_name="APP_REPLY_CREATE_TOPIC",
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/stores/test_aerospike.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
import sys
from unittest.mock import MagicMock, patch

import pytest

import faust
from faust.stores.aerospike import AeroSpikeStore

try:
from aerospike.exception import RecordNotFound
except ImportError:

class RecordNotFound(Exception):
...

m1 = MagicMock()
m2 = MagicMock()
sys.modules["aerospike"] = m1
m1.exception = m2
m2.RecordNotFound = RecordNotFound


class TestAerospikeStore:
@pytest.fixture()
Expand Down Expand Up @@ -45,6 +59,7 @@ async def test_get_aerospike_client_instantiated(self, aero):

@pytest.fixture()
def store(self):
sys.modules["aerospike"] = MagicMock()
with patch("faust.stores.aerospike.aerospike", MagicMock()):
options = {}
options[AeroSpikeStore.HOSTS_KEY] = "localhost"
Expand All @@ -57,6 +72,9 @@ def store(self):
)
store.namespace = "test_ns"
store.client = MagicMock()
store.app.conf.aerospike_sleep_seconds_between_retries_on_exception = 0
store.app.conf.aerospike_retries_on_exception = 2
store.app.conf.crash_app_on_aerospike_exception = True
return store

def test_get_correct_value(self, store):
Expand Down

0 comments on commit 868d7a4

Please sign in to comment.