Skip to content
This repository was archived by the owner on Jan 24, 2023. It is now read-only.

Added support for TS.REVRANGE and TS.MREVRANGE #62

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,42 @@ jobs:
. venv/bin/activate
codecov

build_nightly:
build_latest:
docker:
- image: circleci/python:3.7.1
- image: redislabs/redistimeseries:latest

working_directory: ~/repo

steps:
- checkout

- restore_cache: # Download and cache dependencies
keys:
- v1-dependencies-{{ checksum "requirements.txt" }}
# fallback to using the latest cache if no exact match is found
- v1-dependencies-

- run:
name: install dependencies
command: |
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
pip install codecov

- save_cache:
paths:
- ./venv
key: v1-dependencies-{{ checksum "requirements.txt" }}

- run:
name: run tests
command: |
. venv/bin/activate
REDIS_PORT=6379 python test_commands.py

build_edge:
docker:
- image: circleci/python:3.7.1
- image: redislabs/redistimeseries:edge
Expand Down Expand Up @@ -108,6 +143,7 @@ workflows:
commit:
jobs:
- build
- build_latest
nightly:
triggers:
- schedule:
Expand All @@ -117,4 +153,5 @@ workflows:
only:
- master
jobs:
- build_nightly
- build_edge
- build_latest
97 changes: 81 additions & 16 deletions redistimeseries/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ class Client(object): #changed from StrictRedis
CREATERULE_CMD = 'TS.CREATERULE'
DELETERULE_CMD = 'TS.DELETERULE'
RANGE_CMD = 'TS.RANGE'
REVRANGE_CMD = 'TS.REVRANGE'
MRANGE_CMD = 'TS.MRANGE'
MREVRANGE_CMD = 'TS.MREVRANGE'
GET_CMD = 'TS.GET'
MGET_CMD = 'TS.MGET'
INFO_CMD = 'TS.INFO'
Expand All @@ -107,7 +109,9 @@ def __init__(self, conn=None, *args, **kwargs):
self.CREATERULE_CMD : bool_ok,
self.DELETERULE_CMD : bool_ok,
self.RANGE_CMD : parse_range,
self.REVRANGE_CMD: parse_range,
self.MRANGE_CMD : parse_m_range,
self.MREVRANGE_CMD: parse_m_range,
self.GET_CMD : parse_get,
self.MGET_CMD : parse_m_get,
self.INFO_CMD : TSInfo,
Expand Down Expand Up @@ -254,33 +258,56 @@ def createrule(self, source_key, dest_key,
def deleterule(self, source_key, dest_key):
"""Deletes a compaction rule"""
return self.redis.execute_command(self.DELETERULE_CMD, source_key, dest_key)

def range(self, key, from_time, to_time, count=None,
aggregation_type=None, bucket_size_msec=0):

def __range_params(self, key, from_time, to_time, count, aggregation_type, bucket_size_msec):
"""
Query a range from ``key``, from ``from_time`` to ``to_time``.
``count`` limits the number of results.
Can Aggregate for ``bucket_size_msec`` where an ``aggregation_type``
can be ['avg', 'sum', 'min', 'max', 'range', 'count', 'first',
'last', 'std.p', 'std.s', 'var.p', 'var.s']
Internal method to create TS.RANGE and TS.REVRANGE arguments
"""
params = [key, from_time, to_time]
self.appendCount(params, count)
if aggregation_type is not None:
self.appendAggregation(params, aggregation_type, bucket_size_msec)
return params

def range(self, key, from_time, to_time, count=None,
aggregation_type=None, bucket_size_msec=0):
"""
Query a range in forward direction for a specific time-serie.

Args:
key: Key name for timeseries.
from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0).
to_time: End timestamp for range query, + can be used to express the maximum possible timestamp.
count: Optional maximum number of returned results.
aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first',
'last', 'std.p', 'std.s', 'var.p', 'var.s']
bucket_size_msec: Time bucket for aggregation in milliseconds.
"""
params = self.__range_params(key, from_time, to_time, count, aggregation_type, bucket_size_msec)
return self.redis.execute_command(self.RANGE_CMD, *params)

def mrange(self, from_time, to_time, filters, count=None,
aggregation_type=None, bucket_size_msec=0, with_labels=False):
def revrange(self, key, from_time, to_time, count=None,
aggregation_type=None, bucket_size_msec=0):
"""
Query a range based on filters,retention_msecs from ``from_time`` to ``to_time``.
``count`` limits the number of results.
``filters`` are a list strings such as ['Test=This'].
Can Aggregate for ``bucket_size_msec`` where an ``aggregation_type``
can be ['avg', 'sum', 'min', 'max', 'range', 'count', 'first',
Query a range in reverse direction for a specific time-serie.
Note: This command is only available since RedisTimeSeries >= v1.4

Args:
key: Key name for timeseries.
from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0).
to_time: End timestamp for range query, + can be used to express the maximum possible timestamp.
count: Optional maximum number of returned results.
aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first',
'last', 'std.p', 'std.s', 'var.p', 'var.s']
``WITHLABELS`` appends labels to results.
bucket_size_msec: Time bucket for aggregation in milliseconds.
"""
params = self.__range_params(key, from_time, to_time, count, aggregation_type, bucket_size_msec)
return self.redis.execute_command(self.REVRANGE_CMD, *params)


def __mrange_params(self, aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels):
"""
Internal method to create TS.MRANGE and TS.MREVRANGE arguments
"""
params = [from_time, to_time]
self.appendCount(params, count)
Expand All @@ -289,8 +316,46 @@ def mrange(self, from_time, to_time, filters, count=None,
self.appendWithLabels(params, with_labels)
params.extend(['FILTER'])
params += filters
return params

def mrange(self, from_time, to_time, filters, count=None,
aggregation_type=None, bucket_size_msec=0, with_labels=False):
"""
Query a range across multiple time-series by filters in forward direction.

Args:
from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0).
to_time: End timestamp for range query, + can be used to express the maximum possible timestamp.
filters: filter to match the time-series labels.
count: Optional maximum number of returned results.
aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first',
'last', 'std.p', 'std.s', 'var.p', 'var.s']
bucket_size_msec: Time bucket for aggregation in milliseconds.
with_labels: Include in the reply the label-value pairs that represent metadata labels of the time-series.
If this argument is not set, by default, an empty Array will be replied on the labels array position.
"""
params = self.__mrange_params(aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels)
return self.redis.execute_command(self.MRANGE_CMD, *params)

def mrevrange(self, from_time, to_time, filters, count=None,
aggregation_type=None, bucket_size_msec=0, with_labels=False):
"""
Query a range across multiple time-series by filters in reverse direction.

Args:
from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0).
to_time: End timestamp for range query, + can be used to express the maximum possible timestamp.
filters: filter to match the time-series labels.
count: Optional maximum number of returned results.
aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first',
'last', 'std.p', 'std.s', 'var.p', 'var.s']
bucket_size_msec: Time bucket for aggregation in milliseconds.
with_labels: Include in the reply the label-value pairs that represent metadata labels of the time-series.
If this argument is not set, by default, an empty Array will be replied on the labels array position.
"""
params = self.__mrange_params(aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels)
return self.redis.execute_command(self.MREVRANGE_CMD, *params)

def get(self, key):
"""Gets the last sample of ``key``"""
return self.redis.execute_command(self.GET_CMD, key)
Expand Down
54 changes: 54 additions & 0 deletions test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@
from redistimeseries.client import Client as RedisTimeSeries
from redis import Redis

version = None
rts = None
port = 6379

class RedisTimeSeriesTest(TestCase):
def setUp(self):
global rts
global version
rts = RedisTimeSeries(port=port)
rts.redis.flushdb()
modules = rts.redis.execute_command("module","list")
if modules is not None:
for module_info in modules:
if module_info[1] == b'timeseries':
version = int(module_info[3])

def testCreate(self):
'''Test TS.CREATE calls'''
Expand Down Expand Up @@ -111,6 +118,22 @@ def testRange(self):
self.assertEqual(20, len(rts.range(1, 0, 500, aggregation_type='avg', bucket_size_msec=10)))
self.assertEqual(10, len(rts.range(1, 0, 500, count=10)))

def testRevRange(self):
'''Test TS.REVRANGE calls which returns reverse range by key'''
# TS.REVRANGE is available since RedisTimeSeries >= v1.4
if version is None or version < 14000:
return

for i in range(100):
rts.add(1, i, i % 7)
self.assertEqual(100, len(rts.range(1, 0, 200)))
for i in range(100):
rts.add(1, i+200, i % 7)
self.assertEqual(200, len(rts.range(1, 0, 500)))
#first sample isn't returned
self.assertEqual(20, len(rts.revrange(1, 0, 500, aggregation_type='avg', bucket_size_msec=10)))
self.assertEqual(10, len(rts.revrange(1, 0, 500, count=10)))

def testMultiRange(self):
'''Test TS.MRANGE calls which returns range by filter'''

Expand Down Expand Up @@ -139,6 +162,37 @@ def testMultiRange(self):
res = rts.mrange(0, 200, filters=['Test=This'], with_labels=True)
self.assertEqual({'Test': 'This'}, res[0]['1'][0])

def testMultiReverseRange(self):
'''Test TS.MREVRANGE calls which returns range by filter'''
# TS.MREVRANGE is available since RedisTimeSeries >= v1.4
if version is None or version < 14000:
return

rts.create(1, labels={'Test': 'This'})
rts.create(2, labels={'Test': 'This', 'Taste': 'That'})
for i in range(100):
rts.add(1, i, i % 7)
rts.add(2, i, i % 11)

res = rts.mrange(0, 200, filters=['Test=This'])
self.assertEqual(2, len(res))
self.assertEqual(100, len(res[0]['1'][1]))

res = rts.mrange(0, 200, filters=['Test=This'], count=10)
self.assertEqual(10, len(res[0]['1'][1]))

for i in range(100):
rts.add(1, i + 200, i % 7)
res = rts.mrevrange(0, 500, filters=['Test=This'],
aggregation_type='avg', bucket_size_msec=10)
self.assertEqual(2, len(res))
self.assertEqual(20, len(res[0]['1'][1]))

# test withlabels
self.assertEqual({}, res[0]['1'][0])
res = rts.mrevrange(0, 200, filters=['Test=This'], with_labels=True)
self.assertEqual({'Test': 'This'}, res[0]['1'][0])

def testGet(self):
'''Test TS.GET calls'''

Expand Down