feat: Yield inside huge values migration serialization #4197
Conversation
@adiholden re/ locks during serialization:
don't worry about […]
I do refer to […]
tests/dragonfly/cluster_test.py
Outdated
if seed_during_migration:
    await stop_seed()
else:
    # Only verify memory growth if we haven't pushed new data during migration
I am not sure we need this option of not running the seeder during migration.
You added this option in order to compare RSS to the RSS before migration, right? But we can compare peak RSS to peak used memory instead.
More important is the comparison: we only compare before and after if we don't seed during the migration.
Before migrating to the Seeder and the comparison, I used custom logic to verify the data that was inserted. With the seeder I'm afraid that check is no longer possible, unless you have an idea for how to do it.
I personally vote for keeping the previous logic; let me know if you want that as well.
tests/dragonfly/cluster_test.py
Outdated
insert_task = asyncio.create_task(insert_data(instances[0].cluster_client()))

async def get_rss(client, field):
get_memory_info maybe?
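For illustration, a minimal sketch of such a renamed helper, assuming a redis asyncio client whose info() returns a parsed dict (the exact name and signature in the test may differ):

```python
async def get_memory_info(client, field: str) -> int:
    # Fetch a single field from INFO MEMORY, e.g. "used_memory_rss" or "used_memory_peak_rss".
    info = await client.info("memory")
    return info[field]
```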
tests/dragonfly/cluster_test.py
Outdated
while True:
    rss = await get_rss(nodes[0].client, "used_memory_rss")
    logging.debug(f"Current rss: {rss}")
    if rss > 1_000_000_000:
Can you explain why you are waiting for 1 GB of RSS?
tests/dragonfly/cluster_test.py
Outdated
# Insert data to containers with a gaussian distribution: some will be small and other big
stop = False

async def insert_data(client):
Kostas is working on a seeder improvement for containers of different sizes.
I'd prefer to have something generic rather than this logic inside the test.
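For context, a rough sketch of what such an in-test insert_data helper could look like until the generic seeder support lands; the key pattern, sizes, and distribution parameters below are illustrative assumptions, not the test's actual values:

```python
import random

async def insert_data(client):
    # Draw container sizes from a gaussian so most lists stay small while a few get big;
    # `stop` is the flag defined in the enclosing test (see the diff above).
    i = 0
    while not stop:
        size = max(1, int(random.gauss(100, 300)))
        await client.rpush(f"list:{i}", *(f"value-{j}" for j in range(size)))
        i += 1
```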
Please do not review yet. I still haven't modified the tests to use Kostas' new framework. This is just a merge of changes from […]
src/server/journal/streamer.cc
Outdated
if (fiber_cancelled_)
  return;
cursor = pt->TraverseBuckets(cursor, [&](PrimeTable::bucket_iterator it) {
  std::lock_guard guard(big_value_mu_);
I believe big_value_mu_ should protect only WriteBucket, to make sure that we do not write to the serializer from different fibers once we yield.
Ideally it would be inside the WriteBucket function, but I think we might have some failures in the CVCUponInsert flow. Those could be fixed, but definitely not in this PR.
You will also need to call db_slice_->BlockingCounter(); check out SliceSnapshot::BucketSaveCb.
This last comment makes me think that we are missing cluster migration tests for Dragonfly running in cache mode and for the expiry logic. Let's open a separate task for that and do it in a separate PR.
I mimicked the locking pattern in snapshot.cc.
There, we acquire the lock before calling FlushChangeToEarlierCallbacks(), which I imagine is needed? If we lock inside WriteBucket(), the call to FlushChangeToEarlierCallbacks() will be unguarded.
Also, by locking inside WriteBucket() we risk releasing the lock between the multiple calls made in CVCUponInsert(), so the operation would no longer be atomic if another fiber locks the mutex in between.
I added "locking" the blocking counter, good catch!
Will file an issue shortly. Edit: #4354
await asyncio.gather(
    *(self._run_unit(client, sha, unit, using_stopkey, args) for unit in self.units)
)
for unit in self.units:
You can remove these changes, right? We are not using the seeder v2 in cluster tests now.
It's true that we don't, but it took me forever to understand that this is where the bug was, and we might use it in the future, so I think it's a good idea to keep it as is.
tests/dragonfly/cluster_test.py
Outdated
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@pytest.mark.asyncio
async def test_cluster_migration_huge_container_while_seeding(
where is the huge container in this test?
Right, I'll rename.
        pipe.execute_command(*cmd)
    await pipe.execute()
else:
    # To mirror consistently to Fake Redis we must only send to it successful
It's not so good that we don't test the MULTI flow, but it's better than nothing.
I will keep thinking about whether there is a better approach to testing migration correctness so we won't have this limitation.
tests/dragonfly/cluster_test.py
Outdated
seeder = df_seeder_factory.create(
    keys=100, port=instances[0].port, cluster_mode=True, mirror_to_fake_redis=True
)
seed = asyncio.create_task(seeder.run())
First call
await seeder.run(target_deviation=0.1)
to fill the data, and then
seed = asyncio.create_task(seeder.run())
You will not need the sleep in line 2054 after this change.
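In other words, something along these lines; the stop()/await at the end is assumed from how other tests in this suite shut down their seed tasks:

```python
# Fill the dataset first; run() returns once the data is within the target deviation.
await seeder.run(target_deviation=0.1)

# Then keep seeding in the background while the migration runs.
seed = asyncio.create_task(seeder.run())
# ... start the migration and wait for it to finish ...
seeder.stop()
await seed
```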
tests/dragonfly/cluster_test.py
Outdated
logging.debug("Seeding cluster")
seeder = df_seeder_factory.create(
    keys=100, port=instances[0].port, cluster_mode=True, mirror_to_fake_redis=True
more keys please
df_factory.start_all([instance])

seeder = df_seeder_factory.create(
    keys=100, port=instance.port, unsupported_types=[ValueType.JSON], mirror_to_fake_redis=True
why not json?
FakeRedis doesn't work with JSON in my setup, I don't know why.
Since JSON is disabled in cluster mode anyway, I thought it wouldn't matter too much, see:
dragonfly/tests/dragonfly/utility.py
Line 430 in 3b082e4
unsupported_types.append(ValueType.JSON)  # Cluster aio client doesn't support JSON
tests/dragonfly/utility.py
Outdated
# To mirror consistently to Fake Redis we must only send to it successful
# commands. We can't use pipes because they might succeed partially.
for cmd in tx_data[0]:
    await client.execute_command(*cmd)
Maybe we can add a comparison of results between the different servers here; if the results differ, that will probably lead to a capture check failure, and this check would help catch the reason in that case.
That's a great idea! I'll do that now.
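A hedged sketch of that cross-check inside the mirroring loop; fake_client here is an assumed handle to the FakeRedis mirror, and the real attribute name in the seeder may differ:

```python
for cmd in tx_data[0]:
    real_reply = await client.execute_command(*cmd)
    fake_reply = await fake_client.execute_command(*cmd)
    # A differing reply is an early, pinpointed signal of what would otherwise only
    # surface later as a capture-comparison failure.
    assert real_reply == fake_reply, f"Reply mismatch for {cmd}: {real_reply} != {fake_reply}"
```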
logging.debug("Seeding finished")

assert (
    await get_memory(client0, "used_memory_peak_rss")
I ran the test on your branch, and the check does not fail with chunk_size=0.
Maybe the data pushed by the seeder is not big enough to trigger this check?
Did you mean that you ran this test on the main branch?
Anyway, yeah, that's possible. I added this check back when we had huge values in the test. I think it's worthwhile to keep it though, for if/when we add huge values to the v1 seeder.
I ran it on your branch.
So do we have any test where this check verifies that we actually reduce memory usage with your changes?
That was the original test that I wrote :)
I filled it with custom (huge) data, verified consistency, and checked that memory did not increase too much.
We can't properly test that memory did not grow if we don't migrate huge values... but we can do that in another test, one that does not migrate while seeding. I'll do that here.
Done
Yes, these are two different checks: that we don't increase RSS too much, and that the migration works correctly while traffic is being sent during the migration.
They don't have to be in the same test.
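A sketch of the RSS check along the lines suggested earlier in this thread, comparing peak RSS against peak used memory so it stays meaningful even when the seeder keeps running; the 1.1 headroom factor is an assumption, not a value from this PR:

```python
peak_rss = await get_memory(client0, "used_memory_peak_rss")
peak_used = await get_memory(client0, "used_memory_peak")
# Peak RSS should not exceed peak used memory by much if huge-value serialization
# no longer buffers entire containers at once.
assert peak_rss < 1.1 * peak_used
```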
)
await seeder.run(target_deviation=0.1)

seed = asyncio.create_task(seeder.run())
When I run the test, I also see that RestoreStreamer::OnDbChange and the journal calls are not executed.
Only after adding a sleep after this line did I start seeing them executed.
Please add this sleep. We should also add stats, print them to the log, and check them after running the test, but let's do that in another PR.
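The requested change is roughly the following; the one-second duration is an assumption:

```python
seed = asyncio.create_task(seeder.run())
# Give the seeder a moment to start issuing writes so that RestoreStreamer::OnDbChange
# and the journal paths are actually exercised during the migration.
await asyncio.sleep(1)
```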
With #4144 we break slot migration of huge values into multiple commands. This PR adds a yield between those commands.
It also adds a test that checks that modifying huge values while a migration is in progress works correctly, and that RSS doesn't grow too much.
Fixes #4100