Skip to content

updated redis, pluggable and localjson storages #561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions splitio/models/rule_based_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
class RuleBasedSegment(object):
"""RuleBasedSegment object class."""

def __init__(self, name, traffic_yype_Name, change_number, status, conditions, excluded):
def __init__(self, name, traffic_type_name, change_number, status, conditions, excluded):
"""
Class constructor.

:param name: Segment name.
:type name: str
:param traffic_yype_Name: traffic type name.
:type traffic_yype_Name: str
:param traffic_type_name: traffic type name.
:type traffic_type_name: str
:param change_number: change number.
:type change_number: str
:param status: status.
Expand All @@ -29,7 +29,7 @@ def __init__(self, name, traffic_yype_Name, change_number, status, conditions, e
:type excluded: Excluded
"""
self._name = name
self._traffic_yype_Name = traffic_yype_Name
self._traffic_type_name = traffic_type_name
self._change_number = change_number
self._status = status
self._conditions = conditions
Expand All @@ -41,9 +41,9 @@ def name(self):
return self._name

@property
def traffic_yype_Name(self):
def traffic_type_name(self):
"""Return traffic type name."""
return self._traffic_yype_Name
return self._traffic_type_name

@property
def change_number(self):
Expand All @@ -65,6 +65,17 @@ def excluded(self):
"""Return excluded."""
return self._excluded

def to_json(self):
"""Return a JSON representation of this rule based segment."""
return {
'changeNumber': self.change_number,
'trafficTypeName': self.traffic_type_name,
'name': self.name,
'status': self.status,
'conditions': [c.to_json() for c in self.conditions],
'excluded': self.excluded.to_json()
}

def from_raw(raw_rule_based_segment):
"""
Parse a Rule based segment from a JSON portion of splitChanges.
Expand Down Expand Up @@ -111,3 +122,10 @@ def get_excluded_keys(self):
def get_excluded_segments(self):
"""Return excluded segments"""
return self._segments

def to_json(self):
"""Return a JSON representation of this object."""
return {
'keys': self._keys,
'segments': self._segments
}
3 changes: 2 additions & 1 deletion splitio/storage/inmemmory.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def remove_flag_set(self, flag_sets, feature_flag_name, should_filter):

class InMemoryRuleBasedSegmentStorage(RuleBasedSegmentsStorage):
"""InMemory implementation of a feature flag storage base."""

def __init__(self):
"""Constructor."""
self._lock = threading.RLock()
Expand Down Expand Up @@ -192,7 +193,7 @@ def _set_change_number(self, new_change_number):

def get_segment_names(self):
"""
Retrieve a list of all excluded segments names.
Retrieve a list of all rule based segments names.

:return: List of segment names.
:rtype: list(str)
Expand Down
248 changes: 245 additions & 3 deletions splitio/storage/pluggable.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,257 @@
import threading

from splitio.optional.loaders import asyncio
from splitio.models import splits, segments
from splitio.models import splits, segments, rule_based_segments
from splitio.models.impressions import Impression
from splitio.models.telemetry import MethodExceptions, MethodLatencies, TelemetryConfig, MAX_TAGS,\
MethodLatenciesAsync, MethodExceptionsAsync, TelemetryConfigAsync
from splitio.storage import FlagSetsFilter, SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, TelemetryStorage
from splitio.storage import FlagSetsFilter, SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, TelemetryStorage, RuleBasedSegmentsStorage
from splitio.util.storage_helper import get_valid_flag_sets, combine_valid_flag_sets

_LOGGER = logging.getLogger(__name__)

class PluggableRuleBasedSegmentsStorageBase(RuleBasedSegmentsStorage):
"""Pluggable storage for rule based segments."""

_TILL_LENGTH = 4

def __init__(self, pluggable_adapter, prefix=None):
"""
Class constructor.

:param redis_client: Redis client or compliant interface.
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
"""
self._pluggable_adapter = pluggable_adapter
self._prefix = "SPLITIO.rbsegment.{segment_name}"
self._rb_segments_till_prefix = "SPLITIO.rbsegments.till"
self._rb_segment_name_length = 18
if prefix is not None:
self._rb_segment_name_length += len(prefix) + 1
self._prefix = prefix + "." + self._prefix
self._rb_segments_till_prefix = prefix + "." + self._rb_segments_till_prefix

def get(self, segment_name):
"""
Retrieve a rule based segment.

:param segment_name: Name of the segment to fetch.
:type segment_name: str

:rtype: str
"""
pass

def get_change_number(self):
"""
Retrieve latest rule based segment change number.

:rtype: int
"""
pass

def contains(self, segment_names):
"""
Return whether the segments exists in rule based segment in cache.

:param segment_names: segment name to validate.
:type segment_names: str

:return: True if segment names exists. False otherwise.
:rtype: bool
"""
pass

def get_segment_names(self):
"""
Retrieve a list of all excluded segments names.

:return: List of segment names.
:rtype: list(str)
"""
pass

def update(self, to_add, to_delete, new_change_number):
"""
Update rule based segment..

:param to_add: List of rule based segment. to add
:type to_add: list[splitio.models.rule_based_segments.RuleBasedSegment]
:param to_delete: List of rule based segment. to delete
:type to_delete: list[splitio.models.rule_based_segments.RuleBasedSegment]
:param new_change_number: New change number.
:type new_change_number: int
"""
raise NotImplementedError('Only redis-consumer mode is supported.')

def get_large_segment_names(self):
"""
Retrieve a list of all excluded large segments names.

:return: List of segment names.
:rtype: list(str)
"""
pass

class PluggableRuleBasedSegmentsStorage(PluggableRuleBasedSegmentsStorageBase):
"""Pluggable storage for rule based segments."""

def __init__(self, pluggable_adapter, prefix=None):
"""
Class constructor.

:param redis_client: Redis client or compliant interface.
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
"""
PluggableRuleBasedSegmentsStorageBase.__init__(self, pluggable_adapter, prefix)

def get(self, segment_name):
"""
Retrieve a rule based segment.

:param segment_name: Name of the segment to fetch.
:type segment_name: str

:rtype: str
"""
try:
rb_segment = self._pluggable_adapter.get(self._prefix.format(segment_name=segment_name))
if not rb_segment:
return None

return rule_based_segments.from_raw(rb_segment)

except Exception:
_LOGGER.error('Error getting rule based segment from storage')
_LOGGER.debug('Error: ', exc_info=True)
return None

def get_change_number(self):
"""
Retrieve latest rule based segment change number.

:rtype: int
"""
try:
return self._pluggable_adapter.get(self._rb_segments_till_prefix)

except Exception:
_LOGGER.error('Error getting change number in rule based segment storage')
_LOGGER.debug('Error: ', exc_info=True)
return None

def contains(self, segment_names):
"""
Return whether the segments exists in rule based segment in cache.

:param segment_names: segment name to validate.
:type segment_names: str

:return: True if segment names exists. False otherwise.
:rtype: bool
"""
return set(segment_names).issubset(self.get_segment_names())

def get_segment_names(self):
"""
Retrieve a list of all rule based segments names.

:return: List of segment names.
:rtype: list(str)
"""
try:
_LOGGER.error(self._rb_segment_name_length)
_LOGGER.error(self._prefix)
_LOGGER.error(self._prefix[:self._rb_segment_name_length])
keys = []
for key in self._pluggable_adapter.get_keys_by_prefix(self._prefix[:self._rb_segment_name_length]):
if key[-self._TILL_LENGTH:] != 'till':
keys.append(key[len(self._prefix[:self._rb_segment_name_length]):])
return keys

except Exception:
_LOGGER.error('Error getting rule based segments names from storage')
_LOGGER.debug('Error: ', exc_info=True)
return None

class PluggableRuleBasedSegmentsStorageAsync(PluggableRuleBasedSegmentsStorageBase):
"""Pluggable storage for rule based segments."""

def __init__(self, pluggable_adapter, prefix=None):
"""
Class constructor.

:param redis_client: Redis client or compliant interface.
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
"""
PluggableRuleBasedSegmentsStorageBase.__init__(self, pluggable_adapter, prefix)

async def get(self, segment_name):
"""
Retrieve a rule based segment.

:param segment_name: Name of the segment to fetch.
:type segment_name: str

:rtype: str
"""
try:
rb_segment = await self._pluggable_adapter.get(self._prefix.format(segment_name=segment_name))
if not rb_segment:
return None

return rule_based_segments.from_raw(rb_segment)

except Exception:
_LOGGER.error('Error getting rule based segment from storage')
_LOGGER.debug('Error: ', exc_info=True)
return None

async def get_change_number(self):
"""
Retrieve latest rule based segment change number.

:rtype: int
"""
try:
return await self._pluggable_adapter.get(self._rb_segments_till_prefix)

except Exception:
_LOGGER.error('Error getting change number in rule based segment storage')
_LOGGER.debug('Error: ', exc_info=True)
return None

async def contains(self, segment_names):
"""
Return whether the segments exists in rule based segment in cache.

:param segment_names: segment name to validate.
:type segment_names: str

:return: True if segment names exists. False otherwise.
:rtype: bool
"""
return set(segment_names).issubset(await self.get_segment_names())

async def get_segment_names(self):
"""
Retrieve a list of all rule based segments names.

:return: List of segment names.
:rtype: list(str)
"""
try:
keys = []
for key in await self._pluggable_adapter.get_keys_by_prefix(self._prefix[:self._rb_segment_name_length]):
if key[-self._TILL_LENGTH:] != 'till':
keys.append(key[len(self._prefix[:self._rb_segment_name_length]):])
return keys

except Exception:
_LOGGER.error('Error getting rule based segments names from storage')
_LOGGER.debug('Error: ', exc_info=True)
return None

class PluggableSplitStorageBase(SplitStorage):
"""InMemory implementation of a feature flag storage."""

Expand Down Expand Up @@ -90,7 +332,7 @@ def update(self, to_add, to_delete, new_change_number):
:param new_change_number: New change number.
:type new_change_number: int
"""
# pass
pass
# try:
# split = self.get(feature_flag_name)
# if not split:
Expand Down
Loading