BatchEventProcessor can try to dequeue an event with a negative timeout. #354

Closed
@jkolenofferup

Description

The following code in BatchEventProcessor._run may call event_queue.get() with a negative timeout:

class BatchEventProcessor(BaseEventProcessor):
    def _run(self):
        try:
            while True:
                if self._get_time() >= self.flushing_interval_deadline:
                    self._flush_batch()
                    self.flushing_interval_deadline = self._get_time() + \
                        self._get_time(self.flush_interval.total_seconds())
                    self.logger.debug('Flush interval deadline. Flushed batch.')
                try:
                    interval = self.flushing_interval_deadline - self._get_time()
                    item = self.event_queue.get(True, interval) ## interval can be negative

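If flushing_interval_deadline falls between the first and second calls to _get_time() within one loop iteration, the deadline check fails (so nothing is flushed and the deadline is not reset), but the second reading is already past the deadline, so interval is negative.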
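For context on why this matters: the excerpt does not show the type of event_queue, but assuming it is a standard library queue.Queue, a negative timeout is rejected with ValueError rather than treated as zero. A minimal check (the helper name get_with_timeout is made up for this illustration):

```python
import queue


def get_with_timeout(q, timeout):
    """Try a blocking get and report what happened as a (status, value) pair."""
    try:
        return ("item", q.get(True, timeout))
    except queue.Empty:
        # Normal outcome when nothing arrives before the timeout expires.
        return ("empty", None)
    except ValueError as exc:
        # queue.Queue.get raises ValueError for a negative timeout.
        return ("error", str(exc))


q = queue.Queue()
print(get_with_timeout(q, 0.01))  # empty queue, small positive timeout
print(get_with_timeout(q, -0.5))  # negative timeout
```

Whether that ValueError is caught anywhere depends on the except clauses that follow the inner try, which are not part of the excerpt above.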

Proposed fix:

class BatchEventProcessor(BaseEventProcessor):
    def _run(self):
        try:
            while True:
                loop_time = self._get_time()  ## only call get_time once per loop iteration
                if loop_time >= self.flushing_interval_deadline:
                    self._flush_batch()
                    self.flushing_interval_deadline = self._get_time() + \
                        self._get_time(self.flush_interval.total_seconds())
                    self.logger.debug('Flush interval deadline. Flushed batch.')
                try:
                    interval = self.flushing_interval_deadline - loop_time
                    item = self.event_queue.get(True, interval) 
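The difference between the two versions can be reproduced without threads by feeding the loop logic a fake clock. The sketch below is only an illustration: make_clock, interval_two_reads and interval_one_read are hypothetical helpers, times are plain integers (think milliseconds), and the flush itself is reduced to resetting the deadline.

```python
def make_clock(readings):
    """Hypothetical fake clock: returns the given readings, one per call."""
    it = iter(readings)
    return lambda: next(it)


def interval_two_reads(get_time, deadline, flush_interval):
    # Mirrors the original loop: one clock read for the deadline check,
    # a separate read when computing the timeout.
    if get_time() >= deadline:
        deadline = get_time() + flush_interval
    return deadline - get_time()


def interval_one_read(get_time, deadline, flush_interval):
    # Mirrors the proposed fix: the same reading drives both the check
    # and the timeout computation.
    loop_time = get_time()
    if loop_time >= deadline:
        deadline = get_time() + flush_interval
    return deadline - loop_time


# The deadline (1000) falls between two consecutive clock readings.
print(interval_two_reads(make_clock([999, 1001]), 1000, 30000))  # -1
print(interval_one_read(make_clock([999, 1001]), 1000, 30000))   # 1
```

With a single reading per iteration, the no-flush path always has loop_time below the deadline, so the interval is positive. The flush path stays positive as long as the clock does not go backwards between the two reads.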
