Skip to content

Commit 62c00f3

Browse files
(fix): add more debug logging. remove extra current_batch set (#228)
* add more debug logging * take out in seconds. already in seconds * Revert "take out in seconds. already in seconds" This reverts commit 9c0cca8. * update logging to log when batch is empty on flush or flush of batch size * use mock logger * rename flush_queue to flush_batch and update debug messages. fix one bug where current_batch was being reset without a lock. * cleanup incorrect comments
1 parent b156e21 commit 62c00f3

File tree

2 files changed

+20
-15
lines changed

2 files changed

+20
-15
lines changed

optimizely/event/event_processor.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -175,16 +175,16 @@ def start(self):
175175
self.executor.start()
176176

177177
def _run(self):
178-
""" Triggered as part of the thread which batches events or flushes event_queue and sleeps
179-
periodically if queue is empty.
178+
""" Triggered as part of the thread which batches events or flushes event_queue and hangs on get
179+
for flush interval if queue is empty.
180180
"""
181181
try:
182182
while True:
183183
if self._get_time() >= self.flushing_interval_deadline:
184-
self._flush_queue()
184+
self._flush_batch()
185185
self.flushing_interval_deadline = self._get_time() + \
186186
self._get_time(self.flush_interval.total_seconds())
187-
self.logger.debug('Flush interval deadline. Flushed queue.')
187+
self.logger.debug('Flush interval deadline. Flushed batch.')
188188

189189
try:
190190
interval = self.flushing_interval_deadline - self._get_time()
@@ -202,7 +202,7 @@ def _run(self):
202202

203203
if item == self._FLUSH_SIGNAL:
204204
self.logger.debug('Received flush signal.')
205-
self._flush_queue()
205+
self._flush_batch()
206206
continue
207207

208208
if isinstance(item, UserEvent):
@@ -213,19 +213,22 @@ def _run(self):
213213

214214
finally:
215215
self.logger.info('Exiting processing loop. Attempting to flush pending events.')
216-
self._flush_queue()
216+
self._flush_batch()
217217

218218
def flush(self):
219219
""" Adds flush signal to event_queue. """
220220

221221
self.event_queue.put(self._FLUSH_SIGNAL)
222222

223-
def _flush_queue(self):
224-
""" Flushes event_queue by dispatching events. """
225-
226-
if len(self._current_batch) == 0:
223+
def _flush_batch(self):
224+
""" Flushes current batch by dispatching event. """
225+
batch_len = len(self._current_batch)
226+
if batch_len == 0:
227+
self.logger.debug('Nothing to flush.')
227228
return
228229

230+
self.logger.debug('Flushing batch size ' + str(batch_len))
231+
229232
with self.LOCK:
230233
to_process_batch = list(self._current_batch)
231234
self._current_batch = list()
@@ -267,8 +270,8 @@ def _add_to_batch(self, user_event):
267270
user_event: UserEvent Instance.
268271
"""
269272
if self._should_split(user_event):
270-
self._flush_queue()
271-
self._current_batch = list()
273+
self.logger.debug('Flushing batch on split.')
274+
self._flush_batch()
272275

273276
# Reset the deadline if starting a new batch.
274277
if len(self._current_batch) == 0:
@@ -277,7 +280,8 @@ def _add_to_batch(self, user_event):
277280
with self.LOCK:
278281
self._current_batch.append(user_event)
279282
if len(self._current_batch) >= self.batch_size:
280-
self._flush_queue()
283+
self.logger.debug('Flushing on batch size.')
284+
self._flush_batch()
281285

282286
def _should_split(self, user_event):
283287
""" Method to check if current event batch should split into two.

tests/test_event_processor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,9 @@ def test_flush_once_max_timeout(self):
191191
self.assertEqual(0, self.event_processor.event_queue.qsize())
192192
self.assertTrue(mock_config_logging.debug.called)
193193
mock_config_logging.debug.assert_any_call('Received event of type ConversionEvent for user test_user.')
194-
mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed queue.')
195-
self.assertTrue(mock_config_logging.debug.call_count == 2)
194+
mock_config_logging.debug.assert_any_call('Flushing batch size 1')
195+
mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed batch.')
196+
self.assertTrue(mock_config_logging.debug.call_count == 3)
196197
self.optimizely.logger = SimpleLogger()
197198

198199
def test_flush_max_batch_size(self):

0 commit comments

Comments
 (0)