
Commit 7860a16

feat: Yield inside huge values migration serialization (#4197)
With #4144 we break huge values slot migration into multiple commands. This PR now adds a yield between those commands, so other work on the thread can make progress. It also adds a test that checks that modifying huge values while a migration is running works well, and that RSS doesn't grow too much. Fixes #4100
1 parent ff4add0 commit 7860a16
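
The core idea, reduced to an asyncio sketch (purely illustrative; Dragonfly's actual implementation is C++ fiber code in `CmdSerializer` and `RestoreStreamer`): a huge container is serialized as a stream of small commands, and the serializer yields between chunks so the shard thread stays responsive and memory stays bounded.

```python
import asyncio

# Hypothetical sketch of chunked serialization with yields; names and
# chunk_size are illustrative, not Dragonfly's API.
async def serialize_in_chunks(items, write, chunk_size=4096):
    chunk = []
    for item in items:
        chunk.append(item)
        if len(chunk) >= chunk_size:
            write(chunk)  # emit one partial-reconstruction command
            chunk = []
            # Yield to the scheduler between commands, analogous to the
            # fiber yield this commit adds between serialized commands.
            await asyncio.sleep(0)
    if chunk:
        write(chunk)
```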

File tree

7 files changed (+148, -16 lines)


src/server/journal/streamer.cc

Lines changed: 13 additions & 3 deletions

@@ -218,6 +218,12 @@ void RestoreStreamer::Run() {
      if (fiber_cancelled_)  // Could have been cancelled in above call too
        return;

+     std::lock_guard guard(big_value_mu_);
+
+     // Locking this never preempts. See snapshot.cc for why we need it.
+     auto* blocking_counter = db_slice_->BlockingCounter();
+     std::lock_guard blocking_counter_guard(*blocking_counter);
+
      WriteBucket(it);
    });

@@ -281,7 +287,6 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {

 void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
   if (it.GetVersion() < snapshot_version_) {
-    FiberAtomicGuard fg;
     it.SetVersion(snapshot_version_);
     string key_buffer;  // we can reuse it
     for (; !it.is_done(); ++it) {

@@ -302,6 +307,7 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
 }

 void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
+  std::lock_guard guard(big_value_mu_);
   DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";

   PrimeTable* table = db_slice_->GetTables(0).first;

@@ -319,8 +325,12 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req

 void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                  uint64_t expire_ms) {
-  CmdSerializer serializer([&](std::string s) { Write(std::move(s)); },
-                           ServerState::tlocal()->serialization_max_chunk_size);
+  CmdSerializer serializer(
+      [&](std::string s) {
+        Write(std::move(s));
+        ThrottleIfNeeded();
+      },
+      ServerState::tlocal()->serialization_max_chunk_size);
   serializer.SerializeEntry(key, pk, pv, expire_ms);
 }
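
Two things stand out in this diff: `WriteBucket` no longer pins the fiber with `FiberAtomicGuard`, because the serializer's sink now calls `ThrottleIfNeeded()` and may preempt; in exchange, `Run()` and `OnDbChange` both take `big_value_mu_` so bucket serialization and change callbacks cannot interleave. A rough asyncio analogue of the write-then-throttle backpressure shape (all names hypothetical, not Dragonfly's API):

```python
import asyncio

# Rough sketch of write-then-throttle backpressure, mirroring the shape
# of Write() + ThrottleIfNeeded(): writes buffer without blocking, and
# producers pause whenever too many bytes are in flight.
class ThrottledSink:
    def __init__(self, high_watermark: int = 1 << 20):
        self.pending = bytearray()
        self.high_watermark = high_watermark
        self._drained = asyncio.Event()
        self._drained.set()

    def write(self, chunk: bytes) -> None:
        # Buffer the serialized command; never blocks the caller.
        self.pending += chunk
        if len(self.pending) > self.high_watermark:
            self._drained.clear()

    async def throttle_if_needed(self) -> None:
        # Producers pause here until the buffer drains below the watermark.
        await self._drained.wait()

    def on_flushed(self, n: int) -> None:
        # Called by the I/O side once n bytes were sent to the target node.
        del self.pending[:n]
        if len(self.pending) <= self.high_watermark:
            self._drained.set()
```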

src/server/journal/streamer.h

Lines changed: 1 addition & 0 deletions

@@ -112,6 +112,7 @@ class RestoreStreamer : public JournalStreamer {
   cluster::SlotSet my_slots_;
   bool fiber_cancelled_ = false;
   bool snapshot_finished_ = false;
+  ThreadLocalMutex big_value_mu_;
 };

 }  // namespace dfly

tests/dragonfly/cluster_test.py

Lines changed: 78 additions & 3 deletions

@@ -14,8 +14,7 @@
 from redis.cluster import RedisCluster
 from redis.cluster import ClusterNode
 from .proxy import Proxy
-from .seeder import SeederBase
-from .seeder import StaticSeeder
+from .seeder import Seeder, SeederBase, StaticSeeder

 from . import dfly_args

@@ -33,6 +32,11 @@ def monotonically_increasing_port_number():
 next_port = monotonically_increasing_port_number()


+async def get_memory(client, field):
+    info = await client.info("memory")
+    return info[field]
+
+
 class RedisClusterNode:
     def __init__(self, port):
         self.port = port

@@ -1981,6 +1985,7 @@ async def node1size0():

 @dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
 @pytest.mark.asyncio
+@pytest.mark.opt_only
 async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
     instances = [
         df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)

@@ -1995,7 +2000,7 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)

     logging.debug("Generating huge containers")
     seeder = StaticSeeder(
-        key_target=10,
+        key_target=100,
         data_size=10_000_000,
         collection_size=10_000,
         variance=1,

@@ -2005,6 +2010,8 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
     await seeder.run(nodes[0].client)
     source_data = await StaticSeeder.capture(nodes[0].client)

+    mem_before = await get_memory(nodes[0].client, "used_memory_rss")
+
     nodes[0].migrations = [
         MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
     ]

@@ -2017,6 +2024,74 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
     target_data = await StaticSeeder.capture(nodes[1].client)
     assert source_data == target_data

+    # Get peak memory, because migration removes the data
+    mem_after = await get_memory(nodes[0].client, "used_memory_peak_rss")
+    logging.debug(f"Memory before {mem_before} after {mem_after}")
+    assert mem_after < mem_before * 1.1
+
+
+@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
+@pytest.mark.parametrize("chunk_size", [1_000_000, 30])
+@pytest.mark.asyncio
+async def test_cluster_migration_while_seeding(
+    df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory, chunk_size
+):
+    instances = [
+        df_factory.create(
+            port=next(next_port),
+            admin_port=next(next_port),
+            serialization_max_chunk_size=chunk_size,
+        )
+        for _ in range(2)
+    ]
+    df_factory.start_all(instances)
+
+    nodes = [await create_node_info(instance) for instance in instances]
+    nodes[0].slots = [(0, 16383)]
+    nodes[1].slots = []
+    client0 = nodes[0].client
+    client1 = nodes[1].client
+
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    logging.debug("Seeding cluster")
+    seeder = df_seeder_factory.create(
+        keys=10_000, port=instances[0].port, cluster_mode=True, mirror_to_fake_redis=True
+    )
+    await seeder.run(target_deviation=0.1)
+
+    seed = asyncio.create_task(seeder.run())
+    await asyncio.sleep(1)
+
+    nodes[0].migrations = [
+        MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
+    ]
+    logging.debug("Migrating slots")
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    logging.debug("Waiting for migration to finish")
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=300)
+    logging.debug("Migration finished")
+
+    logging.debug("Finalizing migration")
+    nodes[0].slots = []
+    nodes[1].slots = [(0, 16383)]
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    await asyncio.sleep(1)  # Let seeder feed dest before migration finishes
+
+    seeder.stop()
+    await seed
+    logging.debug("Seeding finished")
+
+    assert (
+        await get_memory(client0, "used_memory_peak_rss")
+        < await get_memory(client0, "used_memory_rss") * 1.1
+    )
+
+    capture = await seeder.capture_fake_redis()
+    assert await seeder.compare(capture, instances[1].port)

 def parse_lag(replication_info: str):
     lags = re.findall("lag=([0-9]+)\r\n", replication_info)
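
Both tests share the same memory oracle: peak RSS after the migration must stay within ~10% of RSS before it, which fails if serialization materializes whole huge values instead of streaming them chunk by chunk. Stand-alone, the check amounts to roughly the sketch below (server address is illustrative; the INFO fields are the same ones `get_memory` reads above).

```python
import asyncio
from redis import asyncio as aioredis

async def assert_bounded_rss(url="redis://localhost:6379", headroom=1.1):
    # Read RSS metrics via INFO MEMORY and assert that peak resident
    # memory never outgrew the steady state by more than the headroom.
    client = aioredis.Redis.from_url(url)
    info = await client.info("memory")
    rss, peak = info["used_memory_rss"], info["used_memory_peak_rss"]
    assert peak < rss * headroom, f"peak RSS {peak} exceeds {headroom}x of {rss}"
    await client.aclose()

if __name__ == "__main__":
    asyncio.run(assert_bounded_rss())
```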

tests/dragonfly/requirements.txt

Lines changed: 1 addition & 0 deletions

@@ -25,3 +25,4 @@ pytest-emoji==0.2.0
 pytest-icdiff==0.8
 pytest-timeout==2.2.0
 asyncio==3.4.3
+fakeredis[json]==2.26.2

tests/dragonfly/seeder/__init__.py

Lines changed: 6 additions & 4 deletions

@@ -177,14 +177,16 @@ async def run(self, client: aioredis.Redis, target_ops=None, target_deviation=No
         ]

         sha = await client.script_load(Seeder._load_script("generate"))
-        await asyncio.gather(
-            *(self._run_unit(client, sha, unit, using_stopkey, args) for unit in self.units)
-        )
+        for unit in self.units:
+            # Must be serial, otherwise cluster clients throws an exception
+            await self._run_unit(client, sha, unit, using_stopkey, args)

     async def stop(self, client: aioredis.Redis):
         """Request seeder seeder if it's running without a target, future returned from start() must still be awaited"""

-        await asyncio.gather(*(client.set(unit.stop_key, "X") for unit in self.units))
+        for unit in self.units:
+            # Must be serial, otherwise cluster clients throws an exception
+            await client.set(unit.stop_key, "X")

     def change_key_target(self, target: int):
         """Change key target, applied only on succeeding runs"""

tests/dragonfly/seeder_test.py

Lines changed: 21 additions & 0 deletions

@@ -4,6 +4,8 @@
 from redis import asyncio as aioredis
 from . import dfly_args
 from .seeder import Seeder, StaticSeeder
+from .instance import DflyInstanceFactory, DflyInstance
+from .utility import *


 @dfly_args({"proactor_threads": 4})

@@ -114,3 +116,22 @@ async def set_data():
     # Do another change
     await async_client.spop("set1")
     assert capture != await Seeder.capture(async_client)
+
+
+@pytest.mark.asyncio
+@dfly_args({"proactor_threads": 2})
+async def test_seeder_fake_redis(
+    df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory
+):
+    instance = df_factory.create()
+    df_factory.start_all([instance])
+
+    seeder = df_seeder_factory.create(
+        keys=100, port=instance.port, unsupported_types=[ValueType.JSON], mirror_to_fake_redis=True
+    )
+
+    await seeder.run(target_ops=5_000)
+
+    capture = await seeder.capture_fake_redis()
+
+    assert await seeder.compare(capture, instance.port)

tests/dragonfly/utility.py

Lines changed: 28 additions & 6 deletions

@@ -14,6 +14,7 @@
 import subprocess
 import pytest
 import os
+import fakeredis
 from typing import Iterable, Union
 from enum import Enum

@@ -271,7 +272,7 @@ def gen_shrink_cmd(self):
         ("LPUSH {k} {val}", ValueType.LIST),
         ("LPOP {k}", ValueType.LIST),
         ("SADD {k} {val}", ValueType.SET),
-        ("SPOP {k}", ValueType.SET),
+        # ("SPOP {k}", ValueType.SET), # Disabled because it is inconsistent
         ("HSETNX {k} v0 {val}", ValueType.HSET),
         ("HINCRBY {k} v1 1", ValueType.HSET),
         ("ZPOPMIN {k} 1", ValueType.ZSET),

@@ -423,6 +424,7 @@
         unsupported_types=[],
         stop_on_failure=True,
         cluster_mode=False,
+        mirror_to_fake_redis=False,
     ):
         if cluster_mode:
             max_multikey = 1

@@ -436,11 +438,16 @@
         self.multi_transaction_probability = multi_transaction_probability
         self.stop_flag = False
         self.stop_on_failure = stop_on_failure
+        self.fake_redis = None

         self.log_file = log_file
         if self.log_file is not None:
             open(self.log_file, "w").close()

+        if mirror_to_fake_redis:
+            logging.debug("Creating FakeRedis instance")
+            self.fake_redis = fakeredis.FakeAsyncRedis()
+
     async def run(self, target_ops=None, target_deviation=None):
         """
         Run a seeding cycle on all dbs either until stop(), a fixed number of commands (target_ops)

@@ -474,6 +481,14 @@ def reset(self):
         """Reset internal state. Needs to be called after flush or restart"""
         self.gen.reset()

+    async def capture_fake_redis(self):
+        keys = sorted(list(self.gen.keys_and_types()))
+        # TODO: support multiple databases
+        assert self.dbcount == 1
+        assert self.fake_redis != None
+        capture = DataCapture(await self._capture_entries(self.fake_redis, keys))
+        return [capture]
+
     async def capture(self, port=None):
         """Create DataCapture for all dbs"""

@@ -588,12 +603,19 @@ async def _executor_task(self, db, queue):
                 queue.task_done()
                 break

-            pipe = client.pipeline(transaction=tx_data[1])
-            for cmd in tx_data[0]:
-                pipe.execute_command(*cmd)
-
             try:
-                await pipe.execute()
+                if self.fake_redis is None:
+                    pipe = client.pipeline(transaction=tx_data[1])
+                    for cmd in tx_data[0]:
+                        pipe.execute_command(*cmd)
+                    await pipe.execute()
+                else:
+                    # To mirror consistently to Fake Redis we must only send to it successful
+                    # commands. We can't use pipes because they might succeed partially.
+                    for cmd in tx_data[0]:
+                        dfly_resp = await client.execute_command(*cmd)
+                        fake_resp = await self.fake_redis.execute_command(*cmd)
+                        assert dfly_resp == fake_resp
             except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e:
                 if self.stop_on_failure:
                     await self._close_client(client)
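
The mirroring logic above effectively uses fakeredis as an in-process reference implementation: each command that succeeds on Dragonfly is replayed against `FakeAsyncRedis`, and the replies must match, so any divergence introduced during migration shows up immediately. Reduced to its essence (connection details illustrative):

```python
import asyncio
import fakeredis
from redis import asyncio as aioredis

async def mirrored(real, fake, *cmd):
    # Send the command to both servers and insist on identical replies.
    real_resp = await real.execute_command(*cmd)
    fake_resp = await fake.execute_command(*cmd)
    assert real_resp == fake_resp, f"{cmd}: {real_resp!r} != {fake_resp!r}"
    return real_resp

async def main():
    real = aioredis.Redis.from_url("redis://localhost:6379")
    fake = fakeredis.FakeAsyncRedis()
    await mirrored(real, fake, "LPUSH", "list1", "a", "b")
    await mirrored(real, fake, "LRANGE", "list1", 0, -1)
    await real.aclose()

if __name__ == "__main__":
    asyncio.run(main())
```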
