Skip to content

Commit d1b521b

Browse files
feat: add odp manager (#405)
* bump up py version in gitactions to 3.10 * feat: add odp_manager * add update config event manager signal Co-authored-by: Andy Leap <andrew.leap@optimizely.com>
1 parent de849d2 commit d1b521b

File tree

7 files changed

+627
-22
lines changed

7 files changed

+627
-22
lines changed

optimizely/exceptions.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,21 @@ class UnsupportedDatafileVersionException(Exception):
6464
""" Raised when provided version in datafile is not supported. """
6565

6666
pass
67+
68+
69+
class OdpNotEnabled(Exception):
70+
""" Raised when Optimizely Data Platform (ODP) is not enabled. """
71+
72+
pass
73+
74+
75+
class OdpNotIntegrated(Exception):
76+
""" Raised when Optimizely Data Platform (ODP) is not integrated. """
77+
78+
pass
79+
80+
81+
class OdpInvalidData(Exception):
82+
""" Raised when passing invalid ODP data. """
83+
84+
pass

optimizely/helpers/enums.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,9 @@ class Errors:
123123
INVALID_SEGMENT_IDENTIFIER: Final = 'Audience segments fetch failed (invalid identifier).'
124124
FETCH_SEGMENTS_FAILED: Final = 'Audience segments fetch failed ({}).'
125125
ODP_EVENT_FAILED: Final = 'ODP event send failed ({}).'
126-
ODP_NOT_ENABLED: Final = 'ODP is not enabled.'
127126
ODP_NOT_INTEGRATED: Final = 'ODP is not integrated.'
127+
ODP_NOT_ENABLED: Final = 'ODP is not enabled.'
128+
ODP_INVALID_DATA: Final = 'ODP data is not valid.'
128129

129130

130131
class ForcedDecisionLogs:
@@ -214,3 +215,15 @@ class OdpEventManagerConfig:
214215
DEFAULT_BATCH_SIZE: Final = 10
215216
DEFAULT_FLUSH_INTERVAL: Final = 1
216217
DEFAULT_RETRY_COUNT: Final = 3
218+
219+
220+
class OdpManagerConfig:
221+
"""ODP Manager configs."""
222+
KEY_FOR_USER_ID: Final = 'fs_user_id'
223+
EVENT_TYPE: Final = 'fullstack'
224+
225+
226+
class OdpSegmentsCacheConfig:
227+
"""ODP Segment Cache configs."""
228+
DEFAULT_CAPACITY: Final = 10_000
229+
DEFAULT_TIMEOUT_SECS: Final = 600

optimizely/odp/odp_event_manager.py

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,25 @@
1212
# limitations under the License.
1313

1414
from __future__ import annotations
15+
16+
import time
1517
from enum import Enum
18+
from queue import Empty, Queue, Full
1619
from threading import Thread
1720
from typing import Optional
18-
import time
19-
from queue import Empty, Queue, Full
2021

2122
from optimizely import logger as _logging
22-
from .odp_event import OdpEvent, OdpDataDict
23+
from optimizely.helpers.enums import OdpEventManagerConfig, Errors, OdpManagerConfig
2324
from .odp_config import OdpConfig, OdpConfigState
25+
from .odp_event import OdpEvent, OdpDataDict
2426
from .zaius_rest_api_manager import ZaiusRestApiManager
25-
from optimizely.helpers.enums import OdpEventManagerConfig, Errors
2627

2728

2829
class Signal(Enum):
2930
"""Enum for sending signals to the event queue."""
3031
SHUTDOWN = 1
3132
FLUSH = 2
33+
UPDATE_CONFIG = 3
3234

3335

3436
class OdpEventManager:
@@ -55,7 +57,11 @@ def __init__(
5557
"""
5658
self.logger = logger or _logging.NoOpLogger()
5759
self.zaius_manager = api_manager or ZaiusRestApiManager(self.logger)
60+
5861
self.odp_config = odp_config
62+
self.api_key = odp_config.get_api_key()
63+
self.api_host = odp_config.get_api_host()
64+
5965
self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY)
6066
self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE
6167
self.flush_interval = OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL
@@ -101,7 +107,11 @@ def _run(self) -> None:
101107
self.logger.debug('ODP event queue: received flush signal.')
102108
self._flush_batch()
103109
self.event_queue.task_done()
104-
continue
110+
111+
elif item == Signal.UPDATE_CONFIG:
112+
self.logger.debug('ODP event queue: received update config signal.')
113+
self._update_config()
114+
self.event_queue.task_done()
105115

106116
elif isinstance(item, OdpEvent):
107117
self._add_to_batch(item)
@@ -136,10 +146,7 @@ def _flush_batch(self) -> None:
136146
self.logger.debug('ODP event queue: nothing to flush.')
137147
return
138148

139-
api_key = self.odp_config.get_api_key()
140-
api_host = self.odp_config.get_api_host()
141-
142-
if not api_key or not api_host:
149+
if not self.api_key or not self.api_host:
143150
self.logger.debug(Errors.ODP_NOT_INTEGRATED)
144151
self._current_batch.clear()
145152
return
@@ -149,7 +156,7 @@ def _flush_batch(self) -> None:
149156

150157
for i in range(1 + self.retry_count):
151158
try:
152-
should_retry = self.zaius_manager.send_odp_events(api_key, api_host, self._current_batch)
159+
should_retry = self.zaius_manager.send_odp_events(self.api_key, self.api_host, self._current_batch)
153160
except Exception as error:
154161
should_retry = False
155162
self.logger.error(Errors.ODP_EVENT_FAILED.format(f'Error: {error} {self._current_batch}'))
@@ -236,3 +243,22 @@ def dispatch(self, event: OdpEvent) -> None:
236243
self.event_queue.put_nowait(event)
237244
except Full:
238245
self.logger.warning(Errors.ODP_EVENT_FAILED.format("Queue is full"))
246+
247+
def identify_user(self, user_id: str) -> None:
248+
self.send_event(OdpManagerConfig.EVENT_TYPE, 'identified',
249+
{OdpManagerConfig.KEY_FOR_USER_ID: user_id}, {})
250+
251+
def update_config(self) -> None:
252+
"""Adds update config signal to event_queue."""
253+
try:
254+
self.event_queue.put_nowait(Signal.UPDATE_CONFIG)
255+
except Full:
256+
self.logger.error("Error updating ODP config for the event queue")
257+
258+
def _update_config(self) -> None:
259+
"""Updates the configuration used to send events."""
260+
if len(self._current_batch) > 0:
261+
self._flush_batch()
262+
263+
self.api_host = self.odp_config.get_api_host()
264+
self.api_key = self.odp_config.get_api_key()

optimizely/odp/odp_manager.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# Copyright 2022, Optimizely
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
from __future__ import annotations
15+
16+
from typing import Optional, Any
17+
18+
from optimizely import exceptions as optimizely_exception
19+
from optimizely import logger as optimizely_logger
20+
from optimizely.helpers.enums import Errors, OdpManagerConfig, OdpSegmentsCacheConfig
21+
from optimizely.helpers.validator import are_odp_data_types_valid
22+
from optimizely.odp.lru_cache import OptimizelySegmentsCache, LRUCache
23+
from optimizely.odp.odp_config import OdpConfig, OdpConfigState
24+
from optimizely.odp.odp_event_manager import OdpEventManager
25+
from optimizely.odp.odp_segment_manager import OdpSegmentManager
26+
from optimizely.odp.zaius_graphql_api_manager import ZaiusGraphQLApiManager
27+
28+
29+
class OdpManager:
30+
"""Orchestrates segment manager, event manager and odp config."""
31+
32+
def __init__(
33+
self,
34+
disable: bool,
35+
segments_cache: Optional[OptimizelySegmentsCache] = None,
36+
segment_manager: Optional[OdpSegmentManager] = None,
37+
event_manager: Optional[OdpEventManager] = None,
38+
logger: Optional[optimizely_logger.Logger] = None
39+
) -> None:
40+
41+
self.enabled = not disable
42+
self.odp_config = OdpConfig()
43+
self.logger = logger or optimizely_logger.NoOpLogger()
44+
45+
self.segment_manager = segment_manager
46+
self.event_manager = event_manager
47+
48+
if not self.enabled:
49+
self.logger.info('ODP is disabled.')
50+
return
51+
52+
if not self.segment_manager:
53+
if not segments_cache:
54+
segments_cache = LRUCache(
55+
OdpSegmentsCacheConfig.DEFAULT_CAPACITY,
56+
OdpSegmentsCacheConfig.DEFAULT_TIMEOUT_SECS
57+
)
58+
self.segment_manager = OdpSegmentManager(
59+
self.odp_config,
60+
segments_cache,
61+
ZaiusGraphQLApiManager(logger), logger
62+
)
63+
else:
64+
self.segment_manager.odp_config = self.odp_config
65+
66+
if event_manager:
67+
event_manager.odp_config = self.odp_config
68+
self.event_manager = event_manager
69+
else:
70+
self.event_manager = OdpEventManager(self.odp_config, logger)
71+
72+
self.event_manager.start()
73+
74+
def fetch_qualified_segments(self, user_id: str, options: list[str]) -> Optional[list[str]]:
75+
if not self.enabled or not self.segment_manager:
76+
self.logger.error(Errors.ODP_NOT_ENABLED)
77+
return None
78+
79+
user_key = OdpManagerConfig.KEY_FOR_USER_ID
80+
user_value = user_id
81+
82+
return self.segment_manager.fetch_qualified_segments(user_key, user_value, options)
83+
84+
def identify_user(self, user_id: str) -> None:
85+
if not self.enabled or not self.event_manager:
86+
self.logger.debug('ODP identify event is not dispatched (ODP disabled).')
87+
return
88+
if self.odp_config.odp_state() == OdpConfigState.NOT_INTEGRATED:
89+
self.logger.debug('ODP identify event is not dispatched (ODP not integrated).')
90+
return
91+
92+
self.event_manager.identify_user(user_id)
93+
94+
def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, Any]) -> None:
95+
"""
96+
Send an event to the ODP server.
97+
98+
Args:
99+
type: The event type.
100+
action: The event action name.
101+
identifiers: A dictionary for identifiers.
102+
data: A dictionary for associated data. The default event data will be added to this data
103+
before sending to the ODP server.
104+
105+
Raises custom exception if error is detected.
106+
"""
107+
if not self.enabled or not self.event_manager:
108+
raise optimizely_exception.OdpNotEnabled(Errors.ODP_NOT_ENABLED)
109+
110+
if self.odp_config.odp_state() == OdpConfigState.NOT_INTEGRATED:
111+
raise optimizely_exception.OdpNotIntegrated(Errors.ODP_NOT_INTEGRATED)
112+
113+
if not are_odp_data_types_valid(data):
114+
raise optimizely_exception.OdpInvalidData(Errors.ODP_INVALID_DATA)
115+
116+
self.event_manager.send_event(type, action, identifiers, data)
117+
118+
def update_odp_config(self, api_key: Optional[str], api_host: Optional[str],
119+
segments_to_check: list[str]) -> None:
120+
if not self.enabled:
121+
return
122+
123+
config_changed = self.odp_config.update(api_key, api_host, segments_to_check)
124+
if not config_changed:
125+
self.logger.debug('Odp config was not changed.')
126+
return
127+
128+
# reset segments cache when odp integration or segments to check are changed
129+
if self.segment_manager:
130+
self.segment_manager.reset()
131+
132+
if self.event_manager:
133+
self.event_manager.update_config()

optimizely/odp/odp_segment_manager.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,21 @@
2626
class OdpSegmentManager:
2727
"""Schedules connections to ODP for audience segmentation and caches the results."""
2828

29-
def __init__(self, odp_config: OdpConfig, segments_cache: OptimizelySegmentsCache,
30-
zaius_manager: ZaiusGraphQLApiManager,
31-
logger: Optional[optimizely_logger.Logger] = None) -> None:
29+
def __init__(
30+
self,
31+
odp_config: OdpConfig,
32+
segments_cache: OptimizelySegmentsCache,
33+
zaius_manager: ZaiusGraphQLApiManager,
34+
logger: Optional[optimizely_logger.Logger] = None
35+
) -> None:
3236

3337
self.odp_config = odp_config
3438
self.segments_cache = segments_cache
3539
self.zaius_manager = zaius_manager
3640
self.logger = logger or optimizely_logger.NoOpLogger()
3741

38-
def fetch_qualified_segments(self, user_key: str, user_value: str, options: list[str]) -> \
39-
Optional[list[str]]:
42+
def fetch_qualified_segments(self, user_key: str, user_value: str, options: list[str]
43+
) -> Optional[list[str]]:
4044
"""
4145
Args:
4246
user_key: The key for identifying the id type.
@@ -64,7 +68,7 @@ def fetch_qualified_segments(self, user_key: str, user_value: str, options: list
6468
reset_cache = OptimizelyOdpOption.RESET_CACHE in options
6569

6670
if reset_cache:
67-
self._reset()
71+
self.reset()
6872

6973
if not ignore_cache and not reset_cache:
7074
segments = self.segments_cache.lookup(cache_key)
@@ -83,7 +87,7 @@ def fetch_qualified_segments(self, user_key: str, user_value: str, options: list
8387

8488
return segments
8589

86-
def _reset(self) -> None:
90+
def reset(self) -> None:
8791
self.segments_cache.reset()
8892

8993
def make_cache_key(self, user_key: str, user_value: str) -> str:

tests/test_odp_event_manager.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ def test_odp_event_manager_events_before_odp_ready(self, *args):
411411
event_manager.send_event(**self.events[1])
412412

413413
odp_config.update(self.api_key, self.api_host, [])
414+
event_manager.update_config()
414415
event_manager.event_queue.join()
415416

416417
event_manager.send_event(**self.events[0])
@@ -423,6 +424,7 @@ def test_odp_event_manager_events_before_odp_ready(self, *args):
423424
mock_logger.debug.assert_has_calls([
424425
mock.call('ODP event queue: cannot send before the datafile has loaded.'),
425426
mock.call('ODP event queue: cannot send before the datafile has loaded.'),
427+
mock.call('ODP event queue: received update config signal.'),
426428
mock.call('ODP event queue: adding event.'),
427429
mock.call('ODP event queue: adding event.'),
428430
mock.call('ODP event queue: received flush signal.'),
@@ -442,6 +444,7 @@ def test_odp_event_manager_events_before_odp_disabled(self, *args):
442444
event_manager.send_event(**self.events[1])
443445

444446
odp_config.update(None, None, [])
447+
event_manager.update_config()
445448
event_manager.event_queue.join()
446449

447450
event_manager.send_event(**self.events[0])
@@ -453,6 +456,7 @@ def test_odp_event_manager_events_before_odp_disabled(self, *args):
453456
mock_logger.debug.assert_has_calls([
454457
mock.call('ODP event queue: cannot send before the datafile has loaded.'),
455458
mock.call('ODP event queue: cannot send before the datafile has loaded.'),
459+
mock.call('ODP event queue: received update config signal.'),
456460
mock.call(Errors.ODP_NOT_INTEGRATED),
457461
mock.call(Errors.ODP_NOT_INTEGRATED)
458462
])
@@ -496,20 +500,25 @@ def test_odp_event_manager_disabled_after_events_in_queue(self, *args):
496500
odp_config = OdpConfig(self.api_key, self.api_host)
497501

498502
event_manager = OdpEventManager(odp_config, mock_logger)
499-
event_manager.batch_size = 2
503+
event_manager.batch_size = 3
500504

501505
with mock.patch('optimizely.odp.odp_event_manager.OdpEventManager.is_running', True):
502506
event_manager.send_event(**self.events[0])
503507
event_manager.send_event(**self.events[1])
504-
505-
with mock.patch.object(event_manager.zaius_manager, 'send_odp_events') as mock_send:
506508
odp_config.update(None, None, [])
509+
event_manager.update_config()
510+
511+
with mock.patch.object(
512+
event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False
513+
) as mock_send:
507514
event_manager.start()
515+
event_manager.send_event(**self.events[0])
508516
event_manager.send_event(**self.events[1])
517+
event_manager.send_event(**self.events[0])
509518
event_manager.event_queue.join()
510519

511520
self.assertEqual(len(event_manager._current_batch), 0)
512521
mock_logger.debug.assert_any_call(Errors.ODP_NOT_INTEGRATED)
513522
mock_logger.error.assert_not_called()
514-
mock_send.assert_not_called()
523+
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events)
515524
event_manager.stop()

0 commit comments

Comments
 (0)