Commit ccc8442

Introduce HASH items expiration
Closes valkey-io#640

This PR introduces support for **field-level expiration in Valkey hash types**, making it possible for individual fields inside a hash to expire independently, creating what we call **volatile fields**.

This is the first of 3 PRs. This PR focuses on enabling the basic ability to set and modify hash field expiration, as well as persistence (AOF+RDB) and defrag.
[The second PR](#5) introduces the new algorithm (volatile-set) to track volatile hash fields and is in the last stages of review. The current implementation in this PR (in volatile_set.h/c) is just a stub implementation and will be replaced by [the second PR](#5).
[The third PR](#4) introduces the active expiration and defragmentation jobs.

For more high-level design details you can track the RFC PR: valkey-io/valkey-rfc#22.

---

Some major high-level decisions taken as part of this work:

1. We decided to copy the existing Redis API in order to maintain compatibility with existing clients.
2. We decided to avoid introducing lazy expiration at this point, in order to reduce complexity, and to rely only on active expiration for memory reclamation. This will require us to keep improving the active expiration job and potentially consider introducing lazy-expiration support later on.
3. Although the commands that add expiration to hash fields affect memory utilization (by allocating more memory for the expiration time and metadata), we decided not to add DENYOOM to these commands (HSETEX is an exception), in order to be better aligned with key-level commands like `expire`.
4. Some hash type commands will produce unexpected results:
   - HLEN: will still reflect the number of fields that exist in the hash object (whether actually expired or not).
   - HRANDFIELD: in some cases we will not be able to randomly select a field that has not already expired. This happens in 2 cases: 1/ when we are asked to provide non-unique fields (i.e. a negative count); 2/ when the size of the hash is much bigger than the count and we need to provide unique results. In both cases an empty response may be returned to the caller, even when the hash contains fields that are either persistent or not yet expired.
5. When a field is given a zero (0) expiration time or an expiration time in the past, it is deleted immediately. We decided that, in order to be aligned with how top-level keys are handled, we will emit the `hexpired` keyspace event in that case (instead of `hdel`). For example, for the case:

   ```
   HSET myhash f1 v1
   > 0
   HGETEX myhash EX 0 FIELDS 1 f1
   > "v1"
   HTTL myhash FIELDS 1 f1
   > -2
   ```

   The reported events are:

   ```
   1) "psubscribe"
   2) "__keyevent@0__*"
   3) (integer) 1
   1) "pmessage"
   2) "__keyevent@0__*"
   3) "__keyevent@0__:hset"
   4) "myhash"
   1) "pmessage"
   2) "__keyevent@0__*"
   3) "__keyevent@0__:hexpired" <---------------- note this
   4) "myhash"
   1) "pmessage"
   2) "__keyevent@0__*"
   3) "__keyevent@0__:del"
   4) "myhash"
   ```
6. We will ALWAYS load hash fields during RDB load. This means that when a primary reboots with an old snapshot, it will take time to reclaim all the expired fields. However, this simplifies the current logic and avoids a major refactoring that I suspect would otherwise be needed.

---

This PR also **modularizes and exposes the internal `hashTypeEntry` logic** as a new standalone `entry.c/h` module. This new abstraction handles all aspects of **field–value–expiry encoding** using multiple memory layouts optimized for performance and memory efficiency.

An `entry` is an abstraction that represents a single **field–value pair with optional expiration**. Internally, Valkey uses different memory layouts for compactness and efficiency, chosen dynamically based on size and encoding constraints.

The entry pointer is the field sds, which lets us use an entry just like any sds.
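As a rough illustration of the "entry pointer is the field string" idea, the sketch below hands out a pointer to the field bytes and keeps optional metadata in front of it. Everything here is hypothetical: the `demoEntry*` names, the one-byte flags header, and the `DEMO_HAS_EXPIRY` bit are invented for the example and are not the real sds header or the `entry.c` layout.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define DEMO_HAS_EXPIRY 0x01 /* hypothetical flag bit, not an sds header bit */

/* Allocate [optional int64 expiry][1 flags byte][field bytes + NUL] and
 * return a pointer to the field bytes: the "entry pointer". */
static char *demoEntryCreate(const char *field, int64_t expiry_ms, int has_expiry) {
    assert(field != NULL);
    size_t flen = strlen(field);
    size_t prefix = (has_expiry ? sizeof(int64_t) : 0) + 1;
    unsigned char *buf = malloc(prefix + flen + 1);
    if (buf == NULL) return NULL;
    /* The expiry sits at the start of the allocation, before the flags byte. */
    if (has_expiry) memcpy(buf, &expiry_ms, sizeof(expiry_ms));
    buf[prefix - 1] = has_expiry ? DEMO_HAS_EXPIRY : 0;
    memcpy(buf + prefix, field, flen + 1);
    return (char *)(buf + prefix);
}

/* The flags byte is the byte immediately before the field content. */
static int demoEntryHasExpiry(const char *entry) {
    return (((const unsigned char *)entry)[-1] & DEMO_HAS_EXPIRY) != 0;
}

/* Returns the stored expiry, or -1 when the entry has none. */
static int64_t demoEntryGetExpiry(const char *entry) {
    int64_t expiry = -1;
    if (demoEntryHasExpiry(entry)) {
        memcpy(&expiry, entry - 1 - sizeof(int64_t), sizeof(expiry));
    }
    return expiry;
}

/* Recompute the prefix size from the flags to find the allocation start. */
static void demoEntryFree(char *entry) {
    size_t prefix = (demoEntryHasExpiry(entry) ? sizeof(int64_t) : 0) + 1;
    free(entry - prefix);
}
```

Because the returned pointer addresses the field bytes directly, a caller can pass it to `strcmp` or `strlen` unchanged, while `demoEntryGetExpiry` walks backwards from the same pointer to find the timestamp.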
We encode the entry layout type in the field SDS header. Field type SDS_TYPE_5 doesn't have any spare bits to encode this, so we use it only for the first layout type.

Entry with embedded value, used for small sizes. The value is stored as SDS_TYPE_8. The field can use any SDS type.

An entry can also have an expiration timestamp, which is the UNIX timestamp at which it expires. For aligned fast access, we keep the expiry timestamp prior to the start of the sds header.

    +----------------+--------------+---------------+
    | Expiration     | field        | value         |
    | 1234567890LL   | hdr "foo" \0 | hdr8 "bar" \0 |
    +-----------------------^-------+---------------+
                            |
                            |
                           entry pointer (points to field sds content)

Entry with value pointer, used for larger fields and values. The field is SDS type 8 or higher.

    +--------------+-------+--------------+
    | Expiration   | value | field        |
    | 1234567890LL | ptr   | hdr "foo" \0 |
    +--------------+--^----+------^-------+
                      |           |
                      |           |
                      |         entry pointer (points to field sds content)
                      |
                     value pointer = value sds

The `entry.c/h` API provides methods to:
- Create, read, write, and update field/value/expiration
- Set or clear expiration
- Check expiration state
- Clone or delete an entry

---

This PR introduces **new commands** and extends existing ones to support field expiration.

The proposed API is nearly identical to the API provided by Redis (Redis 7.4 + 8.0). This is intentional, in order to avoid breaking client applications that already use hash item TTLs.

**Synopsis**

```
HSETEX key [NX | XX] [FNX | FXX] [EX seconds | PX milliseconds |
 EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL]
 FIELDS numfields field value [field value ...]
```

Set the value of one or more fields of a given hash key, and optionally set their expiration time or time-to-live (TTL).

The `HSETEX` command supports the following set of options:

* `NX`: Only set the fields if the hash object does NOT exist.
* `XX`: Only set the fields if the hash object does exist.
* `FNX`: Only set the fields if none of them already exist.
* `FXX`: Only set the fields if all of them already exist.
* `EX seconds`: Set the specified expiration time in seconds.
* `PX milliseconds`: Set the specified expiration time in milliseconds.
* `EXAT unix-time-seconds`: Set the specified Unix time in seconds at which the fields will expire.
* `PXAT unix-time-milliseconds`: Set the specified Unix time in milliseconds at which the fields will expire.
* `KEEPTTL`: Retain the TTL associated with the fields.

The `EX`, `PX`, `EXAT`, `PXAT`, and `KEEPTTL` options are mutually exclusive.

**Synopsis**

```
HGETEX key [EX seconds | PX milliseconds | EXAT unix-time-seconds |
 PXAT unix-time-milliseconds | PERSIST]
 FIELDS numfields field [field ...]
```

Get the value of one or more fields of a given hash key and optionally set their expiration time or time-to-live (TTL).

The `HGETEX` command supports a set of options:

* `EX seconds`: Set the specified expiration time, in seconds.
* `PX milliseconds`: Set the specified expiration time, in milliseconds.
* `EXAT unix-time-seconds`: Set the specified Unix time at which the fields will expire, in seconds.
* `PXAT unix-time-milliseconds`: Set the specified Unix time at which the fields will expire, in milliseconds.
* `PERSIST`: Remove the TTL associated with the fields.

The `EX`, `PX`, `EXAT`, `PXAT`, and `PERSIST` options are mutually exclusive.

**Synopsis**

```
HEXPIRE key seconds [NX | XX | GT | LT] FIELDS numfields
 field [field ...]
```

Set an expiration (TTL or time to live) on one or more fields of a given hash key. You must specify at least one field. Fields are automatically deleted from the hash key when their TTLs expire.
Field expirations are only cleared by commands that delete or overwrite the contents of the hash fields, including the `HDEL` and `HSET` commands.
This means that all the operations that conceptually *alter* the value stored at a hash key's field without replacing it with a new one will leave the TTL untouched.

Note that calling `HEXPIRE`/`HPEXPIRE` with 0 or with a time in the past results in the hash field being deleted immediately (see decision 5 above). To clear the TTL of a field without deleting the field, use `HPERSIST`.

The `HEXPIRE` command supports a set of options:

* `NX`: For each specified field, set expiration only when the field has no expiration.
* `XX`: For each specified field, set expiration only when the field has an existing expiration.
* `GT`: For each specified field, set expiration only when the new expiration is greater than the current one.
* `LT`: For each specified field, set expiration only when the new expiration is less than the current one.

**Synopsis**

```
HEXPIREAT key unix-time-seconds [NX | XX | GT | LT] FIELDS numfields
 field [field ...]
```

`HEXPIREAT` has the same effect and semantics as `HEXPIRE`, but instead of specifying the number of seconds for the TTL (time to live), it takes an absolute Unix timestamp in seconds since the Unix epoch. A timestamp in the past deletes the field immediately.

The `HEXPIREAT` command supports a set of options:

* `NX`: For each specified field, set expiration only when the field has no expiration.
* `XX`: For each specified field, set expiration only when the field has an existing expiration.
* `GT`: For each specified field, set expiration only when the new expiration is greater than the current one.
* `LT`: For each specified field, set expiration only when the new expiration is less than the current one.

**Synopsis**

```
HPEXPIRE key milliseconds [NX | XX | GT | LT] FIELDS numfields
 field [field ...]
```

This command works like `HEXPIRE`, but the expiration of a field is specified in milliseconds instead of seconds.

The `HPEXPIRE` command supports a set of options:

* `NX`: For each specified field, set expiration only when the field has no expiration.
* `XX`: For each specified field, set expiration only when the field has an existing expiration.
* `GT`: For each specified field, set expiration only when the new expiration is greater than the current one.
* `LT`: For each specified field, set expiration only when the new expiration is less than the current one.

**Synopsis**

```
HPEXPIREAT key unix-time-milliseconds [NX | XX | GT | LT]
 FIELDS numfields field [field ...]
```

`HPEXPIREAT` has the same effect and semantics as `HEXPIREAT`, but the Unix time at which the field will expire is specified in milliseconds since the Unix epoch instead of seconds.

**Synopsis**

```
HPERSIST key FIELDS numfields field [field ...]
```

Remove the existing expiration on a hash key's field(s), turning the field(s) from *volatile* (a field with an expiration set) to *persistent* (a field that will never expire because no TTL (time to live) is associated with it).

**Synopsis**

```
HSETEX key [NX] seconds field value [field value ...]
```

Similar to `HSET`, but adds one or more hash fields that expire after the specified number of seconds. By default, this command overwrites the values and expirations of the specified fields that exist in the hash. If the `NX` option is specified, the field data will not be overwritten. If `key` doesn't exist, a new hash key is created.

The `HSETEX` command supports a set of options:

* `NX`: For each specified field, set expiration only when the field has no expiration.

**Synopsis**

```
HTTL key FIELDS numfields field [field ...]
```

Returns the **remaining** TTL (time to live) of a hash key's field(s) that have a set expiration. This introspection capability allows you to check how many seconds a given hash field will continue to be part of the hash key.

**Synopsis**

```
HPTTL key FIELDS numfields field [field ...]
```

Like `HTTL`, this command returns the remaining TTL (time to live) of a field that has an expiration set, but in milliseconds instead of seconds.

**Synopsis**

```
HEXPIRETIME key FIELDS numfields field [field ...]
```

Returns the absolute Unix timestamp in seconds since the Unix epoch at which the given key's field(s) will expire.

**Synopsis**

```
HPEXPIRETIME key FIELDS numfields field [field ...]
```

`HPEXPIRETIME` has the same semantics as `HEXPIRETIME`, but returns the absolute Unix expiration timestamp in milliseconds since the Unix epoch instead of seconds.

This PR introduces new notification events to support field-level expiration:

| Event      | Trigger                                  |
|------------|------------------------------------------|
| `hexpire`  | Field expiration was set                 |
| `hexpired` | Field was deleted due to expiration      |
| `hpersist` | Expiration was removed from a field      |
| `del`      | Key was deleted after all fields expired |

Note that we diverge from Redis in the cases where we emit the `hexpired` event. For example, given the following use case:

```
HSET myhash f1 v1
(integer) 0
HGETEX myhash EX 0 FIELDS 1 f1
1) "v1"
HTTL myhash FIELDS 1 f1
1) (integer) -2
```

Regarding the keyspace notifications, Redis reports:

```
1) "psubscribe"
2) "__keyevent@0__:*"
3) (integer) 1
1) "pmessage"
2) "__keyevent@0__:*"
3) "__keyevent@0__:hset"
4) "myhash2"
1) "pmessage"
2) "__keyevent@0__:*"
3) "__keyevent@0__:hdel" <---------------- note this
4) "myhash2"
1) "pmessage"
2) "__keyevent@0__:*"
3) "__keyevent@0__:del"
4) "myhash2"
```

However, in our current proposal, Valkey will emit:

```
1) "psubscribe"
2) "__keyevent@0__*"
3) (integer) 1
1) "pmessage"
2) "__keyevent@0__*"
3) "__keyevent@0__:hset"
4) "myhash"
1) "pmessage"
2) "__keyevent@0__*"
3) "__keyevent@0__:hexpired" <---------------- note this
4) "myhash"
1) "pmessage"
2) "__keyevent@0__*"
3) "__keyevent@0__:del"
4) "myhash"
```

---

- Expiration-aware commands (`HSETEX`, `HGETEX`, etc.) are **not propagated as-is**.
- Instead, Valkey rewrites them into equivalent commands like:
  - `HDEL` (for expired fields)
  - `HPEXPIREAT` (for setting absolute expiration)
  - `HPERSIST` (for removing expiration)

This ensures compatibility with replication and AOF while maintaining consistent field-level expiry behavior.

---

| Command Name | QPS Standard | QPS HFE | QPS Diff % | Latency Standard (ms) | Latency HFE (ms) | Latency Diff % |
|--------------|--------------|---------|------------|-----------------------|------------------|----------------|
| **One Large Hash Table** | | | | | | |
| HGET | 137988.12 | 138484.97 | +0.36% | 0.951 | 0.949 | -0.21% |
| HSET | 138561.73 | 137343.77 | -0.87% | 0.948 | 0.956 | +0.84% |
| HEXISTS | 139431.12 | 138677.02 | -0.54% | 0.942 | 0.946 | +0.42% |
| HDEL | 140114.89 | 138966.09 | -0.81% | 0.938 | 0.945 | +0.74% |
| **Many Hash Tables (100 fields)** | | | | | | |
| HGET | 136798.91 | 137419.27 | +0.45% | 0.959 | 0.956 | -0.31% |
| HEXISTS | 138946.78 | 139645.31 | +0.50% | 0.946 | 0.941 | -0.52% |
| HGETALL | 42194.09 | 42016.80 | -0.42% | 0.621 | 0.625 | +0.64% |
| HSET | 137230.69 | 137249.53 | +0.01% | 0.959 | 0.958 | -0.10% |
| HDEL | 138985.41 | 138619.34 | -0.26% | 0.948 | 0.949 | +0.10% |
| **Many Hash Tables (1000 fields)** | | | | | | |
| HGET | 135795.77 | 139256.36 | +2.54% | 0.965 | 0.943 | -2.27% |
| HEXISTS | 138121.55 | 137950.06 | -0.12% | 0.951 | 0.952 | +0.10% |
| HGETALL | 5885.81 | 5633.80 | **-4.28%** | 2.690 | 2.841 | **+5.61%** |
| HSET | 137005.08 | 137400.39 | +0.28% | 0.959 | 0.955 | -0.41% |
| HDEL | 138293.45 | 137381.52 | -0.65% | 0.948 | 0.955 | +0.73% |

- [ ] Consider extending HSETEX with extra arguments (NX/XX) so that it is possible to prevent adding/setting/mutating fields of a non-existent hash.
- [ ] Avoid loading expired fields when a non-preamble RDB is being loaded on a primary. This is an optimization to avoid loading unnecessary (already expired) fields. It would also require us to propagate the HDEL to the replicas in case of RDBFLAGS_FEED_REPL. Note that it might require some refactoring: 1/ propagate the rdbflags and current time to rdbLoadObject; 2/ consider the case of restore, check_rdb, etc. For this reason I would like to avoid this optimization for the first drop.
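The per-field `NX`/`XX`/`GT`/`LT` conditions listed for `HEXPIRE` and its variants can be read as a small predicate over the current and the requested expiration. The sketch below is illustrative only: `expireCond`, `NO_EXPIRY`, and `shouldSetExpiry` are hypothetical names, and the handling of `GT`/`LT` for a field without an expiration (treated here as an infinite TTL, as Redis documents for key-level `EXPIRE`) is an assumption that the text above does not state.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define NO_EXPIRY (-1LL) /* hypothetical sentinel: the field is persistent */

typedef enum { COND_NONE, COND_NX, COND_XX, COND_GT, COND_LT } expireCond;

/* Decide whether a new absolute expiry (ms) should replace the current one. */
static bool shouldSetExpiry(expireCond cond, int64_t current_ms, int64_t new_ms) {
    assert(new_ms >= 0); /* this sketch only models non-negative timestamps */
    bool has_expiry = (current_ms != NO_EXPIRY);
    switch (cond) {
    case COND_NX: return !has_expiry; /* only when no expiration is set */
    case COND_XX: return has_expiry;  /* only when an expiration exists */
    /* ASSUMPTION: a persistent field counts as an infinite TTL, so GT can
     * never apply to it and LT always can. */
    case COND_GT: return has_expiry && new_ms > current_ms;
    case COND_LT: return !has_expiry || new_ms < current_ms;
    default: return true;             /* no condition: always set */
    }
}
```

Under these assumptions, `shouldSetExpiry(COND_GT, 500, 1000)` accepts the update while `shouldSetExpiry(COND_NX, 500, 1000)` rejects it, because the field already has an expiration.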
1 parent 3b12132 commit ccc8442

42 files changed: +6943 -673 lines changed

cmake/Modules/SourceFiles.cmake

Lines changed: 4 additions & 1 deletion
@@ -117,7 +117,10 @@ set(VALKEY_SERVER_SRCS
     ${CMAKE_SOURCE_DIR}/src/connection.c
     ${CMAKE_SOURCE_DIR}/src/unix.c
     ${CMAKE_SOURCE_DIR}/src/server.c
-    ${CMAKE_SOURCE_DIR}/src/logreqres.c)
+    ${CMAKE_SOURCE_DIR}/src/logreqres.c
+    ${CMAKE_SOURCE_DIR}/src/entry.c
+    ${CMAKE_SOURCE_DIR}/src/volatile_set.c)
+

 # valkey-cli
 set(VALKEY_CLI_SRCS

src/Makefile

Lines changed: 1 addition & 1 deletion
@@ -423,7 +423,7 @@ ENGINE_NAME=valkey
 SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
 ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
 ENGINE_TRACE_OBJ=trace/trace.o trace/trace_commands.o trace/trace_db.o trace/trace_cluster.o trace/trace_server.o trace/trace_rdb.o trace/trace_aof.o
-ENGINE_SERVER_OBJ=threads_mngr.o adlist.o vector.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
+ENGINE_SERVER_OBJ=threads_mngr.o adlist.o vector.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o volatile_set.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
 ENGINE_SERVER_OBJ+=$(ENGINE_TRACE_OBJ)
 ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
 ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o sds.o util.o sha256.o

src/anet.c

Lines changed: 0 additions & 2 deletions
@@ -52,8 +52,6 @@
 #include "util.h"
 #include "serverassert.h"

-#define UNUSED(x) (void)(x)
-
 static void anetSetError(char *err, const char *fmt, ...) {
     va_list ap;

src/aof.c

Lines changed: 27 additions & 5 deletions
@@ -1955,12 +1955,32 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
  * The function returns 0 on error, 1 on success. */
 int rewriteHashObject(rio *r, robj *key, robj *o) {
     hashTypeIterator hi;
-    long long count = 0, items = hashTypeLength(o);
-
+    long long count = 0, volatile_items = 0, non_volatile_items;
+    /* First serialize volatile items if exist */
+    if (hashTypeHasVolatileElements(o)) {
+        hashTypeInitVolatileIterator(o, &hi);
+        while (hashTypeNext(&hi) != C_ERR) {
+            long long expiry = entryGetExpiry(hi.next);
+            sds field = entryGetField(hi.next);
+            sds value = entryGetValue(hi.next);
+            if (rioWriteBulkCount(r, '*', 8) == 0) return 0;
+            if (rioWriteBulkString(r, "HSETEX", 6) == 0) return 0;
+            if (rioWriteBulkObject(r, key) == 0) return 0;
+            if (rioWriteBulkString(r, "PXAT", 4) == 0) return 0;
+            if (rioWriteBulkLongLong(r, expiry) == 0) return 0;
+            if (rioWriteBulkString(r, "FIELDS", 6) == 0) return 0;
+            if (rioWriteBulkLongLong(r, 1) == 0) return 0;
+            if (rioWriteBulkString(r, field, sdslen(field)) == 0) return 0;
+            if (rioWriteBulkString(r, value, sdslen(value)) == 0) return 0;
+            volatile_items++;
+        }
+        hashTypeResetIterator(&hi);
+    }
+    non_volatile_items = hashTypeLength(o) - volatile_items;
     hashTypeInitIterator(o, &hi);
     while (hashTypeNext(&hi) != C_ERR) {
         if (count == 0) {
-            int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? AOF_REWRITE_ITEMS_PER_CMD : items;
+            int cmd_items = (non_volatile_items > AOF_REWRITE_ITEMS_PER_CMD) ? AOF_REWRITE_ITEMS_PER_CMD : non_volatile_items;

             if (!rioWriteBulkCount(r, '*', 2 + cmd_items * 2) || !rioWriteBulkString(r, "HMSET", 5) ||
                 !rioWriteBulkObject(r, key)) {
@@ -1969,16 +1989,18 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
             }
         }

+        if (volatile_items > 0 && entryHasExpiry(hi.next))
+            continue;
+
         if (!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_FIELD) || !rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_VALUE)) {
             hashTypeResetIterator(&hi);
             return 0;
         }
         if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
-        items--;
+        non_volatile_items--;
     }

     hashTypeResetIterator(&hi);
-
     return 1;
 }
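To make the wire format of the volatile-field branch above concrete, here is a minimal standalone sketch of the RESP text that one `HSETEX key PXAT expiry FIELDS 1 field value` command turns into: an array header of 8 elements followed by 8 bulk strings. The helper names `appendBulk` and `formatVolatileField` are hypothetical, and unlike the `rio` functions used in the diff this sketch works on NUL-terminated C strings, so it is not binary safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Append one RESP bulk string ("$<len>\r\n<bytes>\r\n") at *off.
 * Returns 1 on success, 0 if the buffer is too small. */
static int appendBulk(char *buf, size_t cap, size_t *off, const char *s) {
    int n = snprintf(buf + *off, cap - *off, "$%zu\r\n%s\r\n", strlen(s), s);
    if (n < 0 || (size_t)n >= cap - *off) return 0;
    *off += (size_t)n;
    return 1;
}

/* Format the 8-element command written for one volatile field.
 * Returns the number of bytes written, or 0 on error. */
static size_t formatVolatileField(char *buf, size_t cap, const char *key,
                                  long long expiry_ms, const char *field,
                                  const char *value) {
    assert(buf != NULL && cap > 0);
    char num[32];
    size_t off;
    int n = snprintf(buf, cap, "*8\r\n"); /* array header: 8 elements */
    if (n < 0 || (size_t)n >= cap) return 0;
    off = (size_t)n;
    snprintf(num, sizeof(num), "%lld", expiry_ms); /* integers go out as bulk strings */
    if (!appendBulk(buf, cap, &off, "HSETEX") ||
        !appendBulk(buf, cap, &off, key) ||
        !appendBulk(buf, cap, &off, "PXAT") ||
        !appendBulk(buf, cap, &off, num) ||
        !appendBulk(buf, cap, &off, "FIELDS") ||
        !appendBulk(buf, cap, &off, "1") ||
        !appendBulk(buf, cap, &off, field) ||
        !appendBulk(buf, cap, &off, value)) {
        return 0;
    }
    return off;
}
```

Calling `formatVolatileField(buf, sizeof(buf), "myhash", 1234567890LL, "f1", "v1")` fills `buf` with a single command that a server replaying the AOF would parse as `HSETEX myhash PXAT 1234567890 FIELDS 1 f1 v1`. Because the timestamp is absolute (`PXAT`), replaying the file later does not extend the field's lifetime.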
