Skip to content

Commit

Permalink
Fix for #126 (#127)
Browse files Browse the repository at this point in the history
* Fix for #126

* fix linting
  • Loading branch information
patkivikram authored Apr 6, 2021
1 parent dc58900 commit 159ad62
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
45 changes: 39 additions & 6 deletions faust/stores/aerospike.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,13 @@ def _get(self, key: bytes) -> Optional[bytes]:
self.log.debug(f"key not found {key} exception {ex}")
raise KeyError(f"key not found {key}")
except Exception as ex:
self.log.error(f"Error in set for table {self.table_name} exception {ex}")
self.log.error(
f"Error in set for table {self.table_name} exception {ex} key {key}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _set(self, key: bytes, value: Optional[bytes]) -> None:
Expand All @@ -114,21 +120,31 @@ def _set(self, key: bytes, value: Optional[bytes]) -> None:
},
)
except Exception as ex:
self.log.error(f"Error in set for table {self.table_name} exception {ex}")
self.log.error(
f"Error in set for table {self.table_name} exception {ex} key {key}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _del(self, key: bytes) -> None:
try:
key = (self.namespace, self.table_name, key)
self.client.remove(key=key)
except aerospike.exception.RecordNotFound as ex:
self.log.error(
f"Error in delete for table {self.table_name} exception {ex}"
self.log.warning(
f"Error in delete for table {self.table_name} exception {ex} key {key}"
)
except Exception as ex:
self.log.error(
f"Error in delete for table {self.table_name} exception {ex}"
f"Error in delete for table {self.table_name} exception {ex} key {key}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _iterkeys(self) -> Iterator[bytes]:
Expand All @@ -142,6 +158,10 @@ def _iterkeys(self) -> Iterator[bytes]:
self.log.error(
f"Error in _iterkeys for table {self.table_name} exception {ex}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _itervalues(self) -> Iterator[bytes]:
Expand All @@ -159,6 +179,10 @@ def _itervalues(self) -> Iterator[bytes]:
self.log.error(
f"Error in _itervalues for table {self.table_name} exception {ex}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]:
Expand All @@ -178,6 +202,10 @@ def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]:
self.log.error(
f"Error in _iteritems for table {self.table_name} exception {ex}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _size(self) -> int:
Expand All @@ -196,8 +224,13 @@ def _contains(self, key: bytes) -> bool:
return True
except Exception as ex:
self.log.error(
f"Error in _contains for table {self.table_name} exception {ex}"
f"Error in _contains for table {self.table_name} exception "
f"{ex} key {key}"
)
if self.app.conf.crash_app_on_aerospike_exception:
self.app._crash(
ex
) # crash the app to prevent the offset from progressing
raise ex

def _clear(self) -> None:
Expand Down
13 changes: 13 additions & 0 deletions faust/types/settings/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,19 @@ def store_check_exists(self) -> bool:
client has to catch KeyError
"""

@sections.Stream.setting(
params.Bool,
version_introduced="0.6.3",
env_name="CRASH_APP_ON_AEROSPIKE_EXCEPTION",
default=True,
)
def crash_app_on_aerospike_exception(self) -> bool:
"""Crashes the app on an aerospike Exceptions.
If True, crashes the app and prevents the commit offset on progressing. If False
client has to catch the Error and implement a dead letter queue
"""

@sections.RPC.setting(
params.Bool,
env_name="APP_REPLY_CREATE_TOPIC",
Expand Down

0 comments on commit 159ad62

Please sign in to comment.