Closed
Description
The following code may ask for an item from the event_queue with a negative timeout interval.
class BatchEventProcessor(BaseEventProcessor):
def _run(self):
try:
while True:
if self._get_time() >= self.flushing_interval_deadline:
self._flush_batch()
self.flushing_interval_deadline = self._get_time() + \
self._get_time(self.flush_interval.total_seconds())
self.logger.debug('Flush interval deadline. Flushed batch.')
try:
interval = self.flushing_interval_deadline - self._get_time()
item = self.event_queue.get(True, interval) ## interval can be negative
If the flushing_interval_deadline is between the first call of _get_time() and the second call of _get_time() then interval will be negative.
Proposed fix:
class BatchEventProcessor(BaseEventProcessor):
def _run(self):
try:
while True:
loop_time = self._get_time() ## only call get_time once per loop iteration
if loop_time >= self.flushing_interval_deadline:
self._flush_batch()
self.flushing_interval_deadline = self._get_time() + \
self._get_time(self.flush_interval.total_seconds())
self.logger.debug('Flush interval deadline. Flushed batch.')
try:
interval = self.flushing_interval_deadline - loop_time
item = self.event_queue.get(True, interval)
Metadata
Metadata
Assignees
Labels
No labels