Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
dc1bb1a
added command class for quantile command
SharonIV0x86 Mar 23, 2025
a2e99dc
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Mar 24, 2025
7bc9b81
Implemented QUANTILE command
SharonIV0x86 Mar 30, 2025
a33b2cf
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Mar 30, 2025
562798d
Merge branch 'unstable' into feat/quantile-command
PragmaTwice Apr 9, 2025
6e0a7f0
Merge branch 'unstable' into feat/quantile-command
mapleFU Apr 13, 2025
64ee2e7
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 16, 2025
aa04e94
Tested working of TDIGEST.QUANTILE locally.
SharonIV0x86 Apr 16, 2025
185898f
Added go test case for quantile command
SharonIV0x86 Apr 16, 2025
359f151
Fixed return status for non existent key
SharonIV0x86 Apr 16, 2025
63c34f3
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 17, 2025
7937777
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 19, 2025
42e4667
Merge branch 'unstable' into feat/quantile-command
PragmaTwice Apr 20, 2025
d77f5d6
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 21, 2025
b1b951b
Made suggested changes
SharonIV0x86 Apr 21, 2025
6d8f169
Made suggested changes
SharonIV0x86 Apr 21, 2025
d42bea1
Made go linter happy
SharonIV0x86 Apr 21, 2025
e1a6796
Merge branch 'unstable' into feat/quantile-command
PragmaTwice Apr 21, 2025
a51b5d4
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 22, 2025
e21933d
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 23, 2025
84ad11b
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 25, 2025
a302c79
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 26, 2025
cb2e4d5
Merge branch 'unstable' into feat/quantile-command
aleksraiden Apr 27, 2025
82c1cac
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 Apr 30, 2025
dcb88fb
Update src/commands/cmd_tdigest.cc
SharonIV0x86 May 2, 2025
17cda79
made suggested changes. 1.) Improved tests 2.) Added check for empty …
SharonIV0x86 May 3, 2025
0ecaf4e
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 May 6, 2025
8e71585
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 May 7, 2025
30538a9
made suggested check for empty digest and nan response
SharonIV0x86 May 7, 2025
ad75911
Merge branch 'unstable' into feat/quantile-command
PragmaTwice May 8, 2025
3127abb
fixed clang-tidy error: used emplace_back instead of push_back
SharonIV0x86 May 8, 2025
f6c3cb5
gocase for unordered quantiles
SharonIV0x86 May 8, 2025
323d71b
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 May 8, 2025
f362094
added 'nan' as a constexpr literal string in cmd_tdigest.cc
SharonIV0x86 May 8, 2025
b4dbb7a
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 May 8, 2025
354f9f3
check for empty tdigest
SharonIV0x86 May 9, 2025
4defcd8
added cpp unit test case for empty tdigest
SharonIV0x86 May 9, 2025
ad188f5
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 May 9, 2025
aba6a5c
Merge branch 'unstable' into feat/quantile-command
SharonIV0x86 May 16, 2025
f095c3b
added an early check inside lock for empty tdigest
SharonIV0x86 May 16, 2025
249564c
add `nan` as response for empty tdigest
LindaSummer May 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/commands/cmd_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
*
*/

#include <range/v3/range/conversion.hpp>
#include <range/v3/view/transform.hpp>

#include "command_parser.h"
#include "commander.h"
#include "server/redis_reply.h"
#include "server/server.h"
#include "status.h"
#include "string_util.h"
#include "types/redis_tdigest.h"

namespace redis {
Expand All @@ -37,6 +41,7 @@ constexpr auto kInfoMergedWeight = "Merged weight";
constexpr auto kInfoUnmergedWeight = "Unmerged weight";
constexpr auto kInfoObservations = "Observations";
constexpr auto kInfoTotalCompressions = "Total compressions";
constexpr auto kNan = "nan";
} // namespace

class CommandTDigestCreate : public Commander {
Expand Down Expand Up @@ -242,11 +247,45 @@ class CommandTDigestMax : public CommandTDigestMinMax {
public:
CommandTDigestMax() : CommandTDigestMinMax(false) {}
};
class CommandTDigestQuantile : public Commander {
Status Parse(const std::vector<std::string> &args) override {
key_name_ = args[1];
quantiles_.reserve(args.size() - 2);
for (size_t i = 2; i < args.size(); i++) {
auto value = ParseFloat(args[i]);
if (!value) {
return {Status::RedisParseErr, errValueIsNotFloat};
}
quantiles_.push_back(*value);
}
return Status::OK();
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
TDigest tdigest(srv->storage, conn->GetNamespace());
TDigestQuantitleResult result;
auto s = tdigest.Quantile(ctx, key_name_, quantiles_, &result);
if (!s.ok()) {
if (s.IsNotFound()) {
return {Status::RedisExecErr, errKeyNotFound};
}
return {Status::RedisExecErr, s.ToString()};
}
auto quantile_strings = result.quantiles
? (ranges::views::transform(*result.quantiles, util::Float2String) | ranges::to_vector)
: std::vector<std::string>(quantiles_.size(), kNan);
*output = conn->MultiBulkString(quantile_strings);
return Status::OK();
}

private:
std::string key_name_;
std::vector<double> quantiles_;
};
REDIS_REGISTER_COMMANDS(TDigest, MakeCmdAttr<CommandTDigestCreate>("tdigest.create", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestInfo>("tdigest.info", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMax>("tdigest.max", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestQuantile>("tdigest.quantile", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1));
} // namespace redis
10 changes: 9 additions & 1 deletion src/types/redis_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name
return status;
}

if (metadata.total_observations == 0) {
return rocksdb::Status::OK();
}

if (metadata.unmerged_nodes > 0) {
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTDigest);
Expand Down Expand Up @@ -228,13 +232,17 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name

auto dump_centroids = DummyCentroids(metadata, centroids);

auto quantile_results = std::vector<double>();
quantile_results.reserve(qs.size());

for (auto q : qs) {
auto status_or_value = TDigestQuantile(dump_centroids, q);
if (!status_or_value) {
return rocksdb::Status::InvalidArgument(status_or_value.Msg());
}
result->quantiles.push_back(*status_or_value);
quantile_results.push_back(*status_or_value);
}
result->quantiles = std::move(quantile_results);

return rocksdb::Status::OK();
}
Expand Down
3 changes: 2 additions & 1 deletion src/types/redis_tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <rocksdb/slice.h>
#include <rocksdb/status.h>

#include <optional>
#include <vector>

#include "storage/redis_db.h"
Expand All @@ -44,7 +45,7 @@ struct TDigestCreateOptions {
};

struct TDigestQuantitleResult {
std::vector<double> quantiles;
std::optional<std::vector<double>> quantiles;
};

class TDigest : public SubKeyScanner {
Expand Down
35 changes: 27 additions & 8 deletions tests/cppunit/types/tdigest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ TEST_F(RedisTDigestTest, Quantile) {
redis::TDigestQuantitleResult result;
status = tdigest_->Quantile(*ctx_, test_digest_name, qs, &result);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_EQ(result.quantiles.size(), qs.size());
EXPECT_NEAR(result.quantiles[0], 50.5, 0.01);
EXPECT_NEAR(result.quantiles[1], 90.5, 0.01);
EXPECT_NEAR(result.quantiles[2], 100, 0.01);
ASSERT_TRUE(result.quantiles);
ASSERT_EQ(result.quantiles->size(), qs.size());
EXPECT_NEAR((*result.quantiles)[0], 50.5, 0.01);
EXPECT_NEAR((*result.quantiles)[1], 90.5, 0.01);
EXPECT_NEAR((*result.quantiles)[2], 100, 0.01);
}

TEST_F(RedisTDigestTest, PlentyQuantile_10000_144) {
Expand All @@ -201,9 +202,10 @@ TEST_F(RedisTDigestTest, PlentyQuantile_10000_144) {
redis::TDigestQuantitleResult tdigest_result;
status = tdigest_->Quantile(*ctx_, test_digest_name, qs, &tdigest_result);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_TRUE(tdigest_result.quantiles);

for (int i = 0; i < quantile_count; i++) {
EXPECT_NEAR(tdigest_result.quantiles[i], result[i], error_double) << "quantile is: " << qs[i];
EXPECT_NEAR((*tdigest_result.quantiles)[i], result[i], error_double) << "quantile is: " << qs[i];
}
}

Expand Down Expand Up @@ -234,10 +236,11 @@ TEST_F(RedisTDigestTest, Add_2_times) {
redis::TDigestQuantitleResult tdigest_result;
status = tdigest_->Quantile(*ctx_, test_digest_name, qs, &tdigest_result);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_TRUE(tdigest_result.quantiles);

for (int i = 0; i < quantile_count; i++) {
auto &[expect_down, expect_upper] = expect_result[i];
auto got = tdigest_result.quantiles[i];
auto got = (*tdigest_result.quantiles)[i];
EXPECT_GE(got, expect_down) << fmt::format("quantile is {}, should in interval [{}, {}]", qs[i], expect_down,
expect_upper);
EXPECT_LE(got, expect_upper) << fmt::format("quantile is {}, should in interval [{}, {}]", qs[i], expect_down,
Expand Down Expand Up @@ -267,15 +270,31 @@ TEST_F(RedisTDigestTest, Add_100_times_same_value) {
redis::TDigestQuantitleResult tdigest_result;
status = tdigest_->Quantile(*ctx_, test_digest_name, qs, &tdigest_result);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_TRUE(tdigest_result.quantiles);

auto expect_result = std::vector<double>{
-10, -9, -8, -5, 1, 7, 10, 11, 12,
};

EXPECT_EQ(tdigest_result.quantiles.size(), qs.size());
EXPECT_EQ(tdigest_result.quantiles->size(), qs.size());

for (size_t i = 0; i < qs.size(); i++) {
auto got = tdigest_result.quantiles[i];
auto got = (*tdigest_result.quantiles)[i];
EXPECT_NEAR(got, expect_result[i], 0.5) << fmt::format("quantile is {}, should be {}", qs[i], expect_result[i]);
}
}
TEST_F(RedisTDigestTest, Quantile_returns_nan_on_empty_tdigest) {
std::string test_digest_name = "test_digest_nan" + std::to_string(util::GetTimeStampMS());

bool exists = false;
auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
ASSERT_FALSE(exists);
ASSERT_TRUE(status.ok());

std::vector<double> qs = {0.3, 0.1, 0.2, 0.56, 0.44, 0.12, 0.11};
redis::TDigestQuantitleResult result;

status = tdigest_->Quantile(*ctx_, test_digest_name, qs, &result);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_FALSE(result.quantiles) << "should not have quantiles with empty tdigest";
}
105 changes: 105 additions & 0 deletions tests/gocase/unit/type/tdigest/tdigest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package tdigest

import (
"context"
"strconv"
"testing"

"github.com/apache/kvrocks/tests/gocase/util"
Expand Down Expand Up @@ -310,4 +311,108 @@ func tdigestTests(t *testing.T, configs util.KvrocksServerConfigs) {
infoAfterEmptyReset := toTdigestInfo(t, rsp.Val())
require.EqualValues(t, 100, infoAfterEmptyReset.Compression)
})
t.Run("tdigest.quantile with different arguments", func(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @SharonIV0x86 ,

We could also add an unordered sequence and an empty tdigest as cases. 😊

Best Regards,
Edward

keyPrefix := "t_qt_"

// No arguments
require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.QUANTILE").Err(), errMsgWrongNumberArg)

// Non-existent key
require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.QUANTILE", keyPrefix+"iDoNotExist", "0.5").Err(), errMsgKeyNotExist)

{
// Quantile on empry tdigest
key0 := keyPrefix + "00"
require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", key0, "compression", "100").Err())
rsp := rdb.Do(ctx, "TDIGEST.QUANTILE", key0, "0.4", "0.1", "0.2", "0.3", "0.7", "0.5")
require.NoError(t, rsp.Err())
vals, err := rsp.Slice()
require.NoError(t, err)
require.Equal(t, 6, len(vals))
for _, v := range vals {
s, ok := v.(string)
require.True(t, ok, "Expected string but got %T", v)
require.Equal(t, "nan", s, "Expected value to be 'nan'")
}
}
{
// Quantiles on positive data
key1 := keyPrefix + "01"
require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", key1, "compression", "100").Err())
require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", key1, "1", "2", "2", "3", "3", "3", "4", "4", "4", "4", "5", "5", "5", "5", "5").Err())
rsp := rdb.Do(ctx, "TDIGEST.QUANTILE", key1, "0", "0.1", "0.2", "0.3", "0.4", "0.5", "0.6", "0.7", "0.8", "0.9", "1")
require.NoError(t, rsp.Err())
vals, err := rsp.Slice()
require.NoError(t, err)
require.Len(t, vals, 11)
expected := []float64{1.0, 2.0, 2.5, 3.0, 3.5, 4.0, 4.0, 5.0, 5.0, 5.0, 5.0}
for i, v := range vals {
str, ok := v.(string)
require.True(t, ok, "expected string but got %T at index %d", v, i)

got, err := strconv.ParseFloat(str, 64)
require.NoError(t, err, "could not parse value at index %d", i)

require.InEpsilon(t, expected[i], got, 0.0001, "mismatch at index %d", i)
}
}
{
// Quantiles on negative data
key2 := keyPrefix + "02"
require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", key2, "compression", "100").Err())
require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", key2, "-1", "-2", "-3", "-4", "-5", "-6", "-7", "-8", "-9", "-10").Err())
rsp := rdb.Do(ctx, "TDIGEST.QUANTILE", key2, "0", "0.25", "0.5", "0.75", "1")
require.NoError(t, rsp.Err())

vals, err := rsp.Slice()
require.NoError(t, err)
require.Len(t, vals, 5)

expected := []float64{-10.0, -8.0, -5.5, -3.0, -1.0}
for i, v := range vals {
str, ok := v.(string)
require.True(t, ok, "expected string but got %T at index %d", v, i)

got, err := strconv.ParseFloat(str, 64)
require.NoError(t, err, "could not parse value at index %d", i)

require.InEpsilon(t, expected[i], got, 0.0001, "mismatch at index %d", i)
}
}
{
// Query with unordered quantiles
key3 := keyPrefix + "03"
require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", key3, "compression", "100").Err())
require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", key3,
"3", "12", "-3", "-19", "13", "4", "14", "18", "-1", "-5", "15", "-10", "33", "17", "-20",
).Err())
rsp := rdb.Do(ctx, "TDIGEST.QUANTILE", key3,
"0.9", "0.1", "0.7", "0.3", "0.6", "0.0", "0.55", "0.65", "0.34", "0.88",
)
require.NoError(t, rsp.Err())
vals, err := rsp.Slice()
require.NoError(t, err)
require.Equal(t, 10, len(vals))

expected := []float64{
18.0,
-19.0,
14.0,
-3.0,
12.5,
-20.0,
12.0,
13.0,
-1.0,
18.0,
}
for i, v := range vals {
strVal, ok := v.(string)
require.True(t, ok, "Expected string at index %d but got %T", i, v)
numVal, err := strconv.ParseFloat(strVal, 64)
require.NoError(t, err, "Failed to parse value at index %d: %s", i, strVal)
require.InDelta(t, expected[i], numVal, 1e-6, "Mismatch at index %d", i)
}
}
})
}
Loading