Skip to content

Comments

propagate changes for hsetex with PXAT absolute timestamps#8

Merged
ranshid merged 14 commits intoranshid:fix-propagate-hdel-when-override-expired-elementfrom
frostzt:fix-propagate-hsetex-when-expired
Jan 15, 2026
Merged

propagate changes for hsetex with PXAT absolute timestamps#8
ranshid merged 14 commits intoranshid:fix-propagate-hdel-when-override-expired-elementfrom
frostzt:fix-propagate-hsetex-when-expired

Conversation

@frostzt
Copy link

@frostzt frostzt commented Jan 14, 2026

Created this so that you can actively review this @ranshid

@ranshid
Copy link
Owner

ranshid commented Jan 14, 2026

@frostzt PTAL. I changed the implementation foir hincr/hset/hsetnx to not propagate hdel. Still in our case ONLY when hsetex has the KEEPTTL flag, it will have to propagate. Also my changes around hsetex can be used to help your case

@frostzt
Copy link
Author

frostzt commented Jan 14, 2026

@ranshid can you check once? I am now doing it in 3 stages:

  1. If the field does not exist we use HDEL
  2. If the field has KEEPTTL and has an expiry I send the command with PXAT
  3. For field without expiry its just HSET

@frostzt frostzt requested a review from ranshid January 14, 2026 14:26
@frostzt frostzt changed the title simple draft for propagate changes for hsetex deletions propagate changes for hsetex with PXAT absolute timestamps Jan 14, 2026
@ranshid
Copy link
Owner

ranshid commented Jan 14, 2026

@frostzt this is a big change you placed there :) I am not even sure why you need to go that far.
The way I see it is this:

  1. We only need to collect the fields pointers (just the field name from the argument - c->argv[i]) into an array which could have been pre-allocated (you can also place it on the stack, but risk a stack overflow) according to the command number of fields (not fields+values)
  2. What we need to do now s that in the place where I already placed the "hexpired" report:
else {
            if (expired_overriten) {
                server.stat_expiredfields += expired_overriten;
                notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id);
            }

we need to run a loop calling propagateFieldsDeletion on the collected items and loop to the next collected items based on the value it returned. That is it :)

@frostzt
Copy link
Author

frostzt commented Jan 14, 2026

@frostzt this is a big change you placed there :) I am not even sure why you need to go that far. The way I see it is this:

1. We only need to collect the fields pointers (just the field name from the argument - c->argv[i]) into an array which could have been pre-allocated (you can also place it on the stack, but risk a stack overflow) according to the command number of fields (not fields+values)

2. What we need to do now s that in the place where I already placed the "hexpired" report:
else {
            if (expired_overriten) {
                server.stat_expiredfields += expired_overriten;
                notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id);
            }

we need to run a loop calling propagateFieldsDeletion on the collected items and loop to the next collected items based on the value it returned. That is it :)

Ah forgive me for that I kinda thought of it in a sequential way lemme update this, and thanks a lot for bearing with me 😭

Copy link
Owner

@ranshid ranshid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dropped some comments

@frostzt frostzt requested a review from ranshid January 14, 2026 18:42
src/t_hash.c Outdated
} else {
if (expired_overriten) {
/* Propagate deletions for expired/non-existent fields in batches */
if (keepttl_count > 0) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so keepttl_count and expired_overriten are always identical (if we decide to increment the expired_overriten whenever expired is set by hashTypeSet). I think we need not use 2 different values and we can simply use expired_overriten. I think the propagation logic should be applied here, but only in case keepttl_fields is not NULL.
The reason is that when KEEPTTL is provided the flag is set for ALL fields.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

@frostzt frostzt requested a review from ranshid January 14, 2026 20:44
@ranshid
Copy link
Owner

ranshid commented Jan 14, 2026

@frostzt mostly LGTM. can you please add a test in hashexopire.tcl (you can do something in the inspiration of valkey-io#3036 and place it in the same place I placed all the other tests in valkey-io#3060

src/t_hash.c Outdated
Comment on lines 1479 to 1489
if (expired) expired_overriten++;
changes++;
if (need_rewrite_argv) {

/* When KEEPTTL is used, we need to track all fields to propagate them
* individually with their actual timestamps */
if ((flags & ARGS_KEEPTTL) && expired) {
if (keepttl_fields == NULL) {
keepttl_fields = zmalloc(sizeof(robj *) * num_fields);
}
keepttl_fields[expired_overriten] = c->argv[i];
incrRefCount(c->argv[i]);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ranshid I think I messed up here! Caught it while running tests locally I increased expired_overriten however we're using that as index which is causing the server to panic, I think a simple post increase should be good?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeh it should be increased after we use it to index the position

@ranshid
Copy link
Owner

ranshid commented Jan 15, 2026

@frostzt 2 issues:

  1. in order to support the KEEPTTL we need to fix how hashTypeSet function manages expirations.
    This means that instead of doing:
is_expired = timestampIsExpired(entry_expiry);

we should do:

is_expired = entry_expiry != EXPIRY_NONE && checkAlreadyExpired(entry_expiry);

this is in order to make sure replicas/import mode and slot migration are operating correctly by forcing the logic instructed by the replication/import/migration stream
I will make this change in my PR since it will also require some of my tests to change.

  1. The test is not really testing what we wanted.
    We have 2 issues with KEEPTTL:
    a. the case of a primary HSETEX overwrites an expired field which was not yet reclaimed by the ActiveExpiration
    b. the case of primary HSETEX overwrites an NOT expired field BUT when the command reaches the replica it will use the old way (as explained in #(1) - timestampIsExpired) and thus will do something different than what the primary was doing.
    The propagation of the hdel which you are working on, is required to solve case 'a' AFTER we solve case 'b' :)
    I mean the fix in #(1) will introduce a problem in the case of 'a' which is why we will propagate hdel in order to make sure the replica will not KEPPTTL of a field which the primary ALOS expired by overwrting it.

I suggest 2 tests for each of the cases a/b:

 test {HSETEX KEEPTTL replica should preserve ttl when field is not expired on primary} {
            lassign [setup_replication_test $primary $replica $primary_host $primary_port] primary_initial_expired replica_initial_expired
            $primary debug set-active-expire 0

            $primary hset myhash f1 v1

            wait_for_ofs_sync $primary $replica

            pause_process $replica_pid
            
            $primary multi
            $primary hpexpire myhash 1 fields 1 f1
            $primary hsetex myhash KEEPTTL fields 1 f1 v2
            $primary exec

            # wait for f1 to expired
            wait_for_condition 50 100 {
                [$primary httl myhash fields 1 f1] == -2
            } else {
                fail "Field was not logically expired on primary"
            }

            resume_process $replica_pid

            wait_for_ofs_sync $primary $replica

            assert_equal {-2} [$primary httl myhash fields 1 f1]
            assert_equal {-2} [$replica httl myhash fields 1 f1]
            $primary debug set-active-expire 1
        } {OK} {needs:debug}

        test {HSETEX KEEPTTL replica should NOT preserve ttl when field is expired on primary} {
            lassign [setup_replication_test $primary $replica $primary_host $primary_port] primary_initial_expired replica_initial_expired
            $primary debug set-active-expire 0

            # write a short lived field on the primary and wait for the propagation
            $primary hsetex myhash PX 1 fields 1 f1 v1
        
            # wait for f1 to expired
            wait_for_condition 50 100 {
                [$primary httl myhash fields 1 f1] == -2
            } else {
                fail "Field was not logically expired on primary"
            }

            # Now overite the expired field on the primary and wait for it to propagate to the replica
            $primary hsetex myhash KEEPTTL fields 1 f1 v2
            wait_for_ofs_sync $primary $replica

            assert_equal {v2} [$primary hget myhash f1]
            assert_equal {v2} [$replica hget myhash f1]
            $primary debug set-active-expire 1
        } {OK} {needs:debug}

@ranshid
Copy link
Owner

ranshid commented Jan 15, 2026

@frostzt for time constraints I made some of the changes I suggested in my PR. please merge it to yours and focus on the new hdel propagation.
Currently in my latest changes the test HSETEX KEEPTTL replica should NOT preserve ttl when field is expired on primary will fail. and your fix is the one that should make it work again

…e-expired-element' into fix-propagate-hsetex-when-expired
@frostzt
Copy link
Author

frostzt commented Jan 15, 2026

@frostzt for time constraints I made some of the changes I suggested in my PR. please merge it to yours and focus on the new hdel propagation. Currently in my latest changes the test HSETEX KEEPTTL replica should NOT preserve ttl when field is expired on primary will fail. and your fix is the one that should make it work again

Thanks @ranshid just took a pull!
Question: With the above by doing this

is_expired = entry_expiry != EXPIRY_NONE && checkAlreadyExpired(entry_expiry);

This essentially is for fields that don't define a TTL. My question now is for hdel propagations this would impact the above that I have written right? What should my logical flow for tackling this be?

src/t_hash.c Outdated
}
keepttl_fields[expired_overriten] = c->argv[i];
incrRefCount(c->argv[i]);
} else if (need_rewrite_argv) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also rewrite the argv when we have the FXX/FNX and NX/XX flags, so we cannot "else if" here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok this fixed the test, @ranshid if you don't mind (I'll push the changes) but can you explain this to me a little more?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more about what?

@ranshid
Copy link
Owner

ranshid commented Jan 15, 2026

@frostzt for time constraints I made some of the changes I suggested in my PR. please merge it to yours and focus on the new hdel propagation. Currently in my latest changes the test HSETEX KEEPTTL replica should NOT preserve ttl when field is expired on primary will fail. and your fix is the one that should make it work again

Thanks @ranshid just took a pull! Question: With the above by doing this

is_expired = entry_expiry != EXPIRY_NONE && checkAlreadyExpired(entry_expiry);

This essentially is for fields that don't define a TTL. My question now is for hdel propagations this would impact the above that I have written right? What should my logical flow for tackling this be?

Your currently implemented logic is fine. In hsetex when we got an indication that we overwritten an expired field and KEEPTTL is used (ONLY THEN) we should ALSO propagate hdel the command itself will be propagated right after

src/t_hash.c Outdated
} else {
if (expired_overriten) {
/* Propagate deletions for expired/non-existent fields in batches */
if (keepttl_fields != NULL) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will mask the next 2 lines which SHOULD be executed even if keepttl_fields is null. keepttl_fields is not NULL only when we overwritten expired fields AND KEEPTTL flag was provided. however even if KEEPTTL was not provided, we might have overwritten some fields which were found to be expired

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh snap yeah, that makes sense

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I just attached my thinking that overritten == overritten with KEEPTTL

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, so this is why I split the effort. the overwriting of expired keys is a general problem/bug I fixed in my PR. the specific case of KEEPTTL is another issue

@ranshid
Copy link
Owner

ranshid commented Jan 15, 2026

Thank you sooooo much @frostzt !

src/t_hash.c Outdated
Comment on lines 1523 to 1525
for (int i = 0; i < expired_overriten; i++) {
decrRefCount(keepttl_fields[i]);
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still not sure why this is needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh checking the code for propagateFieldDeletion I get it my bad again I didn't knew that it does that already!

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the ref count ownership is passed to this function.
I suggest always compile and test with ASAN:
make -j noopt SANITIZER=address OPTIMIZATION=-O0

@ranshid
Copy link
Owner

ranshid commented Jan 15, 2026

Great @frostzt I will now merge it and take it from here! much much Thank you!

@frostzt
Copy link
Author

frostzt commented Jan 15, 2026

Thank you sooooo much @frostzt !

I honestly don't know if I deserve that but all thanks to you I really wanted to work in Valkey and have been just mesmerizing its code I can assure you the effort you put in to help me means a lot honestly I learned so much I'll keep up and keep contributing into this and hopefully will be able to write much better PRs (that will be easier for you to review 😛 )

@ranshid ranshid merged commit 7faa9a2 into ranshid:fix-propagate-hdel-when-override-expired-element Jan 15, 2026
ranshid pushed a commit that referenced this pull request Feb 17, 2026
…lkey-io#3174)

I was working on ASAN large memory tests when I countered this issue.

The issue was that the hardcoded `999` key could land in an early
bucket. Then shrink rehash could finish early, and later inserts could
trigger a new expansion rehash, resetting rehash_idx low. The test now
picks the survivor key dynamically as the key mapped to the highest
bucket index.

```
[test_hashtable.c] Memory leak detected of 336 bytes
=================================================================
==3901==ERROR: LeakSanitizer: detected memory leaks

Direct leak of 80 byte(s) in 1 object(s) allocated from:
    #0 0x7fb0556fd9c7 in malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:69
    #1 0x563bfdf4c47d in ztrymalloc_usable_internal /home/runner/work/valkey/valkey/src/zmalloc.c:156
    #2 0x563bfdf4c47d in valkey_malloc /home/runner/work/valkey/valkey/src/zmalloc.c:185
    #3 0x563bfdd42eaf in hashtableCreate /home/runner/work/valkey/valkey/src/hashtable.c:1217
    #4 0x563bfdaa1cbf in test_empty_buckets_rehashing unit/test_hashtable.c:232
    #5 0x563bfdae772b in runTestSuite unit/test_main.c:36
    #6 0x563bfda86b20 in main unit/test_main.c:108
    #7 0x7fb05522a1c9  (/lib/x86_64-linux-gnu/libc.so.6+0x2a1c9) (BuildId: 274eec488d230825a136fa9c4d85370fed7a0a5e)
    #8 0x7fb05522a28a in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x2a28a) (BuildId: 274eec488d230825a136fa9c4d85370fed7a0a5e)
    valkey-io#9 0x563bfda8a5c4 in _start (/home/runner/work/valkey/valkey/src/valkey-unit-tests+0x17c5c4) (BuildId: 44cfc183e6e82e499bcc9f6adc094d7f774ee9d2)

Indirect leak of 128 byte(s) in 1 object(s) allocated from:
    #0 0x7fb0556fd340 in calloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:77
    #1 0x563bfdf4c922 in ztrycalloc_usable_internal /home/runner/work/valkey/valkey/src/zmalloc.c:214
    #2 0x563bfdf4c922 in valkey_calloc /home/runner/work/valkey/valkey/src/zmalloc.c:257
    #3 0x563bfdd40967 in resize /home/runner/work/valkey/valkey/src/hashtable.c:741
    #4 0x563bfdd45eb1 in hashtableExpandIfNeeded /home/runner/work/valkey/valkey/src/hashtable.c:1446
    #5 0x563bfdd45eb1 in hashtableExpandIfNeeded /home/runner/work/valkey/valkey/src/hashtable.c:1433
    #6 0x563bfdd45eb1 in insert /home/runner/work/valkey/valkey/src/hashtable.c:1041
    #7 0x563bfdd45eb1 in hashtableAddOrFind /home/runner/work/valkey/valkey/src/hashtable.c:1554
    #8 0x563bfdd45eb1 in hashtableAdd /home/runner/work/valkey/valkey/src/hashtable.c:1539
    valkey-io#9 0x563bfdaa1e3b in test_empty_buckets_rehashing unit/test_hashtable.c:254
    valkey-io#10 0x563bfdae772b in runTestSuite unit/test_main.c:36
    valkey-io#11 0x563bfda86b20 in main unit/test_main.c:108
    valkey-io#12 0x7fb05522a1c9  (/lib/x86_64-linux-gnu/libc.so.6+0x2a1c9) (BuildId: 274eec488d230825a136fa9c4d85370fed7a0a5e)
    valkey-io#13 0x7fb05522a28a in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x2a28a) (BuildId: 274eec488d230825a136fa9c4d85370fed7a0a5e)
    valkey-io#14 0x563bfda8a5c4 in _start (/home/runner/work/valkey/valkey/src/valkey-unit-tests+0x17c5c4) (BuildId: 44cfc183e6e82e499bcc9f6adc094d7f774ee9d2)

Indirect leak of 64 byte(s) in 1 object(s) allocated from:
    #0 0x7fb0556fd340 in calloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:77
    #1 0x563bfdf4c922 in ztrycalloc_usable_internal /home/runner/work/valkey/valkey/src/zmalloc.c:214
    #2 0x563bfdf4c922 in valkey_calloc /home/runner/work/valkey/valkey/src/zmalloc.c:257
    #3 0x563bfdd3f553 in bucketConvertToChained /home/runner/work/valkey/valkey/src/hashtable.c:908
    #4 0x563bfdd3f553 in findBucketForInsert /home/runner/work/valkey/valkey/src/hashtable.c:1021
    #5 0x563bfdd45d9e in insert /home/runner/work/valkey/valkey/src/hashtable.c:1045
    #6 0x563bfdd45d9e in hashtableAddOrFind /home/runner/work/valkey/valkey/src/hashtable.c:1554
    #7 0x563bfdd45d9e in hashtableAdd /home/runner/work/valkey/valkey/src/hashtable.c:1539
    #8 0x563bfdaa1e3b in test_empty_buckets_rehashing unit/test_hashtable.c:254
    valkey-io#9 0x563bfdae772b in runTestSuite unit/test_main.c:36
    valkey-io#10 0x563bfda86b20 in main unit/test_main.c:108
    valkey-io#11 0x7fb05522a1c9  (/lib/x86_64-linux-gnu/libc.so.6+0x2a1c9) (BuildId: 274eec488d230825a136fa9c4d85370fed7a0a5e)
    valkey-io#12 0x7fb05522a28a in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x2a28a) (BuildId: 274eec488d230825a136fa9c4d85370fed7a0a5e)
    valkey-io#13 0x563bfda8a5c4 in _start (/home/runner/work/valkey/valkey/src/valkey-unit-tests+0x17c5c4) (BuildId: 44cfc183e6e82e499bcc9f6adc094d7f774ee9d2)

Indirect leak of 64 byte(s) in 1 object(s) allocated from:
    #0 0x7fb0556fd340 in calloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:77
    #1 0x563bfdf4c922 in ztrycalloc_usable_internal /home/runner/work/valkey/valkey/src/zmalloc.c:214
    #2 0x563bfdf4c922 in valkey_calloc /home/runner/work/valkey/valkey/src/zmalloc.c:257
    #3 0x563bfdd40967 in resize /home/runner/work/valkey/valkey/src/hashtable.c:741
    #4 0x563bfdaa1df8 in test_empty_buckets_rehashing unit/test_hashtable.c:248
    #5 0x563bfdae772b in runTestSuite unit/test_main.c:36
    #6 0x563bfda86b20 in main unit/test_main.c:108
    #7 0x7fb05522a1c9  (/lib/x86_64-linux-gnu/libc.so.6+0x2a1c9) (BuildId: 274eec488d230825a136fa9c4d85370fed7a0a5e)
    #8 0x7fb05522a28a in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x2a28a) (BuildId: 274eec488d230825a136fa9c4d85370fed7a0a5e)
    valkey-io#9 0x563bfda8a5c4 in _start (/home/runner/work/valkey/valkey/src/valkey-unit-tests+0x17c5c4) (BuildId: 44cfc183e6e82e499bcc9f6adc094d7f774ee9d2)

SUMMARY: AddressSanitizer: 336 byte(s) leaked in 4 allocation(s).
```

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants