Skip to content

Commit f57d5bc

Browse files
rashidspMichael Ng
authored andcommitted
feat(event_processor): add forwarding event processor and integrate with optimizely (#205)
1 parent 22437a7 commit f57d5bc

File tree

7 files changed

+456
-230
lines changed

7 files changed

+456
-230
lines changed

optimizely/event/event_processor.py

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,11 @@ def _validate_intantiation_props(self, prop, prop_name):
117117
prop_name: Property name.
118118
119119
Returns:
120-
False if property value is None or less than 1 or not a finite number.
120+
False if property value is None or less than or equal to 0 or not a finite number.
121121
False if property name is batch_size and value is a floating point number.
122122
True otherwise.
123123
"""
124-
if (prop_name == 'batch_size' and not isinstance(prop, int)) or prop is None or prop < 1 or \
124+
if (prop_name == 'batch_size' and not isinstance(prop, int)) or prop is None or prop <= 0 or \
125125
not validator.is_finite_number(prop):
126126
self.logger.info('Using default value for {}.'.format(prop_name))
127127
return False
@@ -159,11 +159,11 @@ def _run(self):
159159
"""
160160
try:
161161
while True:
162-
if self._get_time() > self.flushing_interval_deadline:
162+
if self._get_time() >= self.flushing_interval_deadline:
163163
self._flush_queue()
164164

165165
try:
166-
item = self.event_queue.get(True, 0.05)
166+
item = self.event_queue.get(False)
167167

168168
except queue.Empty:
169169
time.sleep(0.05)
@@ -283,3 +283,51 @@ def stop(self):
283283

284284
if self.is_running:
285285
self.logger.error('Timeout exceeded while attempting to close for ' + str(self.timeout_interval) + ' ms.')
286+
287+
288+
class ForwardingEventProcessor(BaseEventProcessor):
289+
"""
290+
ForwardingEventProcessor serves as the default EventProcessor.
291+
292+
The ForwardingEventProcessor sends the LogEvent to EventDispatcher as soon as it is received.
293+
"""
294+
295+
def __init__(self, event_dispatcher, logger=None, notification_center=None):
296+
""" ForwardingEventProcessor init method to configure event dispatching.
297+
298+
Args:
299+
event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it.
300+
logger: Optional component which provides a log method to log messages. By default nothing would be logged.
301+
notification_center: Optional instance of notification_center.NotificationCenter.
302+
"""
303+
self.event_dispatcher = event_dispatcher
304+
self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger())
305+
self.notification_center = notification_center
306+
307+
if not validator.is_notification_center_valid(self.notification_center):
308+
self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center'))
309+
self.notification_center = _notification_center.NotificationCenter()
310+
311+
def process(self, user_event):
312+
""" Method to process the user_event by dispatching it.
313+
Args:
314+
user_event: UserEvent Instance.
315+
"""
316+
if not isinstance(user_event, UserEvent):
317+
self.logger.error('Provided event is in an invalid format.')
318+
return
319+
320+
self.logger.debug('Received user_event: ' + str(user_event))
321+
322+
log_event = EventFactory.create_log_event(user_event, self.logger)
323+
324+
if self.notification_center is not None:
325+
self.notification_center.send_notifications(
326+
enums.NotificationTypes.LOG_EVENT,
327+
log_event
328+
)
329+
330+
try:
331+
self.event_dispatcher.dispatch_event(log_event)
332+
except Exception as e:
333+
self.logger.exception('Error dispatching event: ' + str(log_event) + ' ' + str(e))

optimizely/exceptions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2016-2018, Optimizely
1+
# Copyright 2016-2019, Optimizely
22
# Licensed under the Apache License, Version 2.0 (the "License");
33
# you may not use this file except in compliance with the License.
44
# You may obtain a copy of the License at
@@ -43,7 +43,7 @@ class InvalidGroupException(Exception):
4343

4444

4545
class InvalidInputException(Exception):
46-
""" Raised when provided datafile, event dispatcher, logger or error handler is invalid. """
46+
""" Raised when provided datafile, event dispatcher, logger, event processor or error handler is invalid. """
4747
pass
4848

4949

optimizely/helpers/validator.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2016-2018, Optimizely
1+
# Copyright 2016-2019, Optimizely
22
# Licensed under the Apache License, Version 2.0 (the "License");
33
# you may not use this file except in compliance with the License.
44
# You may obtain a copy of the License at
@@ -72,6 +72,19 @@ def is_config_manager_valid(config_manager):
7272
return _has_method(config_manager, 'get_config')
7373

7474

75+
def is_event_processor_valid(event_processor):
76+
""" Given an event_processor, determine if it is valid or not i.e. provides a process method.
77+
78+
Args:
79+
event_processor: Provides a process method to create user events and then send requests.
80+
81+
Returns:
82+
Boolean depending upon whether event_processor is valid or not.
83+
"""
84+
85+
return _has_method(event_processor, 'process')
86+
87+
7588
def is_error_handler_valid(error_handler):
7689
""" Given a error_handler determine if it is valid or not i.e. provides a handle_error method.
7790

optimizely/optimizely.py

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717
from . import event_builder
1818
from . import exceptions
1919
from . import logger as _logging
20-
from .config_manager import StaticConfigManager
2120
from .config_manager import PollingConfigManager
21+
from .config_manager import StaticConfigManager
2222
from .error_handler import NoOpErrorHandler as noop_error_handler
23+
from .event import event_factory, user_event_factory
24+
from .event.event_processor import ForwardingEventProcessor
2325
from .event_dispatcher import EventDispatcher as default_event_dispatcher
24-
from .helpers import enums
25-
from .helpers import validator
26+
from .helpers import enums, validator
2627
from .notification_center import NotificationCenter
2728

2829

@@ -38,7 +39,8 @@ def __init__(self,
3839
user_profile_service=None,
3940
sdk_key=None,
4041
config_manager=None,
41-
notification_center=None):
42+
notification_center=None,
43+
event_processor=None):
4244
""" Optimizely init method for managing Custom projects.
4345
4446
Args:
@@ -56,6 +58,7 @@ def __init__(self,
5658
notification_center: Optional instance of notification_center.NotificationCenter. Useful when providing own
5759
config_manager.BaseConfigManager implementation which can be using the
5860
same NotificationCenter instance.
61+
event_processor: Processes the given event(s) by creating LogEvent(s) and then dispatching it.
5962
"""
6063
self.logger_name = '.'.join([__name__, self.__class__.__name__])
6164
self.is_valid = True
@@ -64,6 +67,9 @@ def __init__(self,
6467
self.error_handler = error_handler or noop_error_handler
6568
self.config_manager = config_manager
6669
self.notification_center = notification_center or NotificationCenter(self.logger)
70+
self.event_processor = event_processor or ForwardingEventProcessor(self.event_dispatcher,
71+
self.logger,
72+
self.notification_center)
6773

6874
try:
6975
self._validate_instantiation_options()
@@ -114,6 +120,9 @@ def _validate_instantiation_options(self):
114120
if not validator.is_notification_center_valid(self.notification_center):
115121
raise exceptions.InvalidInputException(enums.Errors.INVALID_INPUT.format('notification_center'))
116122

123+
if not validator.is_event_processor_valid(self.event_processor):
124+
raise exceptions.InvalidInputException(enums.Errors.INVALID_INPUT.format('event_processor'))
125+
117126
def _validate_user_inputs(self, attributes=None, event_tags=None):
118127
""" Helper method to validate user inputs.
119128
@@ -149,26 +158,23 @@ def _send_impression_event(self, project_config, experiment, variation, user_id,
149158
attributes: Dict representing user attributes and values which need to be recorded.
150159
"""
151160

152-
impression_event = self.event_builder.create_impression_event(
161+
user_event = user_event_factory.UserEventFactory.create_impression_event(
153162
project_config,
154163
experiment,
155164
variation.id,
156165
user_id,
157166
attributes
158167
)
159168

160-
self.logger.debug('Dispatching impression event to URL %s with params %s.' % (
161-
impression_event.url,
162-
impression_event.params
163-
))
164-
165-
try:
166-
self.event_dispatcher.dispatch_event(impression_event)
167-
except:
168-
self.logger.exception('Unable to dispatch impression event!')
169+
self.event_processor.process(user_event)
169170

170-
self.notification_center.send_notifications(enums.NotificationTypes.ACTIVATE,
171-
experiment, user_id, attributes, variation, impression_event)
171+
# Kept for backward compatibility.
172+
# This notification is deprecated and new Decision notifications
173+
# are sent via their respective method calls.
174+
if len(self.notification_center.notification_listeners[enums.NotificationTypes.ACTIVATE]) > 0:
175+
log_event = event_factory.EventFactory.create_log_event(user_event, self.logger)
176+
self.notification_center.send_notifications(enums.NotificationTypes.ACTIVATE, experiment,
177+
user_id, attributes, variation, log_event.__dict__)
172178

173179
def _get_feature_variable_for_type(self,
174180
project_config,
@@ -359,24 +365,21 @@ def track(self, event_key, user_id, attributes=None, event_tags=None):
359365
self.logger.info('Not tracking user "%s" for event "%s".' % (user_id, event_key))
360366
return
361367

362-
conversion_event = self.event_builder.create_conversion_event(
368+
user_event = user_event_factory.UserEventFactory.create_conversion_event(
363369
project_config,
364370
event_key,
365371
user_id,
366372
attributes,
367373
event_tags
368374
)
375+
376+
self.event_processor.process(user_event)
369377
self.logger.info('Tracking event "%s" for user "%s".' % (event_key, user_id))
370-
self.logger.debug('Dispatching conversion event to URL %s with params %s.' % (
371-
conversion_event.url,
372-
conversion_event.params
373-
))
374-
try:
375-
self.event_dispatcher.dispatch_event(conversion_event)
376-
except:
377-
self.logger.exception('Unable to dispatch conversion event!')
378-
self.notification_center.send_notifications(enums.NotificationTypes.TRACK, event_key, user_id,
379-
attributes, event_tags, conversion_event)
378+
379+
if len(self.notification_center.notification_listeners[enums.NotificationTypes.TRACK]) > 0:
380+
log_event = event_factory.EventFactory.create_log_event(user_event, self.logger)
381+
self.notification_center.send_notifications(enums.NotificationTypes.TRACK, event_key, user_id,
382+
attributes, event_tags, log_event.__dict__)
380383

381384
def get_variation(self, experiment_key, user_id, attributes=None):
382385
""" Gets variation where user will be bucketed.

tests/helpers_tests/test_validator.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2016-2018, Optimizely
1+
# Copyright 2016-2019, Optimizely
22
# Licensed under the Apache License, Version 2.0 (the "License");
33
# you may not use this file except in compliance with the License.
44
# You may obtain a copy of the License at
@@ -20,6 +20,7 @@
2020
from optimizely import error_handler
2121
from optimizely import event_dispatcher
2222
from optimizely import logger
23+
from optimizely.event import event_processor
2324
from optimizely.helpers import validator
2425

2526
from tests import base
@@ -42,6 +43,20 @@ def some_other_method(self):
4243

4344
self.assertFalse(validator.is_config_manager_valid(CustomConfigManager()))
4445

46+
def test_is_event_processor_valid__returns_true(self):
47+
""" Test that valid event_processor returns True. """
48+
49+
self.assertTrue(validator.is_event_processor_valid(event_processor.ForwardingEventProcessor))
50+
51+
def test_is_event_processor_valid__returns_false(self):
52+
""" Test that invalid event_processor returns False. """
53+
54+
class CustomEventProcessor(object):
55+
def some_other_method(self):
56+
pass
57+
58+
self.assertFalse(validator.is_event_processor_valid(CustomEventProcessor))
59+
4560
def test_is_datafile_valid__returns_true(self):
4661
""" Test that valid datafile returns True. """
4762

tests/test_event_processor.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
from . import base
2020
from optimizely.event.payload import Decision, Visitor
21-
from optimizely.event.event_processor import BatchEventProcessor
21+
from optimizely.event.event_processor import BatchEventProcessor, ForwardingEventProcessor
22+
from optimizely.event.event_factory import EventFactory
2223
from optimizely.event.log_event import LogEvent
2324
from optimizely.event.user_event_factory import UserEventFactory
2425
from optimizely.helpers import enums
@@ -401,3 +402,81 @@ def on_log_event(log_event):
401402
self.assertEqual(1, len(self.optimizely.notification_center.notification_listeners[
402403
enums.NotificationTypes.LOG_EVENT
403404
]))
405+
406+
407+
class TestForwardingEventDispatcher(object):
408+
409+
def __init__(self, is_updated=False):
410+
self.is_updated = is_updated
411+
412+
def dispatch_event(self, log_event):
413+
if log_event.http_verb == 'POST' and log_event.url == EventFactory.EVENT_ENDPOINT:
414+
self.is_updated = True
415+
return self.is_updated
416+
417+
418+
class ForwardingEventProcessorTest(base.BaseTest):
419+
420+
def setUp(self, *args, **kwargs):
421+
base.BaseTest.setUp(self, 'config_dict_with_multiple_experiments')
422+
self.test_user_id = 'test_user'
423+
self.event_name = 'test_event'
424+
self.optimizely.logger = SimpleLogger()
425+
self.notification_center = self.optimizely.notification_center
426+
self.event_dispatcher = TestForwardingEventDispatcher(is_updated=False)
427+
428+
with mock.patch.object(self.optimizely, 'logger') as mock_config_logging:
429+
self._event_processor = ForwardingEventProcessor(self.event_dispatcher,
430+
mock_config_logging,
431+
self.notification_center
432+
)
433+
434+
def _build_conversion_event(self, event_name):
435+
return UserEventFactory.create_conversion_event(self.project_config,
436+
event_name,
437+
self.test_user_id,
438+
{},
439+
{}
440+
)
441+
442+
def test_event_processor__dispatch_raises_exception(self):
443+
""" Test that process logs dispatch failure gracefully. """
444+
445+
user_event = self._build_conversion_event(self.event_name)
446+
log_event = EventFactory.create_log_event(user_event, self.optimizely.logger)
447+
448+
with mock.patch.object(self.optimizely, 'logger') as mock_client_logging, \
449+
mock.patch.object(self.event_dispatcher, 'dispatch_event',
450+
side_effect=Exception('Failed to send.')):
451+
452+
event_processor = ForwardingEventProcessor(self.event_dispatcher, mock_client_logging, self.notification_center)
453+
event_processor.process(user_event)
454+
455+
mock_client_logging.exception.assert_called_once_with(
456+
'Error dispatching event: ' + str(log_event) + ' Failed to send.'
457+
)
458+
459+
def test_event_processor__with_test_event_dispatcher(self):
460+
user_event = self._build_conversion_event(self.event_name)
461+
self._event_processor.process(user_event)
462+
self.assertStrictTrue(self.event_dispatcher.is_updated)
463+
464+
def test_notification_center(self):
465+
466+
callback_hit = [False]
467+
468+
def on_log_event(log_event):
469+
self.assertStrictTrue(isinstance(log_event, LogEvent))
470+
callback_hit[0] = True
471+
472+
self.optimizely.notification_center.add_notification_listener(
473+
enums.NotificationTypes.LOG_EVENT, on_log_event
474+
)
475+
476+
user_event = self._build_conversion_event(self.event_name)
477+
self._event_processor.process(user_event)
478+
479+
self.assertEqual(True, callback_hit[0])
480+
self.assertEqual(1, len(self.optimizely.notification_center.notification_listeners[
481+
enums.NotificationTypes.LOG_EVENT
482+
]))

0 commit comments

Comments
 (0)