-
Notifications
You must be signed in to change notification settings - Fork 37
feat(event_processor): add forwarding event processor and integrate with optimizely #205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 112 commits
232f0a0
2320939
fbb5a68
c162465
633b095
8a5f531
35fcd82
d7201cb
c829fca
1d858b7
c3c9d46
a2eeb85
a045291
f70a1c9
faff070
c931956
2dea7fe
d7f69ec
8853cb3
5934764
23ab6ce
6474e98
1184568
592a306
c4a412a
2a3374c
ce44c11
00efd1b
54333d7
af497f9
d2f7126
f21c94b
3fd5788
ebe833c
ddef208
e964048
8953a43
b1388da
535e624
d4f2c66
80b0963
071460a
900d96d
8cd8547
b730bfd
752e35f
f9a2cb6
54446b9
aedccc8
e628e0f
855c73f
c92f79d
63c1b21
c18e359
56408dc
f851754
8f7be2b
1486fa4
b9d5fd9
d42d14b
6366d59
65a752b
ab192a0
06ac3f0
8e4c118
641d09a
9630139
3e40044
8998580
a89a5bf
2582fb7
41d44e6
cfb2d74
75725bd
42a90f0
751ee64
cb07348
c3d5b07
2d37cbe
f50e053
bb3f738
b5d461b
ca208a6
2c33344
46cc51d
1ab8b32
67e0424
f0dd1fd
4d4e7d1
477bbd5
e299dce
758ffc2
304ac00
cbd918a
5ef24de
1edb7c2
4c8b7f3
9d917a4
443b7aa
3b2cb4a
dcca0ec
dea337b
1d5aeca
b3883d6
655a859
4753c0b
86e4b7f
65fb6fa
01a8ea1
3baf024
7e20be7
15e0c8f
498db55
614a3d4
ee89a1f
2c67253
6e4d431
02c9e53
9ebf1a4
34e3cf5
d241a2f
dc53d39
8956cba
c6a2c18
41516f0
ac8663a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,312 @@ | ||
# Copyright 2019 Optimizely | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import abc | ||
import threading | ||
import time | ||
|
||
from datetime import timedelta | ||
from six.moves import queue | ||
|
||
from optimizely import logger as _logging | ||
from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher | ||
from optimizely.helpers import enums | ||
from optimizely.helpers import validator | ||
from .event_factory import EventFactory | ||
from .user_event import UserEvent | ||
|
||
# Python 2/3 compatible abstract base: builds an ``ABC`` class through the
# ABCMeta metaclass without relying on version-specific class syntax.
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})


class BaseEventProcessor(ABC):
    """ Class encapsulating event processing. Override with your own implementation. """

    @abc.abstractmethod
    def process(self, user_event):
        """ Method to provide intermediary processing stage within event production.

        Args:
          user_event: UserEvent instance that needs to be processed and dispatched.
        """
        pass
|
||
|
||
class BatchEventProcessor(BaseEventProcessor):
    """ Event processor that batches user events before dispatching them.

    A single consumer thread pulls events off the blocking ``event_queue`` and buffers
    them until either ``batch_size`` events have been collected or ``flush_interval``
    has elapsed; the buffered events are then converted into one LogEvent and sent to
    the event dispatcher.
    """

    _DEFAULT_QUEUE_CAPACITY = 1000
    _DEFAULT_BATCH_SIZE = 10
    _DEFAULT_FLUSH_INTERVAL = timedelta(seconds=30)
    _DEFAULT_TIMEOUT_INTERVAL = timedelta(seconds=5)
    # Unique sentinel objects placed on the queue to control the consumer thread.
    _SHUTDOWN_SIGNAL = object()
    _FLUSH_SIGNAL = object()
    # Class-level lock guarding appends to / swaps of ``_current_batch``. It is a plain,
    # non-reentrant Lock, so it must never be held while calling ``_flush_queue``.
    LOCK = threading.Lock()

    def __init__(self,
                 event_dispatcher,
                 logger,
                 start_on_init=False,
                 event_queue=None,
                 batch_size=None,
                 flush_interval=None,
                 timeout_interval=None,
                 notification_center=None):
        """ EventProcessor init method to configure event batching.

        Args:
          event_dispatcher: Provides a dispatch_event method which if given a URL and params
                            sends a request to it. Falls back to the default EventDispatcher
                            when a falsy value is given.
          logger: Provides a log method to log messages. A NoOpLogger is used when a falsy
                  value is given, so by default nothing is logged.
          start_on_init: Optional boolean param which starts the consumer thread if set to True.
                         Default value is False.
          event_queue: Optional component which accumulates the events until dispatched.
                       Defaults to a queue bounded at _DEFAULT_QUEUE_CAPACITY items.
          batch_size: Optional param which defines the upper limit on the number of events
                      buffered in the current batch, after which the batch is flushed.
          flush_interval: Optional floating point number representing time interval in seconds
                          after which the current batch is flushed.
          timeout_interval: Optional floating point number representing time interval in seconds
                            to wait when joining the consumer thread in stop().
          notification_center: Optional instance of notification_center.NotificationCenter.
        """
        self.event_dispatcher = event_dispatcher or default_event_dispatcher
        self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger())
        self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY)

        # Each setting falls back to its class default when the supplied value is invalid.
        if self._validate_intantiation_props(batch_size, 'batch_size'):
            self.batch_size = batch_size
        else:
            self.batch_size = self._DEFAULT_BATCH_SIZE

        if self._validate_intantiation_props(flush_interval, 'flush_interval'):
            self.flush_interval = timedelta(seconds=flush_interval)
        else:
            self.flush_interval = self._DEFAULT_FLUSH_INTERVAL

        if self._validate_intantiation_props(timeout_interval, 'timeout_interval'):
            self.timeout_interval = timedelta(seconds=timeout_interval)
        else:
            self.timeout_interval = self._DEFAULT_TIMEOUT_INTERVAL

        self.notification_center = notification_center
        self._current_batch = list()

        if start_on_init is True:
            self.start()

    @property
    def is_running(self):
        """ Property to check if consumer thread is alive or not.

        Returns:
          False when the consumer thread has never been created, otherwise whether it is alive.
        """
        executor = getattr(self, 'executor', None)
        if executor is None:
            return False

        # Thread.isAlive() was removed in Python 3.9; is_alive() exists on Python 2.6+ and 3.x.
        return executor.is_alive()

    def _validate_intantiation_props(self, prop, prop_name):
        """ Method to determine if instantiation properties like batch_size, flush_interval
        and timeout_interval are valid.

        Args:
          prop: Property value that needs to be validated.
          prop_name: Property name.

        Returns:
          False if property value is None or less than 1 or not a finite number.
          False if property name is batch_size and value is not an integer.
          True otherwise.
        """
        is_valid = True

        if prop is None:
            is_valid = False
        elif prop_name == 'batch_size' and not isinstance(prop, int):
            is_valid = False
        elif not validator.is_finite_number(prop):
            # Checked before the numeric comparison below so that a non-numeric value is
            # rejected here instead of raising TypeError on ``prop < 1`` under Python 3.
            # NOTE(review): assumes validator.is_finite_number returns False for
            # non-numeric input - confirm against the validator module.
            is_valid = False
        elif prop < 1:
            is_valid = False

        if not is_valid:
            self.logger.info('Using default value for {}.'.format(prop_name))

        return is_valid

    def _get_time(self, _time=None):
        """ Method to return rounded off time as integer in seconds. If _time is None, uses current time.

        Args:
          _time: time in seconds that needs to be rounded off.

        Returns:
          Integer time in seconds.
        """
        if _time is None:
            return int(round(time.time()))

        return int(round(_time))

    def start(self):
        """ Starts the batch processing thread to batch events. """
        if self.is_running:
            self.logger.warning('BatchEventProcessor already started.')
            return

        self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds())
        self.executor = threading.Thread(target=self._run)
        # Daemon thread, set through the attribute because setDaemon() is deprecated.
        self.executor.daemon = True
        self.executor.start()

    def _run(self):
        """ Triggered as part of the thread which batches events or flushes event_queue and sleeps
        periodically if queue is empty.
        """
        try:
            while True:
                if self._get_time() > self.flushing_interval_deadline:
                    self._flush_queue()

                try:
                    item = self.event_queue.get(True, 0.05)
                except queue.Empty:
                    time.sleep(0.05)
                    continue

                # Sentinels are unique objects, so compare by identity; ``==`` would
                # dispatch to whatever __eq__ the queued item defines.
                if item is self._SHUTDOWN_SIGNAL:
                    self.logger.debug('Received shutdown signal.')
                    break

                if item is self._FLUSH_SIGNAL:
                    self.logger.debug('Received flush signal.')
                    self._flush_queue()
                    continue

                if isinstance(item, UserEvent):
                    self._add_to_batch(item)

        except Exception as exception:
            self.logger.error('Uncaught exception processing buffer. Error: ' + str(exception))

        finally:
            self.logger.info('Exiting processing loop. Attempting to flush pending events.')
            self._flush_queue()

    def flush(self):
        """ Adds flush signal to event_queue. """
        self.event_queue.put(self._FLUSH_SIGNAL)

    def _flush_queue(self):
        """ Flushes the current batch by dispatching its events as a single LogEvent.

        Does nothing when the batch is empty. Errors raised by the event dispatcher are
        logged and not propagated.
        """
        if not self._current_batch:
            return

        # Swap the batch out under the lock, then do the slow work without holding it.
        with self.LOCK:
            to_process_batch = list(self._current_batch)
            self._current_batch = list()

        log_event = EventFactory.create_log_event(to_process_batch, self.logger)

        if self.notification_center is not None:
            self.notification_center.send_notifications(
                enums.NotificationTypes.LOG_EVENT,
                log_event
            )

        try:
            self.event_dispatcher.dispatch_event(log_event)
        except Exception as e:
            self.logger.error('Error dispatching event: ' + str(log_event) + ' ' + str(e))

    def process(self, user_event):
        """ Method to process the user_event by putting it in event_queue.

        Events that are not UserEvent instances are rejected with an error log. If the
        queue is full the event is dropped and a debug message is logged.

        Args:
          user_event: UserEvent Instance.
        """
        if not isinstance(user_event, UserEvent):
            self.logger.error('Provided event is in an invalid format.')
            return

        self.logger.debug('Received user_event: ' + str(user_event))

        try:
            self.event_queue.put_nowait(user_event)
        except queue.Full:
            self.logger.debug('Payload not accepted by the queue. Current size: {}'.format(str(self.event_queue.qsize())))

    def _add_to_batch(self, user_event):
        """ Appends user_event to the current batch, flushing when required.

        The pending batch is flushed first when user_event belongs to a different
        project or datafile revision, and again once the batch reaches batch_size.

        Args:
          user_event: UserEvent instance to buffer.
        """
        if self._should_split(user_event):
            # _flush_queue leaves an empty batch behind, so no explicit reset is needed.
            self._flush_queue()

        # Reset the deadline if starting a new batch.
        if not self._current_batch:
            self.flushing_interval_deadline = self._get_time() + \
                self._get_time(self.flush_interval.total_seconds())

        with self.LOCK:
            self._current_batch.append(user_event)

        # The size check and flush happen after releasing LOCK: _flush_queue acquires the
        # same non-reentrant lock, so calling it while holding LOCK would deadlock.
        if len(self._current_batch) >= self.batch_size:
            self._flush_queue()

    def _should_split(self, user_event):
        """ Determines whether user_event must start a new batch.

        Args:
          user_event: UserEvent instance about to be buffered.

        Returns:
          True if the current batch is non-empty and its latest event has a different
          revision or project_id in its event_context than user_event. False otherwise.
        """
        if not self._current_batch:
            return False

        current_context = self._current_batch[-1].event_context
        new_context = user_event.event_context

        return (current_context.revision != new_context.revision or
                current_context.project_id != new_context.project_id)

    def stop(self):
        """ Stops and disposes batch event processor.

        Puts the shutdown signal on the queue and waits up to timeout_interval for the
        consumer thread to exit. Does nothing when the processor was never started.
        """
        executor = getattr(self, 'executor', None)
        if executor is None:
            # No consumer thread exists; queueing a shutdown signal here would make a
            # later start() exit immediately.
            self.logger.warning('BatchEventProcessor was never started; nothing to stop.')
            return

        self.event_queue.put(self._SHUTDOWN_SIGNAL)
        self.logger.warning('Stopping Scheduler.')

        executor.join(self.timeout_interval.total_seconds())

        if self.is_running:
            # timeout_interval is a timedelta; report it in seconds, which is the unit used.
            self.logger.error('Timeout exceeded while attempting to close for ' +
                              str(self.timeout_interval.total_seconds()) + ' seconds.')
|
||
|
||
class ForwardingEventProcessor(BaseEventProcessor):
    """ ForwardingEventProcessor serves as the default EventProcessor.

    The ForwardingEventProcessor sends the LogEvent to EventDispatcher as soon as it is received.
    """

    def __init__(self, event_dispatcher, logger=None, notification_center=None):
        """ ForwardingEventProcessor init method to configure event dispatching.

        Args:
          event_dispatcher: Provides a dispatch_event method which if given a URL and params
                            sends a request to it. Falls back to the default EventDispatcher
                            when a falsy value is given.
          logger: Optional component which provides a log method to log messages. By default
                  nothing would be logged.
          notification_center: Optional instance of notification_center.NotificationCenter.
        """
        self.event_dispatcher = event_dispatcher or default_event_dispatcher
        # Review feedback: the logger was documented as optional but was required in
        # practice. It now defaults to a NoOpLogger so process() can always log.
        self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger())
        # NOTE(review): a reviewer asked (cf. feedback on #204) whether the arguments
        # received here should be validated; no validation is performed yet - confirm
        # what is expected.
        self.notification_center = notification_center

    def process(self, user_event):
        """ Method to process the user_event by dispatching it.

        Events that are not UserEvent instances are rejected with an error log. Errors
        raised by the event dispatcher are logged and not propagated.

        Args:
          user_event: UserEvent Instance.
        """
        if not isinstance(user_event, UserEvent):
            self.logger.error('Provided event is in an invalid format.')
            return

        self.logger.debug('Received user_event: ' + str(user_event))

        log_event = EventFactory.create_log_event(user_event, self.logger)

        if self.notification_center is not None:
            self.notification_center.send_notifications(
                enums.NotificationTypes.LOG_EVENT,
                log_event
            )

        try:
            self.event_dispatcher.dispatch_event(log_event)
        except Exception as e:
            self.logger.exception('Error dispatching event: ' + str(log_event) + ' ' + str(e))
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,7 +43,7 @@ class InvalidGroupException(Exception): | |
|
||
|
||
class InvalidInputException(Exception):
    """ Raised when provided datafile, event dispatcher, logger, event processor or error handler is invalid. """
    # NOTE(review): a reviewer flagged that the year in this file's license header needs to be updated.
    pass
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`logger` should be optional, and we should set it to `NoOpLogger` if not provided.