Skip to content

fix(Polling Manager): Fixing how config manager blocks. Miscellaneous other fixes as well. #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions optimizely/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ def __init__(self,
notification_center=notification_center)
self._config = None
self.validate_schema = not skip_json_validation
self._config_ready_event = threading.Event()
self._set_config(datafile)

def _set_config(self, datafile):
Expand Down Expand Up @@ -135,7 +134,6 @@ def _set_config(self, datafile):
return

self._config = config
self._config_ready_event.set()
self.notification_center.send_notifications(enums.NotificationTypes.OPTIMIZELY_CONFIG_UPDATE)
self.logger.debug(
'Received new datafile and updated config. '
Expand Down Expand Up @@ -186,6 +184,7 @@ def __init__(self,
JSON schema validation will be performed.

"""
self._config_ready_event = threading.Event()
super(PollingConfigManager, self).__init__(datafile=datafile,
logger=logger,
error_handler=error_handler,
Expand Down Expand Up @@ -232,6 +231,16 @@ def get_datafile_url(sdk_key, url, url_template):

return url

def _set_config(self, datafile):
""" Looks up and sets datafile and config based on response body.

Args:
datafile: JSON string representing the Optimizely project.
"""
if datafile or self._config_ready_event.is_set():
super(PollingConfigManager, self)._set_config(datafile=datafile)
self._config_ready_event.set()

def get_config(self):
""" Returns instance of ProjectConfig. Returns immediately if project config is ready otherwise
blocks maximum for value of blocking_timeout in seconds.
Expand Down
75 changes: 46 additions & 29 deletions optimizely/event/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class BaseEventProcessor(ABC):
""" Class encapsulating event processing. Override with your own implementation. """

@abc.abstractmethod
def process(user_event):
def process(self, user_event):
""" Method to provide intermediary processing stage within event production.
Args:
user_event: UserEvent instance that needs to be processed and dispatched.
Expand All @@ -45,33 +45,34 @@ def process(user_event):
class BatchEventProcessor(BaseEventProcessor):
"""
BatchEventProcessor is an implementation of the BaseEventProcessor that batches events.

The BatchEventProcessor maintains a single consumer thread that pulls events off of
the blocking queue and buffers them for either a configured batch size or for a
maximum duration before the resulting LogEvent is sent to the EventDispatcher.
"""

_DEFAULT_QUEUE_CAPACITY = 1000
_DEFAULT_BATCH_SIZE = 10
_DEFAULT_FLUSH_INTERVAL = timedelta(seconds=30)
_DEFAULT_TIMEOUT_INTERVAL = timedelta(seconds=5)
_DEFAULT_FLUSH_INTERVAL = 30
_DEFAULT_TIMEOUT_INTERVAL = 5
_SHUTDOWN_SIGNAL = object()
_FLUSH_SIGNAL = object()
LOCK = threading.Lock()

def __init__(self,
event_dispatcher,
logger,
logger=None,
start_on_init=False,
event_queue=None,
batch_size=None,
flush_interval=None,
timeout_interval=None,
notification_center=None):
""" EventProcessor init method to configure event batching.
""" BatchEventProcessor init method to configure event batching.

Args:
event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it.
logger: Provides a log method to log messages. By default nothing would be logged.
logger: Optional component which provides a log method to log messages. By default nothing would be logged.
start_on_init: Optional boolean param which starts the consumer thread if set to True.
Default value is False.
event_queue: Optional component which accumulates the events until dispacthed.
Expand All @@ -86,20 +87,28 @@ def __init__(self,
self.event_dispatcher = event_dispatcher or default_event_dispatcher
self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger())
self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY)
self.batch_size = batch_size if self._validate_intantiation_props(batch_size, 'batch_size') \
self.batch_size = batch_size if self._validate_instantiation_props(batch_size,
'batch_size',
self._DEFAULT_BATCH_SIZE) \
else self._DEFAULT_BATCH_SIZE
self.flush_interval = timedelta(seconds=flush_interval) \
if self._validate_intantiation_props(flush_interval, 'flush_interval') \
else self._DEFAULT_FLUSH_INTERVAL
if self._validate_instantiation_props(flush_interval,
'flush_interval',
self._DEFAULT_FLUSH_INTERVAL) \
else timedelta(self._DEFAULT_FLUSH_INTERVAL)
self.timeout_interval = timedelta(seconds=timeout_interval) \
if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \
else self._DEFAULT_TIMEOUT_INTERVAL
self.notification_center = notification_center
if self._validate_instantiation_props(timeout_interval,
'timeout_interval',
self._DEFAULT_TIMEOUT_INTERVAL) \
else timedelta(self._DEFAULT_TIMEOUT_INTERVAL)

self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Helps us get rid of conditionals later on.

self._current_batch = list()

if not validator.is_notification_center_valid(self.notification_center):
self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center'))
self.notification_center = _notification_center.NotificationCenter()
self.logger.debug('Creating notification center for use.')
self.notification_center = _notification_center.NotificationCenter(self.logger)

self.executor = None
if start_on_init is True:
Expand All @@ -110,13 +119,14 @@ def is_running(self):
""" Property to check if consumer thread is alive or not. """
return self.executor.isAlive() if self.executor else False

def _validate_intantiation_props(self, prop, prop_name):
def _validate_instantiation_props(self, prop, prop_name, default_value):
""" Method to determine if instantiation properties like batch_size, flush_interval
and timeout_interval are valid.

Args:
prop: Property value that needs to be validated.
prop_name: Property name.
default_value: Default value for property.

Returns:
False if property value is None or less than or equal to 0 or not a finite number.
Expand All @@ -132,7 +142,7 @@ def _validate_intantiation_props(self, prop, prop_name):
is_valid = False

if is_valid is False:
self.logger.info('Using default value for {}.'.format(prop_name))
self.logger.info('Using default value {} for {}.'.format(default_value, prop_name))

return is_valid

Expand Down Expand Up @@ -213,11 +223,10 @@ def _flush_queue(self):

log_event = EventFactory.create_log_event(to_process_batch, self.logger)

if self.notification_center is not None:
self.notification_center.send_notifications(
enums.NotificationTypes.LOG_EVENT,
log_event
)
self.notification_center.send_notifications(
enums.NotificationTypes.LOG_EVENT,
log_event
)

try:
self.event_dispatcher.dispatch_event(log_event)
Expand All @@ -226,14 +235,17 @@ def _flush_queue(self):

def process(self, user_event):
""" Method to process the user_event by putting it in event_queue.

Args:
user_event: UserEvent Instance.
"""
if not isinstance(user_event, UserEvent):
self.logger.error('Provided event is in an invalid format.')
return

self.logger.debug('Received user_event: ' + str(user_event))
self.logger.debug('Received event of type {} for user {}.'.format(
type(user_event).__name__, user_event.user_id)
)

try:
self.event_queue.put_nowait(user_event)
Expand All @@ -242,6 +254,7 @@ def process(self, user_event):

def _add_to_batch(self, user_event):
""" Method to append received user event to current batch.

Args:
user_event: UserEvent Instance.
"""
Expand All @@ -261,9 +274,11 @@ def _add_to_batch(self, user_event):

def _should_split(self, user_event):
""" Method to check if current event batch should split into two.

Args:
user_event: UserEvent Instance.
Return Value:

Returns:
- True, if revision number and project_id of last event in current batch do not match received event's
revision number and project id respectively.
- False, otherwise.
Expand Down Expand Up @@ -311,30 +326,32 @@ def __init__(self, event_dispatcher, logger=None, notification_center=None):
"""
self.event_dispatcher = event_dispatcher
self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger())
self.notification_center = notification_center
self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger)

if not validator.is_notification_center_valid(self.notification_center):
self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center'))
self.notification_center = _notification_center.NotificationCenter()

def process(self, user_event):
""" Method to process the user_event by dispatching it.

Args:
user_event: UserEvent Instance.
"""
if not isinstance(user_event, UserEvent):
self.logger.error('Provided event is in an invalid format.')
return

self.logger.debug('Received user_event: ' + str(user_event))
self.logger.debug('Received event of type {} for user {}.'.format(
type(user_event).__name__, user_event.user_id)
)

log_event = EventFactory.create_log_event(user_event, self.logger)

if self.notification_center is not None:
self.notification_center.send_notifications(
enums.NotificationTypes.LOG_EVENT,
log_event
)
self.notification_center.send_notifications(
enums.NotificationTypes.LOG_EVENT,
log_event
)

try:
self.event_dispatcher.dispatch_event(log_event)
Expand Down
9 changes: 6 additions & 3 deletions optimizely/optimizely.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ def __init__(self,
notification_center: Optional instance of notification_center.NotificationCenter. Useful when providing own
config_manager.BaseConfigManager implementation which can be using the
same NotificationCenter instance.
event_processor: Processes the given event(s) by creating LogEvent(s) and then dispatching it.
event_processor: Optional component which processes the given event(s).
By default optimizely.event.event_processor.ForwardingEventProcessor is used
which simply forwards events to the event dispatcher.
To enable event batching configure and use optimizely.event.event_processor.BatchEventProcessor.
"""
self.logger_name = '.'.join([__name__, self.__class__.__name__])
self.is_valid = True
Expand All @@ -68,8 +71,8 @@ def __init__(self,
self.config_manager = config_manager
self.notification_center = notification_center or NotificationCenter(self.logger)
self.event_processor = event_processor or ForwardingEventProcessor(self.event_dispatcher,
self.logger,
self.notification_center)
logger=self.logger,
notification_center=self.notification_center)

try:
self._validate_instantiation_options()
Expand Down
25 changes: 16 additions & 9 deletions tests/test_config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@ def test_get_config(self):
# Assert that config is set.
self.assertIsInstance(project_config_manager.get_config(), project_config.ProjectConfig)

def test_get_config_blocks(self):
""" Test that get_config blocks until blocking timeout is hit. """
start_time = time.time()
project_config_manager = config_manager.PollingConfigManager(sdk_key='sdk_key',
blocking_timeout=5)
# Assert get_config should block until blocking timeout.
project_config_manager.get_config()
end_time = time.time()
self.assertEqual(5, round(end_time - start_time))


@mock.patch('requests.get')
class PollingConfigManagerTest(base.BaseTest):
Expand Down Expand Up @@ -217,7 +227,8 @@ def test_get_datafile_url__sdk_key_and_url_and_template_provided(self, _):

def test_set_update_interval(self, _):
""" Test set_update_interval with different inputs. """
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')
with mock.patch('optimizely.config_manager.PollingConfigManager.fetch_datafile'):
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')

# Assert that if invalid update_interval is set, then exception is raised.
with self.assertRaisesRegexp(optimizely_exceptions.InvalidInputException,
Expand All @@ -238,7 +249,8 @@ def test_set_update_interval(self, _):

def test_set_blocking_timeout(self, _):
""" Test set_blocking_timeout with different inputs. """
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')
with mock.patch('optimizely.config_manager.PollingConfigManager.fetch_datafile'):
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')

# Assert that if invalid blocking_timeout is set, then exception is raised.
with self.assertRaisesRegexp(optimizely_exceptions.InvalidInputException,
Expand All @@ -261,15 +273,10 @@ def test_set_blocking_timeout(self, _):
project_config_manager.set_blocking_timeout(5)
self.assertEqual(5, project_config_manager.blocking_timeout)

# Assert get_config should block until blocking timeout.
start_time = time.time()
project_config_manager.get_config()
end_time = time.time()
self.assertEqual(5, round(end_time - start_time))

def test_set_last_modified(self, _):
""" Test that set_last_modified sets last_modified field based on header. """
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')
with mock.patch('optimizely.config_manager.PollingConfigManager.fetch_datafile'):
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')

last_modified_time = 'Test Last Modified Time'
test_response_headers = {
Expand Down
Loading