Skip to content

Add support for new TDIGEST features and changes #2392

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add support to TDIGEST
  • Loading branch information
dvora-h committed Sep 20, 2022
commit 3d0a84de7c5d97e1344b5f6ff8ff9399b9da94d1
9 changes: 6 additions & 3 deletions redis/commands/bf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from ..helpers import parse_to_list
from .commands import * # noqa
from .info import BFInfo, CFInfo, CMSInfo, TDigestInfo, TopKInfo
from .utils import parse_tdigest_quantile


class AbstractBloom(object):
Expand Down Expand Up @@ -166,12 +165,16 @@ def __init__(self, client, **kwargs):
# TDIGEST_RESET: bool_ok,
# TDIGEST_ADD: spaceHolder,
# TDIGEST_MERGE: spaceHolder,
TDIGEST_CDF: float,
TDIGEST_QUANTILE: parse_tdigest_quantile,
TDIGEST_CDF: parse_to_list,
TDIGEST_QUANTILE: parse_to_list,
TDIGEST_MIN: float,
TDIGEST_MAX: float,
TDIGEST_TRIMMED_MEAN: float,
TDIGEST_INFO: TDigestInfo,
TDIGEST_RANK: parse_to_list,
TDIGEST_REVRANK: parse_to_list,
TDIGEST_BYRANK: parse_to_list,
TDIGEST_BYREVRANK: parse_to_list,
}

self.client = client
Expand Down
64 changes: 49 additions & 15 deletions redis/commands/bf/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
TDIGEST_MAX = "TDIGEST.MAX"
TDIGEST_INFO = "TDIGEST.INFO"
TDIGEST_TRIMMED_MEAN = "TDIGEST.TRIMMED_MEAN"
TDIGEST_MERGESTORE = "TDIGEST.MERGESTORE"
TDIGEST_RANK = "TDIGEST.RANK"
TDIGEST_REVRANK = "TDIGEST.REVRANK"
TDIGEST_BYRANK = "TDIGEST.BYRANK"
TDIGEST_BYREVRANK = "TDIGEST.BYREVRANK"


class BFCommands:
Expand Down Expand Up @@ -374,12 +377,22 @@ def add(self, key, values, weights):
self.append_values_and_weights(params, values, weights)
return self.execute_command(TDIGEST_ADD, *params)

def merge(self, toKey, fromKey):
def merge(self, destination_key, num_keys, *keys, compression=None, override=False):
"""
Merge all of the values from 'fromKey' to 'toKey' sketch.
Merges all of the values from `keys` to 'destination-key' sketch.
It is mandatory to provide the `num_keys` before passing the input keys and
the other (optional) arguments.
If `destination_key` already exists its values are merged with the input keys.
If you wish to override the destination key contents use the `OVERRIDE` parameter.

For more information see `TDIGEST.MERGE <https://redis.io/commands/tdigest.merge>`_.
""" # noqa
return self.execute_command(TDIGEST_MERGE, toKey, fromKey)
params = [destination_key, num_keys, *keys]
if compression is not None:
params.extend(["COMPRESSION", compression])
if override:
params.append("OVERRIDE")
return self.execute_command(TDIGEST_MERGE, *params)

def min(self, key):
"""
Expand All @@ -404,12 +417,12 @@ def quantile(self, key, quantile, *quantiles):
""" # noqa
return self.execute_command(TDIGEST_QUANTILE, key, quantile, *quantiles)

def cdf(self, key, value):
def cdf(self, key, value, *values):
"""
Return double fraction of all points added which are <= value.
For more information see `TDIGEST.CDF <https://redis.io/commands/tdigest.cdf>`_.
""" # noqa
return self.execute_command(TDIGEST_CDF, key, value)
return self.execute_command(TDIGEST_CDF, key, value, *values)

def info(self, key):
"""
Expand All @@ -429,18 +442,39 @@ def trimmed_mean(self, key, low_cut_quantile, high_cut_quantile):
TDIGEST_TRIMMED_MEAN, key, low_cut_quantile, high_cut_quantile
)

def mergestore(self, dest_key, numkeys, *sourcekeys, compression=False):
def rank(self, key, value, *values):
"""
Merges all of the values from `sourcekeys` keys to `dest_key` sketch.
If destination already exists, it is overwritten.
Retrieve the estimated rank of value (the number of observations in the sketch
that are smaller than value + half the number of observations that are equal to value).

For more information see `TDIGEST.RANK <https://redis.io/commands/tdigest.rank>`_.
""" # noqa
return self.execute_command(TDIGEST_RANK, key, value, *values)

For more information see `TDIGEST.MERGESTORE <https://redis.io/commands/tdigest.mergestore>`_.
""" # noqa
params = [dest_key, numkeys, *sourcekeys]
if compression:
params.extend(["COMPRESSION", compression])
return self.execute_command(TDIGEST_MERGESTORE, *params)
def revrank(self, key, value, *values):
"""
Retrieve the estimated rank of value (the number of observations in the sketch
that are larger than value + half the number of observations that are equal to value).

For more information see `TDIGEST.REVRANK <https://redis.io/commands/tdigest.revrank>`_.
""" # noqa
return self.execute_command(TDIGEST_REVRANK, key, value, *values)

def byrank(self, key, rank, *ranks):
"""
Retrieve an estimation of the value with the given rank.

For more information see `TDIGEST.BY_RANK <https://redis.io/commands/tdigest.by_rank>`_.
""" # noqa
return self.execute_command(TDIGEST_BYRANK, key, rank, *ranks)

def byrevrank(self, key, rank, *ranks):
"""
Retrieve an estimation of the value with the given reverse rank.

For more information see `TDIGEST.BY_REVRANK <https://redis.io/commands/tdigest.by_revrank>`_.
""" # noqa
return self.execute_command(TDIGEST_BYREVRANK, key, rank, *ranks)


class CMSCommands:
Expand Down
24 changes: 14 additions & 10 deletions redis/commands/bf/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,22 @@ def __init__(self, args):
class TDigestInfo(object):
compression = None
capacity = None
mergedNodes = None
unmergedNodes = None
mergedWeight = None
unmergedWeight = None
totalCompressions = None
merged_nodes = None
unmerged_nodes = None
merged_weight = None
unmerged_weight = None
total_compressions = None
sum_weights = None
memory_usage = None

def __init__(self, args):
response = dict(zip(map(nativestr, args[::2]), args[1::2]))
self.compression = response["Compression"]
self.capacity = response["Capacity"]
self.mergedNodes = response["Merged nodes"]
self.unmergedNodes = response["Unmerged nodes"]
self.mergedWeight = response["Merged weight"]
self.unmergedWeight = response["Unmerged weight"]
self.totalCompressions = response["Total compressions"]
self.merged_nodes = response["Merged nodes"]
self.unmerged_nodes = response["Unmerged nodes"]
self.merged_weight = response["Merged weight"]
self.unmerged_weight = response["Unmerged weight"]
self.total_compressions = response["Total compressions"]
self.sum_weights = response["Sum weights"]
self.memory_usage = response["Memory usage"]
3 changes: 0 additions & 3 deletions redis/commands/bf/utils.py

This file was deleted.

98 changes: 77 additions & 21 deletions tests/test_asyncio/test_bloom.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from math import inf

import redis.asyncio as redis
from redis.exceptions import ModuleError, RedisError
Expand Down Expand Up @@ -320,11 +321,11 @@ async def test_tdigest_reset(modclient: redis.Redis):
# reset on empty histogram
assert await modclient.tdigest().reset("tDigest")
# insert data-points into sketch
assert await modclient.tdigest().add("tDigest", list(range(10)), [1.0] * 10)
assert await modclient.tdigest().add("tDigest", list(range(10)), [1] * 10)

assert await modclient.tdigest().reset("tDigest")
# assert we have 0 unmerged nodes
assert 0 == (await modclient.tdigest().info("tDigest")).unmergedNodes
assert 0 == (await modclient.tdigest().info("tDigest")).unmerged_nodes


@pytest.mark.redismod
Expand All @@ -333,22 +334,32 @@ async def test_tdigest_merge(modclient: redis.Redis):
assert await modclient.tdigest().create("to-tDigest", 10)
assert await modclient.tdigest().create("from-tDigest", 10)
# insert data-points into sketch
assert await modclient.tdigest().add("from-tDigest", [1.0] * 10, [1.0] * 10)
assert await modclient.tdigest().add("to-tDigest", [2.0] * 10, [10.0] * 10)
assert await modclient.tdigest().add("from-tDigest", [1.0] * 10, [1] * 10)
assert await modclient.tdigest().add("to-tDigest", [2.0] * 10, [10] * 10)
# merge from-tdigest into to-tdigest
assert await modclient.tdigest().merge("to-tDigest", "from-tDigest")
assert await modclient.tdigest().merge("to-tDigest", 1, "from-tDigest")
# we should now have 110 weight on to-histogram
info = await modclient.tdigest().info("to-tDigest")
total_weight_to = float(info.mergedWeight) + float(info.unmergedWeight)
total_weight_to = float(info.merged_weight) + float(info.unmerged_weight)
assert 110 == total_weight_to
# test override
assert await modclient.tdigest().create("from-override", 10)
assert await modclient.tdigest().create("from-override-2", 10)
assert await modclient.tdigest().add("from-override", [3.0] * 10, [10] * 10)
assert await modclient.tdigest().add("from-override-2", [4.0] * 10, [10] * 10)
assert await modclient.tdigest().merge(
"to-tDigest", 2, "from-override", "from-override-2", override=True
)
assert 3.0 == await modclient.tdigest().min("to-tDigest")
assert 4.0 == await modclient.tdigest().max("to-tDigest")


@pytest.mark.redismod
@pytest.mark.experimental
async def test_tdigest_min_and_max(modclient: redis.Redis):
assert await modclient.tdigest().create("tDigest", 100)
# insert data-points into sketch
assert await modclient.tdigest().add("tDigest", [1, 2, 3], [1.0] * 3)
assert await modclient.tdigest().add("tDigest", [1, 2, 3], [1] * 3)
# min/max
assert 3 == await modclient.tdigest().max("tDigest")
assert 1 == await modclient.tdigest().min("tDigest")
Expand All @@ -361,12 +372,12 @@ async def test_tdigest_quantile(modclient: redis.Redis):
assert await modclient.tdigest().create("tDigest", 500)
# insert data-points into sketch
assert await modclient.tdigest().add(
"tDigest", list([x * 0.01 for x in range(1, 10000)]), [1.0] * 10000
"tDigest", list([x * 0.01 for x in range(1, 10000)]), [1] * 10000
)
# assert min min/max have same result as quantile 0 and 1
assert (
await modclient.tdigest().max("tDigest")
== (await modclient.tdigest().quantile("tDigest", 1.0))[0]
== (await modclient.tdigest().quantile("tDigest", 1))[0]
)
assert (
await modclient.tdigest().min("tDigest")
Expand All @@ -378,7 +389,7 @@ async def test_tdigest_quantile(modclient: redis.Redis):

# test multiple quantiles
assert await modclient.tdigest().create("t-digest", 100)
assert await modclient.tdigest().add("t-digest", [1, 2, 3, 4, 5], [1.0] * 5)
assert await modclient.tdigest().add("t-digest", [1, 2, 3, 4, 5], [1] * 5)
res = await modclient.tdigest().quantile("t-digest", 0.5, 0.8)
assert [3.0, 5.0] == res

Expand All @@ -388,22 +399,67 @@ async def test_tdigest_quantile(modclient: redis.Redis):
async def test_tdigest_cdf(modclient: redis.Redis):
assert await modclient.tdigest().create("tDigest", 100)
# insert data-points into sketch
assert await modclient.tdigest().add("tDigest", list(range(1, 10)), [1.0] * 10)
assert 0.1 == round(await modclient.tdigest().cdf("tDigest", 1.0), 1)
assert 0.9 == round(await modclient.tdigest().cdf("tDigest", 9.0), 1)
assert await modclient.tdigest().add("tDigest", list(range(1, 10)), [1] * 10)
assert 0.1 == round((await modclient.tdigest().cdf("tDigest", 1.0))[0], 1)
assert 0.9 == round((await modclient.tdigest().cdf("tDigest", 9.0))[0], 1)
res = await modclient.tdigest().cdf("tDigest", 1.0, 9.0)
assert [0.1, 0.9] == [round(x, 1) for x in res]


@pytest.mark.redismod
@pytest.mark.experimental
@skip_ifmodversion_lt("2.4.0", "bf")
async def test_tdigest_mergestore(modclient: redis.Redis):
assert await modclient.tdigest().create("sourcekey1", 100)
assert await modclient.tdigest().create("sourcekey2", 100)
assert await modclient.tdigest().add("sourcekey1", [10], [1.0])
assert await modclient.tdigest().add("sourcekey2", [50], [1.0])
assert await modclient.tdigest().mergestore("dest", 2, "sourcekey1", "sourcekey2")
assert await modclient.tdigest().max("dest") == 50
assert await modclient.tdigest().min("dest") == 10
async def test_tdigest_trimmed_mean(modclient: redis.Redis):
assert await modclient.tdigest().create("tDigest", 100)
# insert data-points into sketch
assert await modclient.tdigest().add("tDigest", list(range(1, 10)), [1] * 10)
assert 5 == await modclient.tdigest().trimmed_mean("tDigest", 0.1, 0.9)
assert 4.5 == await modclient.tdigest().trimmed_mean("tDigest", 0.4, 0.5)


@pytest.mark.redismod
@pytest.mark.experimental
async def test_tdigest_rank(modclient: redis.Redis):
assert await modclient.tdigest().create("t-digest", 500)
assert await modclient.tdigest().add("t-digest", list(range(0, 20)), [1] * 20)
assert -1 == (await modclient.tdigest().rank("t-digest", -1))[0]
assert 1 == (await modclient.tdigest().rank("t-digest", 0))[0]
assert 11 == (await modclient.tdigest().rank("t-digest", 10))[0]
assert [-1, 20, 10] == await modclient.tdigest().rank("t-digest", -20, 20, 9)


@pytest.mark.redismod
@pytest.mark.experimental
async def test_tdigest_revrank(modclient: redis.Redis):
assert await modclient.tdigest().create("t-digest", 500)
assert await modclient.tdigest().add("t-digest", list(range(0, 20)), [1] * 20)
assert -1 == (await modclient.tdigest().revrank("t-digest", 20))[0]
assert 20 == (await modclient.tdigest().revrank("t-digest", 0))[0]
assert [-1, 20, 10] == await modclient.tdigest().revrank("t-digest", 21, 0, 10)


@pytest.mark.redismod
@pytest.mark.experimental
async def test_tdigest_byrank(modclient: redis.Redis):
assert await modclient.tdigest().create("t-digest", 500)
assert await modclient.tdigest().add("t-digest", list(range(1, 11)), [1] * 20)
assert 1 == (await modclient.tdigest().byrank("t-digest", 0))[0]
assert 10 == (await modclient.tdigest().byrank("t-digest", 9))[0]
assert (await modclient.tdigest().byrank("t-digest", 100))[0] == inf
with pytest.raises(redis.ResponseError):
(await modclient.tdigest().byrank("t-digest", -1))[0]


@pytest.mark.redismod
@pytest.mark.experimental
async def test_tdigest_byrevrank(modclient: redis.Redis):
assert await modclient.tdigest().create("t-digest", 500)
assert await modclient.tdigest().add("t-digest", list(range(1, 11)), [1] * 20)
assert 10 == (await modclient.tdigest().byrevrank("t-digest", 0))[0]
assert 2 == (await modclient.tdigest().byrevrank("t-digest", 9))[0]
assert (await modclient.tdigest().byrevrank("t-digest", 100))[0] == -inf
with pytest.raises(redis.ResponseError):
(await modclient.tdigest().byrevrank("t-digest", -1))[0]


# @pytest.mark.redismod
Expand Down
Loading