21
21
from optimizely import logger as _logging
22
22
from optimizely .event_dispatcher import EventDispatcher as default_event_dispatcher
23
23
from optimizely .helpers import validator
24
- from .user_event import UserEvent
25
24
from .event_factory import EventFactory
25
+ from .user_event import UserEvent
26
26
27
27
ABC = abc .ABCMeta ('ABC' , (object ,), {'__slots__' : ()})
28
28
@@ -32,12 +32,16 @@ class BaseEventProcessor(ABC):
32
32
33
33
@abc .abstractmethod
34
34
def process (user_event ):
35
+ """ Method to provide intermediary processing stage within event production.
36
+ Args:
37
+ user_event: UserEvent instance that needs to be processed and dispatched.
38
+ """
35
39
pass
36
40
37
41
38
42
class BatchEventProcessor (BaseEventProcessor ):
39
43
"""
40
- BatchEventProcessor is a batched implementation of the BaseEventProcessor.
44
+ BatchEventProcessor is an implementation of the BaseEventProcessor that batches events .
41
45
The BatchEventProcessor maintains a single consumer thread that pulls events off of
42
46
the blocking queue and buffers them for either a configured batch size or for a
43
47
maximum duration before the resulting LogEvent is sent to the EventDispatcher.
@@ -84,16 +88,15 @@ def __init__(self,
84
88
self .timeout_interval = timedelta (seconds = timeout_interval ) \
85
89
if self ._validate_intantiation_props (timeout_interval , 'timeout_interval' ) \
86
90
else self ._DEFAULT_TIMEOUT_INTERVAL
87
- self ._is_started = False
88
91
self ._current_batch = list ()
89
92
90
93
if start_on_init is True :
91
94
self .start ()
92
95
93
96
@property
94
- def is_started (self ):
97
+ def is_running (self ):
95
98
""" Property to check if consumer thread is alive or not. """
96
- return self ._is_started
99
+ return self .executor . isAlive ()
97
100
98
101
def _validate_intantiation_props (self , prop , prop_name ):
99
102
""" Method to determine if instantiation properties like batch_size, flush_interval
@@ -131,17 +134,15 @@ def _get_time(self, _time=None):
131
134
132
135
def start (self ):
133
136
""" Starts the batch processing thread to batch events. """
134
- if self . is_started :
135
- self .logger .warning ('Service already started' )
137
+ if hasattr ( self , 'executor' ) and self . is_running :
138
+ self .logger .warning ('BatchEventProcessor already started. ' )
136
139
return
137
140
138
141
self .flushing_interval_deadline = self ._get_time () + self ._get_time (self .flush_interval .total_seconds ())
139
142
self .executor = threading .Thread (target = self ._run )
140
143
self .executor .setDaemon (True )
141
144
self .executor .start ()
142
145
143
- self ._is_started = True
144
-
145
146
def _run (self ):
146
147
""" Triggered as part of the thread which batches events or flushes event_queue and sleeps
147
148
periodically if queue is empty.
@@ -200,6 +201,10 @@ def _flush_queue(self):
200
201
self .logger .error ('Error dispatching event: ' + str (log_event ) + ' ' + str (e ))
201
202
202
203
def process (self , user_event ):
204
+ """ Method to process the user_event by putting it in event_queue.
205
+ Args:
206
+ user_event: UserEvent Instance.
207
+ """
203
208
if not isinstance (user_event , UserEvent ):
204
209
self .logger .error ('Provided event is in an invalid format.' )
205
210
return
@@ -243,12 +248,10 @@ def _should_split(self, user_event):
243
248
244
249
def stop (self ):
245
250
""" Stops and disposes batch event processor. """
246
-
247
251
self .event_queue .put (self ._SHUTDOWN_SIGNAL )
252
+ self .logger .warning ('Stopping Scheduler.' )
253
+
248
254
self .executor .join (self .timeout_interval .total_seconds ())
249
255
250
- if self .executor . isAlive () :
256
+ if self .is_running :
251
257
self .logger .error ('Timeout exceeded while attempting to close for ' + str (self .timeout_interval ) + ' ms.' )
252
-
253
- self .logger .warning ('Stopping Scheduler.' )
254
- self ._is_started = False
0 commit comments