Skip to content

Commit 1fe3161

Browse files
committed
feat: flag tracking event listener mode
1 parent ce6be11 commit 1fe3161

File tree

11 files changed

+470
-9
lines changed

11 files changed

+470
-9
lines changed

README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,40 @@ if client.initialize:
149149

150150
> Note that if evaluation called before Go SDK client initialized, you set the wrong flag key/user for the evaluation or the related feature flag is not found, SDK will return the default value you set. The `fbclient.common_types.EvalDetail` will explain the details of the latest evaluation including error raison.
151151
152+
### Flag Tracking
153+
154+
`fbclient.client.FBClient.flag_tracker` registers a listener to be notified of feature flag changes in general.
155+
156+
Note that a flag value change listener is bound to a specific user and flag key.
157+
158+
The flag value change listener will be notified whenever the SDK receives any change to any feature flag's configuration,
159+
or to a user segment that is referenced by a feature flag. To register a flag value change listener, use `add_flag_value_may_changed_listener` or `add_flag_value_changed_listener`
160+
161+
When you track a flag change using `add_flag_value_may_changed_listener`, this does not necessarily mean the flag's value has changed for any particular flag, only that some part of the flag configuration was changed so that it *_MAY_* return a different value than it previously returned for some user.
162+
163+
If you want to track a flag whose value *_MUST_* be changed, `add_flag_value_changed_listener` will register a listener that will be notified if and only if the flag value changes.
164+
165+
Change notices only work if the SDK is actually connecting to FeatBit feature flag center.
166+
If the SDK is in offline mode, then it cannot know when there is a change, because flags are read on an as-needed basis.
167+
168+
```python
169+
if client.initialize:
170+
# flag value may be changed
171+
client.flag_tracker.add_flag_value_may_changed_listener(flag_key, user, callback_fn)
172+
# flag value must be changed
173+
client.flag_tracker.add_flag_value_changed_listener(flag_key, user, callback_fn)
174+
175+
```
176+
`flag_key`: the key of the feature flag to track
177+
178+
`user`: the user to evaluate the flag value
179+
180+
`callback_fn`: the function to be called for the flag value change
181+
* the first argument is the flag key
182+
* the second argument is the latest flag value
183+
184+
185+
152186
### Offline Mode
153187

154188
In some situations, you might want to stop making remote calls to FeatBit. Here is how:

fbclient/client.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
REASON_USER_NOT_SPECIFIED, Evaluator)
1313
from fbclient.event_processor import DefaultEventProcessor, NullEventProcessor
1414
from fbclient.event_types import FlagEvent, Metric, MetricEvent, UserEvent
15+
from fbclient.flag_change_notification import FlagTracker
1516
from fbclient.interfaces import DataUpdateStatusProvider
17+
from fbclient.notice_broadcaster import NoticeBroadcater
1618
from fbclient.status import DataUpdateStatusProviderImpl
1719
from fbclient.streaming import Streaming, _data_to_dict
1820
from fbclient.update_processor import NullUpdateProcessor
@@ -71,6 +73,9 @@ def __init__(self, config: Config, start_wait: float = 15.):
7173
else:
7274
self._config.validate()
7375

76+
self._broadcaster = NoticeBroadcater()
77+
self._flag_tracker = FlagTracker(self._broadcaster, self.variation)
78+
7479
# init components
7580
# event processor
7681
self._event_processor = self._build_event_processor(config)
@@ -84,8 +89,7 @@ def __init__(self, config: Config, start_wait: float = 15.):
8489
self._update_status_provider = DataUpdateStatusProviderImpl(config.data_storage)
8590
# update processor
8691
update_processor_ready = threading.Event()
87-
self._update_processor = self._build_update_processor(config, self._update_status_provider,
88-
update_processor_ready)
92+
self._update_processor = self._build_update_processor(config, self._broadcaster, self._update_status_provider, update_processor_ready)
8993
self._update_processor.start()
9094

9195
if start_wait > 0:
@@ -111,7 +115,7 @@ def _build_event_processor(self, config: Config):
111115

112116
return DefaultEventProcessor(config, DefaultSender('insight', config, max_size=10))
113117

114-
def _build_update_processor(self, config: Config, update_status_provider, update_processor_event):
118+
def _build_update_processor(self, config: Config, broadcaster: NoticeBroadcater, update_status_provider, update_processor_event):
115119
if config.update_processor_imp:
116120
log.debug("Using user-specified update processor: %s" % str(config.update_processor_imp))
117121
return config.update_processor_imp(config, update_status_provider, update_processor_event)
@@ -120,7 +124,7 @@ def _build_update_processor(self, config: Config, update_status_provider, update
120124
log.debug("Offline mode, SDK disable streaming data updating")
121125
return NullUpdateProcessor(config, update_status_provider, update_processor_event)
122126

123-
return Streaming(config, update_status_provider, update_processor_event)
127+
return Streaming(config, broadcaster, update_status_provider, update_processor_event)
124128

125129
@property
126130
def initialize(self) -> bool:
@@ -136,6 +140,15 @@ def initialize(self) -> bool:
136140
def update_status_provider(self) -> DataUpdateStatusProvider:
137141
return self._update_status_provider
138142

143+
@property
144+
def flag_tracker(self) -> FlagTracker:
145+
"""
146+
Returns an object for tracking changes in feature flag configurations.
147+
The :class:`FlagTracker` contains methods for requesting notifications about feature flag changes using
148+
an event listener model.
149+
"""
150+
return self._flag_tracker
151+
139152
def stop(self):
140153
"""Releases all threads and network connections used by SDK.
141154
@@ -145,6 +158,7 @@ def stop(self):
145158
self._data_storage.stop()
146159
self._update_processor.stop()
147160
self._event_processor.stop()
161+
self._broadcaster.stop()
148162

149163
def __enter__(self):
150164
return self

fbclient/evaluator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ def _match_default_user_variation(self, flag: dict, user: FBUser) -> Optional[_E
146146

147147
def _match_any_rule(self, user: FBUser, rule: dict) -> bool:
148148
# conditions cannot be empty
149-
return all(self._process_condition(user, condiction) for condiction in rule['conditions'])
149+
return all(self._process_condition(user, condition) for condition in rule['conditions'])
150150

151151
def _process_condition(self, user: FBUser, condition: dict) -> bool:
152152
op = condition['op']
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
2+
from abc import ABC, abstractmethod
3+
from typing import Any, Callable
4+
from fbclient.common_types import FBUser
5+
from fbclient.interfaces import Notice
6+
from fbclient.notice_broadcaster import NoticeBroadcater
7+
8+
FLAG_CHANGE_NOTICE_TYPE = 'flag_change_notice'
9+
10+
11+
class FlagChangedNotice(Notice):
12+
def __init__(self, flag_key: str):
13+
self.__flag_key = flag_key
14+
15+
@property
16+
def notice_type(self) -> str:
17+
return FLAG_CHANGE_NOTICE_TYPE
18+
19+
@property
20+
def flag_key(self) -> str:
21+
return self.__flag_key
22+
23+
24+
class FlagChangedListener(ABC):
25+
"""
26+
A notice listener that is notified when a feature flag's configuration has changed.
27+
28+
This is an abstract class. You need to implement your own listener by overriding the :func:`on_flag_change` method.
29+
30+
"""
31+
@abstractmethod
32+
def on_flag_change(self, notice: FlagChangedNotice):
33+
pass
34+
35+
36+
class FlagValueChangedListener(FlagChangedListener):
37+
def __init__(self,
38+
flag_key: str,
39+
user: dict,
40+
evaluate_fn: Callable[[str, dict, Any], Any],
41+
flag_value_changed_fn: Callable[[str, Any], None]):
42+
self.__flag_key = flag_key
43+
self.__user = user
44+
self.__evaluate_fn = evaluate_fn
45+
self.__fn = flag_value_changed_fn
46+
# record the flag value when the listener is created
47+
self.__prvious_flag_value = self.__evaluate_fn(self.__flag_key, self.__user, None)
48+
49+
def on_flag_change(self, notice: FlagChangedNotice):
50+
if notice.flag_key == self.__flag_key:
51+
prev_flag_value = self.__prvious_flag_value
52+
curr_flag_value = self.__evaluate_fn(self.__flag_key, self.__user, None)
53+
if prev_flag_value != curr_flag_value:
54+
self.__fn(self.__flag_key, curr_flag_value)
55+
self.__prvious_flag_value = curr_flag_value
56+
57+
58+
class FlagValueMayChangedListener(FlagChangedListener):
59+
def __init__(self,
60+
flag_key: str,
61+
user: dict,
62+
evaluate_fn: Callable[[str, dict, Any], Any],
63+
flag_value_changed_fn: Callable[[str, Any], None]):
64+
self.__flag_key = flag_key
65+
self.__user = user
66+
self.__evaluate_fn = evaluate_fn
67+
self.__fn = flag_value_changed_fn
68+
69+
def on_flag_change(self, notice: FlagChangedNotice):
70+
if notice.flag_key == self.__flag_key:
71+
curr_flag_value = self.__evaluate_fn(self.__flag_key, self.__user, None)
72+
self.__fn(self.__flag_key, curr_flag_value)
73+
74+
75+
class FlagTracker:
76+
"""
77+
A registry to register the flag change listeners in order to track changes in feature flag configurations.
78+
79+
The registered listerners only work if the SDK is actually connecting to FeatBit feature flag center.
80+
If the SDK is only in offline mode then it cannot know when there is a change, because flags are read on an as-needed basis.
81+
82+
Application code never needs to initialize or extend this class directly.
83+
"""
84+
85+
def __init__(self,
86+
flag_change_broadcaster: NoticeBroadcater,
87+
evaluate_fn: Callable[[str, dict, Any], Any]):
88+
"""
89+
:param flag_change_broadcaster: The broadcaster that broadcasts the flag change notices
90+
:param evaluate_fn: The function to evaluate the flag value
91+
"""
92+
self.__broadcater = flag_change_broadcaster
93+
self.__evaluate_fn = evaluate_fn
94+
95+
def add_flag_value_changed_listener(self,
96+
flag_key: str,
97+
user: dict,
98+
flag_value_changed_fn: Callable[[str, Any], None]) -> FlagValueChangedListener:
99+
"""
100+
Registers a listener to be notified of a change in a specific feature flag's value for a specific FeatBit user.
101+
102+
The listener will be notified whenever the SDK receives any change to any feature flag's configuration,
103+
or to a user segment that is referenced by a feature flag.
104+
105+
When you call this method, it first immediately evaluates the feature flag. It then uses :class:`FlagChangeListener` to start listening for feature flag configuration
106+
changes, and whenever the specified feature flag changes, it re-evaluates the flag for the same user. It then calls :class:`FlagValueChangeListener`
107+
if and only if the resulting value has changed.
108+
109+
:param flag_key: The key of the feature flag to track
110+
:param user: The user to evaluate the flag value
111+
:param flag_value_changed_fn: The function to be called only when this flag value changes
112+
* the first argument is the flag key
113+
* the second argument is the latest flag value, this value must be different from the previous value
114+
115+
:return: A listener object that can be used to remove it later on.
116+
"""
117+
118+
# check flag key
119+
if not isinstance(flag_key, str) or not flag_key:
120+
raise ValueError('flag_key must be a non-empty string')
121+
# check user
122+
FBUser.from_dict(user)
123+
# check flag_value_changed_fn
124+
if not isinstance(flag_value_changed_fn, Callable) or not flag_value_changed_fn:
125+
raise ValueError('flag_value_changed_fn must be a callable function')
126+
127+
listener = FlagValueChangedListener(flag_key, user, self.__evaluate_fn, flag_value_changed_fn)
128+
self.add_flag_changed_listener(listener)
129+
return listener
130+
131+
def add_flag_value_may_changed_listener(self,
132+
flag_key: str,
133+
user: dict,
134+
flag_value_changed_fn: Callable[[str, Any], None]) -> FlagValueMayChangedListener:
135+
"""
136+
Registers a listener to be notified of a change in a specific feature flag's value for a specific FeatBit user.
137+
138+
The listener will be notified whenever the SDK receives any change to any feature flag's configuration,
139+
or to a user segment that is referenced by a feature flag.
140+
141+
Note that this does not necessarily mean the flag's value has changed for any particular flag,
142+
only that some part of the flag configuration was changed so that it may return a different value than it previously returned for some user.
143+
144+
If you want to track flag value changes,use :func:`add_flag_value_changed_listener instead.
145+
146+
:param flag_key: The key of the feature flag to track
147+
:param user: The user to evaluate the flag value
148+
:param flag_value_changed_fn: The function to be called only if any changes to a specific flag
149+
* the first argument is the flag key
150+
* the second argument is the latest flag value, this value may be same as the previous value
151+
152+
:return: A listener object that can be used to remove it later on.
153+
154+
"""
155+
156+
# check flag key
157+
if not isinstance(flag_key, str) or not flag_key:
158+
raise ValueError('flag_key must be a non-empty string')
159+
# check user
160+
FBUser.from_dict(user)
161+
# check flag_value_changed_fn
162+
if not isinstance(flag_value_changed_fn, Callable) or not flag_value_changed_fn:
163+
raise ValueError('flag_value_changed_fn must be a callable function')
164+
165+
listener = FlagValueMayChangedListener(flag_key, user, self.__evaluate_fn, flag_value_changed_fn)
166+
self.add_flag_changed_listener(listener)
167+
return listener
168+
169+
def add_flag_changed_listener(self, listener: FlagChangedListener):
170+
"""
171+
Registers a listener to be notified of feature flag changes in general.
172+
173+
The listener will be notified whenever the SDK receives any change to any feature flag's configuration,
174+
or to a user segment that is referenced by a feature flag.
175+
176+
:param listener: The listener to be registered. The :class:`FlagChangedListner` is an abstract class. You need to implement your own listener.
177+
"""
178+
self.__broadcater.add_listener(FLAG_CHANGE_NOTICE_TYPE, listener.on_flag_change) # type: ignore
179+
180+
def remove_flag_change_notifier(self, listener: FlagChangedListener):
181+
"""
182+
Unregisters a listener so that it will no longer be notified of feature flag changes.
183+
184+
:param listener: The listener to be unregistered. The listener must be the same object that was passed to :func:`add_flag_changed_listner` or :func:`add_flag_value_changed_listerner`
185+
"""
186+
self.__broadcater.remove_listener(FLAG_CHANGE_NOTICE_TYPE, listener.on_flag_change) # type: ignore

fbclient/interfaces.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,3 +265,17 @@ def stop(self):
265265
Shuts down the connection to feature flag center
266266
"""
267267
pass
268+
269+
270+
class Notice(ABC):
271+
"""
272+
This is not an insight event to be sent to FeatBit Flag Center; it is a notice to notify the SDK that something has happened,
273+
such as flag values updated
274+
"""
275+
@property
276+
@abstractmethod
277+
def notice_type(self) -> str:
278+
"""
279+
Returns the type of this notice
280+
"""
281+
pass

fbclient/notice_broadcaster.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
2+
from queue import Empty, Queue
3+
from threading import Thread
4+
from typing import Callable
5+
from fbclient.interfaces import Notice
6+
7+
from fbclient.utils import log
8+
9+
10+
class NoticeBroadcater:
11+
def __init__(self):
12+
self.__notice_queue = Queue()
13+
self.__closed = False
14+
self.__listeners = {}
15+
self.__thread = Thread(daemon=True, target=self.__run)
16+
log.debug('notice broadcaster starting...')
17+
self.__thread.start()
18+
19+
def add_listener(self, notice_type: str, listener: Callable[[Notice], None]):
20+
if isinstance(notice_type, str) and notice_type.strip() and listener is not None:
21+
log.debug('add a listener for notice type %s' % notice_type)
22+
if notice_type not in self.__listeners:
23+
self.__listeners[notice_type] = []
24+
self.__listeners[notice_type].append(listener)
25+
26+
def remove_listener(self, notice_type: str, listener: Callable[[Notice], None]):
27+
if notice_type in self.__listeners and listener is not None:
28+
log.debug('remove a listener for notice type %s' % notice_type)
29+
notifiers = self.__listeners[notice_type]
30+
if not notifiers:
31+
del self.__listeners[notice_type]
32+
else:
33+
notifiers.remove(listener)
34+
35+
def broadcast(self, notice: Notice):
36+
self.__notice_queue.put(notice)
37+
38+
def stop(self):
39+
log.debug('notice broadcaster stopping...')
40+
self.__closed = True
41+
self.__thread.join()
42+
43+
def __run(self):
44+
while not self.__closed:
45+
try:
46+
notice = self.__notice_queue.get(block=True, timeout=1)
47+
self.__notice_process(notice)
48+
except Empty:
49+
pass
50+
51+
def __notice_process(self, notice: Notice):
52+
if notice.notice_type in self.__listeners:
53+
for listerner in self.__listeners[notice.notice_type]:
54+
try:
55+
listerner(notice)
56+
except Exception as e:
57+
log.exception('FB Python SDK: unexpected error in handle notice %s: %s' % (notice.notice_type, str(e)))

fbclient/status.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ def __handle_exception(self, error: Exception, error_type: str, message: str):
3636
log.exception('FB Python SDK: Data Storage error: %s, UpdateProcessor will attempt to receive the data' % str(error))
3737
self.update_state(State.interrupted_state(error_type, message))
3838

39+
def get_all(self, kind: Category) -> Mapping[str, dict]:
40+
return self.__storage.get_all(kind)
41+
3942
@property
4043
def initialized(self) -> bool:
4144
return self.__storage.initialized

0 commit comments

Comments
 (0)