feat: Yield inside huge values migration serialization #4197
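The change this PR exercises: when a slot migration serializes a huge container (the tests below build ~10 MB values with 10,000-element collections), the serializer emits it in bounded chunks and yields between chunks instead of buffering the whole value, which is what keeps peak RSS close to the pre-migration RSS in the assertions below. The sketch below is only a rough Python/asyncio illustration of that pattern, not Dragonfly's implementation (which is C++); serialize_container, MAX_CHUNK_BYTES, and sink are hypothetical names, with MAX_CHUNK_BYTES standing in for the serialization_max_chunk_size flag used by the tests.

import asyncio

MAX_CHUNK_BYTES = 1_000_000  # stand-in for serialization_max_chunk_size


async def serialize_container(entries, sink):
    """Illustrative only: flush partial chunks and yield so other work can run."""
    buf, buf_bytes = [], 0
    for entry in entries:
        buf.append(entry)
        buf_bytes += len(entry)
        if buf_bytes >= MAX_CHUNK_BYTES:
            sink.extend(buf)  # ship a partial chunk instead of holding everything
            buf, buf_bytes = [], 0
            await asyncio.sleep(0)  # yield instead of hogging the loop
    if buf:
        sink.extend(buf)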
@@ -14,8 +14,7 @@
from redis.cluster import RedisCluster
from redis.cluster import ClusterNode
from .proxy import Proxy
-from .seeder import SeederBase
-from .seeder import StaticSeeder
+from .seeder import Seeder, SeederBase, StaticSeeder
from . import dfly_args
@@ -33,6 +32,11 @@ def monotonically_increasing_port_number():
next_port = monotonically_increasing_port_number()


async def get_memory(client, field):
    info = await client.info("memory")
    return info[field]


class RedisClusterNode:
    def __init__(self, port):
        self.port = port
@@ -1981,6 +1985,7 @@ async def node1size0():
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@pytest.mark.asyncio
@pytest.mark.opt_only
async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
    instances = [
        df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
@@ -1995,7 +2000,7 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
    logging.debug("Generating huge containers")
    seeder = StaticSeeder(
-        key_target=10,
+        key_target=100,
        data_size=10_000_000,
        collection_size=10_000,
        variance=1,
@@ -2005,6 +2010,8 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
    await seeder.run(nodes[0].client)
    source_data = await StaticSeeder.capture(nodes[0].client)

    mem_before = await get_memory(nodes[0].client, "used_memory_rss")

    nodes[0].migrations = [
        MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
    ]
@@ -2017,6 +2024,74 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
    target_data = await StaticSeeder.capture(nodes[1].client)
    assert source_data == target_data

    # Get peak memory, because migration removes the data
    mem_after = await get_memory(nodes[0].client, "used_memory_peak_rss")
    logging.debug(f"Memory before {mem_before} after {mem_after}")
    assert mem_after < mem_before * 1.1


@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@pytest.mark.parametrize("chunk_size", [1_000_000, 30])
@pytest.mark.asyncio
async def test_cluster_migration_while_seeding(
    df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory, chunk_size
):
    instances = [
        df_factory.create(
            port=next(next_port),
            admin_port=next(next_port),
            serialization_max_chunk_size=chunk_size,
        )
        for _ in range(2)
    ]
    df_factory.start_all(instances)

    nodes = [await create_node_info(instance) for instance in instances]
    nodes[0].slots = [(0, 16383)]
    nodes[1].slots = []
    client0 = nodes[0].client
    client1 = nodes[1].client

    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

    logging.debug("Seeding cluster")
    seeder = df_seeder_factory.create(
        keys=10_000, port=instances[0].port, cluster_mode=True, mirror_to_fake_redis=True
    )
    await seeder.run(target_deviation=0.1)

    seed = asyncio.create_task(seeder.run())
    await asyncio.sleep(1)

    nodes[0].migrations = [
        MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
    ]
    logging.debug("Migrating slots")
    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

    logging.debug("Waiting for migration to finish")
    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=300)
    logging.debug("Migration finished")

    logging.debug("Finalizing migration")
    nodes[0].slots = []
    nodes[1].slots = [(0, 16383)]
    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

    await asyncio.sleep(1)  # Let seeder feed dest before migration finishes

    seeder.stop()
    await seed
    logging.debug("Seeding finished")

    assert (
        await get_memory(client0, "used_memory_peak_rss")
        < await get_memory(client0, "used_memory_rss") * 1.1
    )
Contributor:
I ran the test on your branch; the check does not fail if chunk_size=0.

Author:
Did you mean that you ran this test on the …? Anyway, yeah, this could be possible. I added this check back when we had huge values in the test. I think it's worthwhile to keep it, though, for if/when we add huge values to the v1 seeder.

Contributor:
I ran it on your branch.

Author:
That was the original test that I wrote :)

Author:
Done

Contributor:
Yes, these are 2 different checks: that we don't increase RSS too much, and that the migration works correctly while sending traffic while the migration is in progress.
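A side note on the thread above: since the v1 seeder currently only produces small values, the peak-RSS assertion passes regardless of chunk size, as the reviewer observed with chunk_size=0. If huge values are later added to this test, one hypothetical way to make the assertion bite would be to pre-populate a single large container through a plain client before starting the migration; add_huge_list below is illustrative only and not part of this PR.

async def add_huge_list(client, key="huge:list", elements=200_000):
    # Hypothetical helper: build one multi-megabyte list so the chunked
    # serialization path is actually exercised during the slot migration.
    pipe = client.pipeline(transaction=False)
    for i in range(elements):
        pipe.rpush(key, f"value-{i}")
        if i % 10_000 == 9_999:
            await pipe.execute()  # flush periodically to bound client-side memory
            pipe = client.pipeline(transaction=False)
    await pipe.execute()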
    capture = await seeder.capture_fake_redis()
    assert await seeder.compare(capture, instances[1].port)


def parse_lag(replication_info: str):
    lags = re.findall("lag=([0-9]+)\r\n", replication_info)
@@ -25,3 +25,4 @@ pytest-emoji==0.2.0
pytest-icdiff==0.8
pytest-timeout==2.2.0
asyncio==3.4.3
fakeredis[json]==2.26.2
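The new fakeredis dependency gives the seeder an in-process, redis.asyncio-compatible stand-in that it can mirror successful commands into and later capture as a reference. A minimal standalone sketch of that usage (the keys and values here are arbitrary examples, independent of the seeder code):

import asyncio

import fakeredis


async def main():
    fake = fakeredis.FakeAsyncRedis()
    # Same API surface as redis.asyncio.Redis, but all state stays in process.
    await fake.execute_command("LPUSH", "list:1", "a", "b", "c")
    assert await fake.llen("list:1") == 3
    print(await fake.lrange("list:1", 0, -1))


asyncio.run(main())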
@@ -177,14 +177,16 @@ async def run(self, client: aioredis.Redis, target_ops=None, target_deviation=No
        ]

        sha = await client.script_load(Seeder._load_script("generate"))
-        await asyncio.gather(
-            *(self._run_unit(client, sha, unit, using_stopkey, args) for unit in self.units)
-        )
+        for unit in self.units:
+            # Must be serial, otherwise cluster clients throws an exception
+            await self._run_unit(client, sha, unit, using_stopkey, args)

Contributor:
You can remove these changes, right? We are not using the seeder v2 in cluster now.

Author:
It's true that we don't, but it took me forever to understand that this is where the bug is, and we might use it in the future, so I think it's a good idea to keep it as is.

    async def stop(self, client: aioredis.Redis):
        """Request seeder seeder if it's running without a target, future returned from start() must still be awaited"""

-        await asyncio.gather(*(client.set(unit.stop_key, "X") for unit in self.units))
+        for unit in self.units:
+            # Must be serial, otherwise cluster clients throws an exception
+            await client.set(unit.stop_key, "X")

    def change_key_target(self, target: int):
        """Change key target, applied only on succeeding runs"""
@@ -4,6 +4,8 @@
from redis import asyncio as aioredis
from . import dfly_args
from .seeder import Seeder, StaticSeeder
from .instance import DflyInstanceFactory, DflyInstance
from .utility import *


@dfly_args({"proactor_threads": 4})

@@ -114,3 +116,22 @@ async def set_data():
    # Do another change
    await async_client.spop("set1")
    assert capture != await Seeder.capture(async_client)


@pytest.mark.asyncio
@dfly_args({"proactor_threads": 2})
async def test_seeder_fake_redis(
    df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory
):
    instance = df_factory.create()
    df_factory.start_all([instance])

    seeder = df_seeder_factory.create(
        keys=100, port=instance.port, unsupported_types=[ValueType.JSON], mirror_to_fake_redis=True
    )

Contributor:
Why not JSON?

Author:
FakeRedis doesn't work with JSON in my setup, I don't know why. (See dragonfly/tests/dragonfly/utility.py, line 430 in 3b082e4.)

    await seeder.run(target_ops=5_000)

    capture = await seeder.capture_fake_redis()

    assert await seeder.compare(capture, instance.port)
@@ -14,6 +14,7 @@
import subprocess
import pytest
import os
import fakeredis
from typing import Iterable, Union
from enum import Enum

@@ -271,7 +272,7 @@ def gen_shrink_cmd(self):
            ("LPUSH {k} {val}", ValueType.LIST),
            ("LPOP {k}", ValueType.LIST),
            ("SADD {k} {val}", ValueType.SET),
-            ("SPOP {k}", ValueType.SET),
+            # ("SPOP {k}", ValueType.SET),  # Disabled because it is inconsistent
            ("HSETNX {k} v0 {val}", ValueType.HSET),
            ("HINCRBY {k} v1 1", ValueType.HSET),
            ("ZPOPMIN {k} 1", ValueType.ZSET),

@@ -423,6 +424,7 @@ def __init__(
        unsupported_types=[],
        stop_on_failure=True,
        cluster_mode=False,
        mirror_to_fake_redis=False,
    ):
        if cluster_mode:
            max_multikey = 1

@@ -436,11 +438,16 @@ def __init__(
        self.multi_transaction_probability = multi_transaction_probability
        self.stop_flag = False
        self.stop_on_failure = stop_on_failure
        self.fake_redis = None

        self.log_file = log_file
        if self.log_file is not None:
            open(self.log_file, "w").close()

        if mirror_to_fake_redis:
            logging.debug("Creating FakeRedis instance")
            self.fake_redis = fakeredis.FakeAsyncRedis()

    async def run(self, target_ops=None, target_deviation=None):
        """
        Run a seeding cycle on all dbs either until stop(), a fixed number of commands (target_ops)

@@ -474,6 +481,14 @@ def reset(self):
        """Reset internal state. Needs to be called after flush or restart"""
        self.gen.reset()

    async def capture_fake_redis(self):
        keys = sorted(list(self.gen.keys_and_types()))
        # TODO: support multiple databases
        assert self.dbcount == 1
        assert self.fake_redis != None
        capture = DataCapture(await self._capture_entries(self.fake_redis, keys))
        return [capture]

    async def capture(self, port=None):
        """Create DataCapture for all dbs"""

@@ -588,12 +603,19 @@ async def _executor_task(self, db, queue):
                queue.task_done()
                break

-            pipe = client.pipeline(transaction=tx_data[1])
-            for cmd in tx_data[0]:
-                pipe.execute_command(*cmd)

            try:
-                await pipe.execute()
+                if self.fake_redis is None:
+                    pipe = client.pipeline(transaction=tx_data[1])
+                    for cmd in tx_data[0]:
+                        pipe.execute_command(*cmd)
+                    await pipe.execute()
+                else:
+                    # To mirror consistently to Fake Redis we must only send to it successful
+                    # commands. We can't use pipes because they might succeed partially.
+                    for cmd in tx_data[0]:
+                        dfly_resp = await client.execute_command(*cmd)
+                        fake_resp = await self.fake_redis.execute_command(*cmd)
+                        assert dfly_resp == fake_resp
            except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e:
                if self.stop_on_failure:
                    await self._close_client(client)

Contributor:
It's not so good that we don't test the multi flow, but it's better than nothing.
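On the note above about not covering the multi flow: one hypothetical way to extend the mirroring to pipelined batches would be to execute the batch on Dragonfly with raise_on_error=False and replay only the commands that actually succeeded to the fakeredis mirror. This is only a sketch of the idea, not part of the PR, and it still would not reproduce MULTI/EXEC atomicity on the mirror:

import fakeredis
import redis.asyncio as aioredis


async def mirror_batch(client: aioredis.Redis, fake: fakeredis.FakeAsyncRedis, cmds):
    # Run the whole batch on the server without raising on individual errors.
    pipe = client.pipeline(transaction=False)
    for cmd in cmds:
        pipe.execute_command(*cmd)
    responses = await pipe.execute(raise_on_error=False)

    # Replay only the successful commands so the mirror stays consistent.
    for cmd, resp in zip(cmds, responses):
        if not isinstance(resp, Exception):
            assert await fake.execute_command(*cmd) == resp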
When I run the test, I also see that RestoreStreamer::OnDbChange and the journal calls are not executed. Only after adding a sleep here, after this line, did I start seeing them executed. Please add this sleep. We should also add stats, print them to the log, and check them after running the test, but let's do that in another PR.
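The exact line the comment anchors to is not visible in this excerpt, but a minimal sketch of the requested change, assuming the sleep belongs in test_cluster_migration_while_seeding right after the migration config is pushed (so that seeder writes keep landing while the migration is active and the RestoreStreamer::OnDbChange / journal paths are exercised), could look like this; the duration is an arbitrary placeholder:

    logging.debug("Migrating slots")
    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

    # Hypothetical addition per the review comment: keep seeding traffic overlapping
    # the active migration so OnDbChange/journal-driven streaming actually runs.
    await asyncio.sleep(2)

    logging.debug("Waiting for migration to finish")
    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=300)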