Conversation

@chakaz (Contributor) commented Nov 26, 2024

With #4144 we break huge values slot migration into multiple commands. This PR now adds a yield between those commands.
It also adds a test that checks that modifying huge values while a migration is in progress works well, and that RSS doesn't grow too much.

Fixes #4100

@chakaz (Contributor Author) commented Nov 26, 2024

@adiholden re/ locks during serialization:

DbSlice::PreUpdate() locks local_mu_ before calling OnDbChange, so we are already covered there implicitly. In the previous PR perhaps you were thinking about registering for journal changes? We don't do that here...

@kostasrim (Contributor):

> @adiholden re/ locks during serialization:
>
> DbSlice::PreUpdate() locks local_mu_ before calling OnDbChange, so we are already covered there implicitly. In the previous PR perhaps you were thinking about registering for journal changes? We don't do that here...

Don't worry about local_mutex; it's removed in my PR (assuming by local_mutex you mean the db_slice member).

@chakaz (Contributor Author) commented Nov 26, 2024

>> @adiholden re/ locks during serialization:
>> DbSlice::PreUpdate() locks local_mu_ before calling OnDbChange, so we are already covered there implicitly. In the previous PR perhaps you were thinking about registering for journal changes? We don't do that here...
>
> Don't worry about local_mutex; it's removed in my PR (assuming by local_mutex you mean the db_slice member).

I do refer to DbSlice's local_mu_, and I actually do need to lock it for the same reason PreUpdate locks it today.

@chakaz requested a review from adiholden November 28, 2024 10:50
if seed_during_migration:
    await stop_seed()
else:
    # Only verify memory growth if we haven't pushed new data during migration
Contributor:

I am not sure we need this option of not running the seeder during migration.
You added this option in order to check the RSS compared to the RSS before migration, right? But we can compare peak RSS to peak used memory.
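
A minimal sketch of that comparison, assuming the test's get_memory helper (seen later in this review), an INFO MEMORY field named used_memory_peak, and an illustrative 1.5 factor:

```python
# Sketch only: compare peak RSS against peak used memory instead of a
# before/after RSS comparison that requires pausing the seeder.
peak_rss = await get_memory(client, "used_memory_peak_rss")
peak_used = await get_memory(client, "used_memory_peak")
# The factor is illustrative; the point is that peak RSS should stay close to
# peak used memory even while huge values are migrated and modified concurrently.
assert peak_rss < peak_used * 1.5
```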

Contributor (Author):

More importantly, there's the comparison: we compare before and after if we don't seed during the migration.

Before migrating to the Seeder and its comparison, I used custom logic to check the data that was inserted. With the seeder, I'm afraid it's no longer possible to check, unless you have an idea of how to do that.

I personally vote for keeping the previous logic; let me know if you want that as well.


insert_task = asyncio.create_task(insert_data(instances[0].cluster_client()))

async def get_rss(client, field):
Contributor:

get_memory_info maybe?

while True:
    rss = await get_rss(nodes[0].client, "used_memory_rss")
    logging.debug(f"Current rss: {rss}")
    if rss > 1_000_000_000:
Contributor:

Please explain why you are waiting for 1 GB of RSS.

# Insert data into containers with a gaussian distribution: some will be small and others big
stop = False

async def insert_data(client):
Contributor:

Kostas is working on a seeder improvement for containers of different sizes.
I'd prefer to have something generic rather than having this logic inside the test.
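
For context, the kind of in-test logic being discussed looks roughly like this (a sketch, not the actual test code; key names and size parameters are illustrative, and client is assumed to be an async Redis cluster client):

```python
import asyncio
import random

stop = False  # flipped to True by the test when seeding should end


async def insert_data(client):
    # Keep appending to list containers whose sizes follow a gaussian distribution,
    # so some containers stay small while others grow huge.
    i = 0
    while not stop:
        size = max(1, int(random.gauss(100, 300)))
        await client.rpush(f"list-{i}", *[f"value-{j}" for j in range(size)])
        i += 1
        await asyncio.sleep(0)  # yield to other tasks
```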

@chakaz (Contributor Author) commented Dec 8, 2024

Please do not review yet. I still haven't modified the tests to use Kostas' new framework. This is just a merge of changes from main.

@chakaz requested a review from adiholden December 22, 2024 13:36
if (fiber_cancelled_)
  return;
cursor = pt->TraverseBuckets(cursor, [&](PrimeTable::bucket_iterator it) {
  std::lock_guard guard(big_value_mu_);
Contributor:

I believe big_value_mu_ should protect only WriteBucket, to make sure that we do not write to the serializer from different fibers once we yield.

Contributor:

Ideally it would be inside the WriteBucket function, but I think we might have some failures in the flow of CVCUponInsert, which could be fixed, though not in this PR for sure.

Contributor:

You will also need to call db_slice_->BlockingCounter(); check out SliceSnapshot::BucketSaveCb.

@adiholden (Contributor) commented Dec 23, 2024

This last comment makes me think that we are missing a cluster migration test for Dragonfly running in cache mode, and one for testing the expiry logic. Let's open a separate task for that and do it in a separate PR.

@chakaz (Contributor Author) commented Dec 23, 2024

I mimicked the locking pattern in snapshot.cc.

There, we acquire the lock before calling FlushChangeToEarlierCallbacks(), which I imagine is needed. If we lock inside WriteBucket(), the call to FlushChangeToEarlierCallbacks() will be unguarded.

Also, by locking inside WriteBucket() we risk releasing the lock between the multiple calls to it in CVCUponInsert(), so the operation would no longer be atomic if another fiber locks the mutex in between.

I added "locking" the blocking counter, good catch!

Will file an issue shortly. Edit: #4354

@chakaz requested a review from adiholden December 26, 2024 11:39
await asyncio.gather(
    *(self._run_unit(client, sha, unit, using_stopkey, args) for unit in self.units)
)
for unit in self.units:
Contributor:

You can remove these changes, right? We are not using the seeder v2 in cluster now.

Contributor (Author):

It's true that we don't, but it took me forever to understand that this is where the bug is, and we might use it in the future, so I think it's a good idea to keep it as is.


@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@pytest.mark.asyncio
async def test_cluster_migration_huge_container_while_seeding(
Contributor:

Where is the huge container in this test?

Contributor (Author):

Right, I'll rename.

        pipe.execute_command(*cmd)
    await pipe.execute()
else:
    # To mirror consistently to Fake Redis we must only send to it successful
Contributor:

It's not so good that we don't test the MULTI flow, but it's better than nothing.
I will continue thinking about whether there is a better approach to testing the correctness of migration, so we won't have this limitation.

seeder = df_seeder_factory.create(
    keys=100, port=instances[0].port, cluster_mode=True, mirror_to_fake_redis=True
)
seed = asyncio.create_task(seeder.run())
Contributor:

First call
await seeder.run(target_deviation=0.1)
to fill the data, and then
seed = asyncio.create_task(seeder.run())

You will not need the sleep in line 2054 after this change.
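
In other words, something like this (a sketch; the migration steps in between are elided):

```python
# Fill the dataset synchronously first, so the data exists before the migration starts...
await seeder.run(target_deviation=0.1)
# ...then keep seeding in the background while the migration is in progress,
# without needing an arbitrary sleep to wait for the initial data.
seed = asyncio.create_task(seeder.run())
```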


logging.debug("Seeding cluster")
seeder = df_seeder_factory.create(
    keys=100, port=instances[0].port, cluster_mode=True, mirror_to_fake_redis=True
Contributor:

More keys, please.

df_factory.start_all([instance])

seeder = df_seeder_factory.create(
    keys=100, port=instance.port, unsupported_types=[ValueType.JSON], mirror_to_fake_redis=True
Contributor:

Why not JSON?

@chakaz (Contributor Author) commented Jan 2, 2025

FakeRedis doesn't work with JSON in my setup, I don't know why.
Since JSON is disabled in cluster mode anyway, I thought that it wouldn't matter too much; see:

unsupported_types.append(ValueType.JSON) # Cluster aio client doesn't support JSON

@chakaz requested a review from adiholden January 2, 2025 08:19
# To mirror consistently to Fake Redis we must only send to it successful
# commands. We can't use pipes because they might succeed partially.
for cmd in tx_data[0]:
    await client.execute_command(*cmd)
Contributor:

Maybe we can add here a result comparison between the different servers; if the results differ, it will probably lead to a capture-check failure, and adding this check will help catch the reason in that case.
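
A small sketch of that check, assuming the loop keeps a handle to the FakeRedis mirror (fake_client is a hypothetical name):

```python
for cmd in tx_data[0]:
    real_res = await client.execute_command(*cmd)
    fake_res = await fake_client.execute_command(*cmd)  # hypothetical mirror client
    # A mismatch would otherwise only surface later as a capture-check failure,
    # so fail early with the offending command to make debugging easier.
    assert real_res == fake_res, f"Result mismatch for {cmd}: {real_res} vs {fake_res}"
```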

Contributor (Author):

That's a great idea! I'll do that now.

logging.debug("Seeding finished")

assert (
    await get_memory(client0, "used_memory_peak_rss")
Contributor:

I ran the test on your branch; the check does not fail if chunk_size=0.
Maybe the data pushed by the seeder is not big enough to trigger this check?

Contributor (Author):

Did you mean that you ran this test on the main branch?

Anyway, yeah, this could be possible. I added this check back when we had huge values in the test. I think it's worthwhile to keep it though, for if/when we add huge values to the v1 seeder.

Contributor:

I ran it on your branch.
So do we have any test where we can use this check to make sure we actually reduce the memory usage with your changes?

Contributor (Author):

That was the original test that I wrote :)
I filled it with custom (huge) data, verified consistency, and checked that memory did not increase too much.
We can't properly test that memory does not increase if we don't migrate huge values... but we can do that in another test, one that does not migrate while seeding. I'll do that here.

Contributor (Author):

Done

Contributor:

Yes, these are 2 different checks: that we don't increase RSS too much, and that the migration works correctly while sending traffic when the migration is in progress.
They don't have to be in the same test.

@chakaz requested a review from adiholden January 2, 2025 13:20
)
await seeder.run(target_deviation=0.1)

seed = asyncio.create_task(seeder.run())
Contributor:

When I run the test, I also see that RestoreStreamer::OnDbChange and the journal calls are not executed.
Only after adding a sleep here, after this line, did I start seeing them executed.
Please add this sleep. We should also add stats, print them to the log, and check them after running the test, but let's do that in another PR.
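
A minimal sketch of the requested change (the duration is an assumption, not something the review specifies):

```python
seed = asyncio.create_task(seeder.run())
# Give the background seeder time to issue writes, so RestoreStreamer::OnDbChange
# and the journal-based path are actually exercised while the migration runs.
await asyncio.sleep(1)  # duration is a guess; tune as needed
```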

@chakaz requested a review from adiholden January 5, 2025 13:32
@chakaz enabled auto-merge (squash) January 5, 2025 13:38
@chakaz merged commit 7860a16 into main Jan 5, 2025
9 checks passed
@chakaz deleted the chakaz/huge-values-migration-3 branch January 5, 2025 14:28