Skip to content
32 changes: 31 additions & 1 deletion src/commands/cmd_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,36 @@ class CommandTDigestMinMax : public Commander {
std::string key_name_;
bool is_min_;
};
class CommandTDigestReset : public Commander {
Status Parse(const std::vector<std::string> &args) override {
key_name_ = args[1];
return Status::OK();
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
TDigest tdigest(srv->storage, conn->GetNamespace());
TDigestMetadata metadata;
auto s = tdigest.GetMetaData(ctx, key_name_, &metadata);
if (!s.ok()) {
if (s.IsNotFound()) {
return {Status::RedisExecErr, errKeyNotFound};
}
return {Status::RedisExecErr, s.ToString()};
}
if (metadata.total_observations == 0) {
*output = redis::RESP_OK;
return Status::OK();
}
s = tdigest.Reset(ctx, key_name_);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::RESP_OK;
return Status::OK();
}

private:
std::string key_name_;
};
// Then replace the existing template implementation and type aliases with:
class CommandTDigestMin : public CommandTDigestMinMax {
public:
Expand All @@ -218,5 +247,6 @@ REDIS_REGISTER_COMMANDS(TDigest, MakeCmdAttr<CommandTDigestCreate>("tdigest.crea
MakeCmdAttr<CommandTDigestInfo>("tdigest.info", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMax>("tdigest.max", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, "read-only", 1, 1, 1));
MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1));
} // namespace redis
38 changes: 38 additions & 0 deletions src/types/redis_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,45 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name

return rocksdb::Status::OK();
}
rocksdb::Status TDigest::Reset(engine::Context& ctx, const Slice& digest_name) {
auto ns_key = AppendNamespacePrefix(digest_name);

TDigestMetadata metadata;
if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
return status;
}

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTDigest);
if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
return status;
}

metadata.unmerged_nodes = 0;
metadata.merged_nodes = 0;
metadata.total_weight = 0;
metadata.merged_weight = 0;
metadata.minimum = std::numeric_limits<double>::max();
metadata.maximum = std::numeric_limits<double>::lowest();
metadata.total_observations = 0;
metadata.merge_times = 0;

std::string metadata_bytes;
metadata.Encode(&metadata_bytes);

if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); !status.ok()) {
return status;
}

auto start_key = internalSegmentGuardPrefixKey(metadata, ns_key, SegmentType::kBuffer);
auto guard_key = internalSegmentGuardPrefixKey(metadata, ns_key, SegmentType::kGuardFlag);

if (auto status = batch->DeleteRange(cf_handle_, start_key, guard_key); !status.ok()) {
return status;
}
auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
return status;
}
rocksdb::Status TDigest::GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata) {
auto ns_key = AppendNamespacePrefix(digest_name);
return Database::GetMetadata(context, {kRedisTDigest}, ns_key, metadata);
Expand Down
1 change: 1 addition & 0 deletions src/types/redis_tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class TDigest : public SubKeyScanner {
rocksdb::Status Quantile(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& qs,
TDigestQuantitleResult* result);

rocksdb::Status Reset(engine::Context& ctx, const Slice& digest_name);
rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata);

private:
Expand Down
53 changes: 53 additions & 0 deletions tests/gocase/unit/type/tdigest/tdigest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,57 @@ func tdigestTests(t *testing.T, configs util.KvrocksServerConfigs) {
require.NoError(t, rsp.Err())
require.Equal(t, "-10.5", rsp.Val())
})
t.Run("tdigest.reset with different arguments", func(t *testing.T) {
keyPrefix := "tdigest_reset_"

// Testing with no arguments to .RESET
require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.RESET").Err(), errMsgWrongNumberArg)

require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"mydigest", "compression", "101").Err())

key := keyPrefix + "mydigest"
// Adding some data to digest
require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", key, "-84.3", "199.3", "343.34", "12.34").Err())

// Checking MIN value to ensure data was added
rsp := rdb.Do(ctx, "TDIGEST.MIN", key)
require.NoError(t, rsp.Err())
require.EqualValues(t, rsp.Val(), "-84.3")

// Reset on a non-existent key
require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.RESET", keyPrefix+"notexist").Err(), errMsgKeyNotExist)

// Get TDIGEST.INFO before reset
rsp = rdb.Do(ctx, "TDIGEST.INFO", key)
require.NoError(t, rsp.Err())
infoBeforeReset := toTdigestInfo(t, rsp.Val())

// Perform the reset
require.NoError(t, rdb.Do(ctx, "TDIGEST.RESET", key).Err())

// Get TDIGEST.INFO after reset
rsp = rdb.Do(ctx, "TDIGEST.INFO", key)
require.NoError(t, rsp.Err())
infoAfterReset := toTdigestInfo(t, rsp.Val())

// Ensure capacity remains unchanged
require.EqualValues(t, infoBeforeReset.Capacity, infoAfterReset.Capacity)
require.EqualValues(t, 101, infoAfterReset.Compression)
require.EqualValues(t, 0, infoAfterReset.MergedNodes)
require.EqualValues(t, 0, infoAfterReset.UnmergedNodes)
require.EqualValues(t, 0, infoAfterReset.Observations)
require.EqualValues(t, 0, infoAfterReset.TotalCompressions)

// Reset on an empty digest
emptyDigestKey := keyPrefix + "empty"
require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", emptyDigestKey, "COMPRESSION", "100").Err())
rsp = rdb.Do(ctx, "TDIGEST.RESET", emptyDigestKey)
require.NoError(t, rsp.Err())

// Ensure empty digest's capacity remains the same
rsp = rdb.Do(ctx, "TDIGEST.INFO", emptyDigestKey)
require.NoError(t, rsp.Err())
infoAfterEmptyReset := toTdigestInfo(t, rsp.Val())
require.EqualValues(t, 100, infoAfterEmptyReset.Compression)
})
}
Loading