diff --git a/faust/stores/aerospike.py b/faust/stores/aerospike.py index b3df8e820..de78e0995 100644 --- a/faust/stores/aerospike.py +++ b/faust/stores/aerospike.py @@ -97,7 +97,13 @@ def _get(self, key: bytes) -> Optional[bytes]: self.log.debug(f"key not found {key} exception {ex}") raise KeyError(f"key not found {key}") except Exception as ex: - self.log.error(f"Error in set for table {self.table_name} exception {ex}") + self.log.error( + f"Error in set for table {self.table_name} exception {ex} key {key}" + ) + if self.app.conf.crash_app_on_aerospike_exception: + self.app._crash( + ex + ) # crash the app to prevent the offset from progressing raise ex def _set(self, key: bytes, value: Optional[bytes]) -> None: @@ -114,7 +120,13 @@ def _set(self, key: bytes, value: Optional[bytes]) -> None: }, ) except Exception as ex: - self.log.error(f"Error in set for table {self.table_name} exception {ex}") + self.log.error( + f"Error in set for table {self.table_name} exception {ex} key {key}" + ) + if self.app.conf.crash_app_on_aerospike_exception: + self.app._crash( + ex + ) # crash the app to prevent the offset from progressing raise ex def _del(self, key: bytes) -> None: @@ -122,13 +134,17 @@ def _del(self, key: bytes) -> None: key = (self.namespace, self.table_name, key) self.client.remove(key=key) except aerospike.exception.RecordNotFound as ex: - self.log.error( - f"Error in delete for table {self.table_name} exception {ex}" + self.log.warning( + f"Error in delete for table {self.table_name} exception {ex} key {key}" ) except Exception as ex: self.log.error( - f"Error in delete for table {self.table_name} exception {ex}" + f"Error in delete for table {self.table_name} exception {ex} key {key}" ) + if self.app.conf.crash_app_on_aerospike_exception: + self.app._crash( + ex + ) # crash the app to prevent the offset from progressing raise ex def _iterkeys(self) -> Iterator[bytes]: @@ -142,6 +158,10 @@ def _iterkeys(self) -> Iterator[bytes]: self.log.error( f"Error in _iterkeys for table {self.table_name} exception {ex}" ) + if self.app.conf.crash_app_on_aerospike_exception: + self.app._crash( + ex + ) # crash the app to prevent the offset from progressing raise ex def _itervalues(self) -> Iterator[bytes]: @@ -159,6 +179,10 @@ def _itervalues(self) -> Iterator[bytes]: self.log.error( f"Error in _itervalues for table {self.table_name} exception {ex}" ) + if self.app.conf.crash_app_on_aerospike_exception: + self.app._crash( + ex + ) # crash the app to prevent the offset from progressing raise ex def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]: @@ -178,6 +202,10 @@ def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]: self.log.error( f"Error in _iteritems for table {self.table_name} exception {ex}" ) + if self.app.conf.crash_app_on_aerospike_exception: + self.app._crash( + ex + ) # crash the app to prevent the offset from progressing raise ex def _size(self) -> int: @@ -196,8 +224,13 @@ def _contains(self, key: bytes) -> bool: return True except Exception as ex: self.log.error( - f"Error in _contains for table {self.table_name} exception {ex}" + f"Error in _contains for table {self.table_name} exception " + f"{ex} key {key}" ) + if self.app.conf.crash_app_on_aerospike_exception: + self.app._crash( + ex + ) # crash the app to prevent the offset from progressing raise ex def _clear(self) -> None: diff --git a/faust/types/settings/settings.py b/faust/types/settings/settings.py index 8a4f3820a..6aedec20f 100644 --- a/faust/types/settings/settings.py +++ b/faust/types/settings/settings.py @@ -1367,6 +1367,19 @@ def store_check_exists(self) -> bool: client has to catch KeyError """ + @sections.Stream.setting( + params.Bool, + version_introduced="0.6.3", + env_name="CRASH_APP_ON_AEROSPIKE_EXCEPTION", + default=True, + ) + def crash_app_on_aerospike_exception(self) -> bool: + """Crashes the app on an aerospike Exceptions. + + If True, crashes the app and prevents the commit offset on progressing. If False + client has to catch the Error and implement a dead letter queue + """ + @sections.RPC.setting( params.Bool, env_name="APP_REPLY_CREATE_TOPIC",