Skip to content

Commit ee08eb8

Browse files
rename flush_queue to flush_batch and update debug messages. fix one bug where current_batch was being reset without a lock.
1 parent 027f277 commit ee08eb8

File tree

2 files changed

+9
-10
lines changed

2 files changed

+9
-10
lines changed

optimizely/event/event_processor.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,10 @@ def _run(self):
181181
try:
182182
while True:
183183
if self._get_time() >= self.flushing_interval_deadline:
184-
self._flush_queue()
184+
self._flush_batch()
185185
self.flushing_interval_deadline = self._get_time() + \
186186
self._get_time(self.flush_interval.total_seconds())
187-
self.logger.debug('Flush interval deadline. Flushed queue.')
187+
self.logger.debug('Flush interval deadline. Flushed batch.')
188188

189189
try:
190190
interval = self.flushing_interval_deadline - self._get_time()
@@ -202,7 +202,7 @@ def _run(self):
202202

203203
if item == self._FLUSH_SIGNAL:
204204
self.logger.debug('Received flush signal.')
205-
self._flush_queue()
205+
self._flush_batch()
206206
continue
207207

208208
if isinstance(item, UserEvent):
@@ -213,14 +213,14 @@ def _run(self):
213213

214214
finally:
215215
self.logger.info('Exiting processing loop. Attempting to flush pending events.')
216-
self._flush_queue()
216+
self._flush_batch()
217217

218218
def flush(self):
219219
""" Adds flush signal to event_queue. """
220220

221221
self.event_queue.put(self._FLUSH_SIGNAL)
222222

223-
def _flush_queue(self):
223+
def _flush_batch(self):
224224
""" Flushes event_queue by dispatching events. """
225225
batch_len = len(self._current_batch)
226226
if batch_len == 0:
@@ -270,9 +270,8 @@ def _add_to_batch(self, user_event):
270270
user_event: UserEvent Instance.
271271
"""
272272
if self._should_split(user_event):
273-
self.logger.debug('Flush on split.')
274-
self._flush_queue()
275-
self._current_batch = list()
273+
self.logger.debug('Flushing batch on split.')
274+
self._flush_batch()
276275

277276
# Reset the deadline if starting a new batch.
278277
if len(self._current_batch) == 0:
@@ -282,7 +281,7 @@ def _add_to_batch(self, user_event):
282281
self._current_batch.append(user_event)
283282
if len(self._current_batch) >= self.batch_size:
284283
self.logger.debug('Flushing on batch size.')
285-
self._flush_queue()
284+
self._flush_batch()
286285

287286
def _should_split(self, user_event):
288287
""" Method to check if current event batch should split into two.

tests/test_event_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,8 @@ def test_flush_once_max_timeout(self):
191191
self.assertEqual(0, self.event_processor.event_queue.qsize())
192192
self.assertTrue(mock_config_logging.debug.called)
193193
mock_config_logging.debug.assert_any_call('Received event of type ConversionEvent for user test_user.')
194-
mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed queue.')
195194
mock_config_logging.debug.assert_any_call('Flushing batch size 1')
195+
mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed batch.')
196196
self.assertTrue(mock_config_logging.debug.call_count == 3)
197197
self.optimizely.logger = SimpleLogger()
198198

0 commit comments

Comments
 (0)