Skip to content

feat: support realtime toggle update via socket.io #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

config = fp.Config(remote_uri=FEATURE_PROBE_SERVER_URL, # FeatureProbe server URL
sync_mode='pooling',
refresh_interval=2,
refresh_interval=30,
start_wait=5)

# Server Side SDK Key for your project and environment
Expand Down
39 changes: 1 addition & 38 deletions featureprobe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,56 +23,19 @@

# --------------- API --------------- #

from featureprobe.model import *

from featureprobe.access_recorder import (
AccessCounter,
AccessRecorder,
)
import featureprobe.model

from featureprobe.config import Config
from featureprobe.context import Context
from featureprobe.data_repository import DataRepository
from featureprobe.detail import Detail
from featureprobe.evaluation_result import EvaluationResult
from featureprobe.event import AccessEvent
from featureprobe.hit_result import HitResult
from featureprobe.http_config import HttpConfig
from featureprobe.client import Client
from featureprobe.user import User


__all__ = [
# featureprobe.model

'Condition',
'ConditionType',
'StringPredicate',
'SegmentPredicate',
'DatetimePredicate',
'NumberPredicate',
'SemverPredicate',
'Distribution',
'Repository',
'Rule',
'Segment',
'SegmentRule',
'Serve',
'Split',
'Toggle',

# featureprobe

'AccessCounter',
'AccessRecorder',
'Client',
'Config',
'Context',
'DataRepository',
'Detail',
'EvaluationResult',
'AccessEvent',
'HitResult',
'HttpConfig',
'User',
]
2 changes: 1 addition & 1 deletion featureprobe/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, server_sdk_key: str, config: Config = Config()):
synchronize_process_ready = Event()
self._synchronizer = config.synchronizer_creator(
context, self._data_repo, synchronize_process_ready)
self._synchronizer.sync()
self._synchronizer.start()
if config.start_wait > 0:
Client.__logger.info("Waiting up to " +
str(config.start_wait) +
Expand Down
11 changes: 8 additions & 3 deletions featureprobe/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ def __new__(cls, value, synchronizer_creator):

@defaultable
class Config:

def __init__(self,
location: str = None,
sync_mode: Union[str, SyncMode] = SyncMode.POOLING,
remote_uri: str = "http://127.0.0.1:4007",
synchronizer_url: str = None,
event_url: str = None,
remote_uri: str = 'http://127.0.0.1:4007',
realtime_url: str = None,
http_config: HttpConfig = HttpConfig(),
refresh_interval: Union[timedelta, float] = timedelta(seconds=2),
start_wait: float = 5,
Expand All @@ -57,9 +57,10 @@ def __init__(self,
self._synchronizer_creator = SyncMode(sync_mode).synchronizer_creator
self._data_repository_creator = MemoryDataRepository.from_context
self._event_processor_creator = DefaultEventProcessor.from_context
self._remote_uri = remote_uri
self._synchronizer_url = synchronizer_url
self._event_url = event_url
self._remote_uri = remote_uri
self._realtime_url = realtime_url
self._start_wait = start_wait
self._http_config = http_config or HttpConfig()
self._refresh_interval = refresh_interval \
Expand Down Expand Up @@ -94,6 +95,10 @@ def event_url(self):
def remote_uri(self):
return self._remote_uri

@property
def realtime_url(self):
return self._realtime_url

@property
def http_config(self):
return self._http_config
Expand Down
10 changes: 9 additions & 1 deletion featureprobe/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
class Context:
_GET_REPOSITORY_DATA_API = '/api/server-sdk/toggles'
_POST_EVENTS_DATA_API = '/api/events'
_REALTIME_EVENT_API = '/realtime'

def __init__(self, sdk_key: str, config: "Config"):
from featureprobe import __version__
from featureprobe import __version__ # noqa not included in __all__

self._synchronizer_url = config.synchronizer_url or (
config.remote_uri + self._GET_REPOSITORY_DATA_API)
self._event_url = config.event_url or (
config.remote_uri + self._POST_EVENTS_DATA_API)
self._realtime_url = config.realtime_url or (
config.remote_uri + self._REALTIME_EVENT_API)
self._sdk_key = sdk_key
self._refresh_interval = config.refresh_interval
self._location = config.location
Expand All @@ -45,6 +49,10 @@ def synchronizer_url(self):
def event_url(self):
return self._event_url

@property
def realtime_url(self):
return self._realtime_url

@property
def sdk_key(self):
return self._sdk_key
Expand Down
4 changes: 3 additions & 1 deletion featureprobe/file_synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ def from_context(
ready: "Event") -> "Synchronizer":
return cls(data_repo, context.location, ready)

def start(self):
self.sync()

def sync(self):
try:
with open(self._location, 'r', encoding='utf-8') as f:
Expand All @@ -61,4 +64,3 @@ def initialized(self):

def close(self):
self._ready.clear()
return
49 changes: 49 additions & 0 deletions featureprobe/internal/deprecated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2022 FeatureProbe
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings


def _nameof(anything):
if callable(anything):
return "function<{}>".format(anything.__name__)
elif isinstance(anything, property):
return "property<{}>".format(anything.fget.__name__)
# TODO: add more type support
else:
return None


def deprecated(*, since: str = None, successor=None):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used in this pr, but it may be used someday, so keep it?

"""Marks functions as deprecated."""

def wrapper(func):
def inner(*args, **kwargs):
successor_name = _nameof(successor)
warnings.warn(
"{} is deprecated{}{}".format(
_nameof(func),
" since " + since if since is not None else "",
", consider using {} instead.".format(successor_name)
if successor_name is not None
else "",
),
category=DeprecationWarning,
stacklevel=2,
)
return func(*args, **kwargs)

return inner

return wrapper
68 changes: 59 additions & 9 deletions featureprobe/pooling_synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import logging
import threading
from datetime import datetime
from typing import TYPE_CHECKING
from urllib.parse import urlparse

import tzlocal
from apscheduler.schedulers.background import BackgroundScheduler
from requests import Session

from featureprobe import Repository
from sys import version_info as python_version
if python_version >= (3, 6):
import socketio

from featureprobe.model import Repository
from featureprobe.realtime import RealtimeToggleUpdateNS
from featureprobe.synchronizer import Synchronizer

if TYPE_CHECKING:
Expand All @@ -37,6 +43,7 @@ def __init__(
context: "Context",
data_repo: "DataRepository",
ready: "threading.Event"):
self._context = context
self._refresh_interval = context.refresh_interval
self._api_url = context.synchronizer_url
self._data_repo = data_repo
Expand All @@ -49,6 +56,7 @@ def __init__(
context.http_config.conn_timeout,
context.http_config.read_timeout)

self._socket = None
self._scheduler = None
self._lock = threading.RLock()
self._ready = ready
Expand All @@ -61,31 +69,39 @@ def from_context(
ready: "threading.Event") -> "Synchronizer":
return cls(context, data_repo, ready)

def sync(self):
def start(self):
PoolingSynchronizer.__logger.info(
'Starting FeatureProbe polling repository with interval %d ms'
% (self._refresh_interval.total_seconds() * 1000))
self._poll()
self.sync()
with self._lock:
self._scheduler = BackgroundScheduler(
timezone=tzlocal.get_localzone(),
logger=self.__logger)
self._scheduler.start()
self._scheduler.add_job(
self._poll,
trigger='interval',
self._connect_socket,
args=(self._context,),
)
self._scheduler.add_job(
self.sync,
trigger="interval",
seconds=self._refresh_interval.total_seconds(),
next_run_time=datetime.now())
)

def close(self):
PoolingSynchronizer.__logger.info(
'Closing FeatureProbe PollingSynchronizer')
with self._lock:
self._scheduler.shutdown()
del self._scheduler
self._scheduler = None
self._ready.clear()
if self._socket is not None:
with contextlib.suppress(Exception):
self._socket.disconnect()
self._socket = None

def _poll(self):
def sync(self):
try:
resp = self._session.get(self._api_url, timeout=self._timeout)
resp.raise_for_status()
Expand All @@ -103,5 +119,39 @@ def _poll(self):
'Unexpected error from polling processor',
exc_info=e)

def _connect_socket(self, context: "Context"):
if self._socket is not None:
return

if python_version < (3, 6):
self.__logger.info(
'python version %s does not support socketio, realtime toggle updating is disabled'
% ('.'.join(map(str, python_version))))
return

path = urlparse(context.realtime_url).path
self._socket = socketio.Client()
self._socket.register_namespace(
RealtimeToggleUpdateNS(
path, context, self))

try:
self.__logger.info(
"connecting socket to {}, path={}".format(
context.realtime_url, path))
self._socket.connect(
context.realtime_url,
transports=["websocket"],
socketio_path=path,
wait=False,
wait_timeout=10,
)
except socketio.exceptions.ConnectionError as e:
self.__logger.error(
"failed to connect socket, realtime toggle updating is disabled",
exc_info=e,
)
self._socket = None

def initialized(self):
return self._ready.is_set()
56 changes: 56 additions & 0 deletions featureprobe/realtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2022 FeatureProbe
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from sys import version_info as python_version
if python_version >= (3, 6):
from socketio import ClientNamespace
else:
class ClientNamespace:
def __init__(self, namespace=None):
...

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from featureprobe.context import Context
from featureprobe.synchronizer import Synchronizer


class RealtimeToggleUpdateNS(ClientNamespace):
__logger = logging.getLogger("FeatureProbe-Socket")

def __init__(
self,
namespace,
context: "Context",
synchronizer: "Synchronizer"):
super(RealtimeToggleUpdateNS, self).__init__(namespace=namespace)
self._synchronizer = synchronizer
self._sdk_key = context.sdk_key

def on_connect(self):
self.__logger.info("connect socketio success")
self.emit("register", {"key": self._sdk_key})

def on_connect_error(self, error):
self.__logger.error("socketio error: {}".format(error))

def on_disconnect(self):
self.__logger.info("disconnecting socketio")

def on_update(self, data):
self.__logger.info("socketio recv update event")
self._synchronizer.sync()
Loading