Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ void RestoreStreamer::Run() {
if (fiber_cancelled_) // Could have been cancelled in above call too
return;

std::lock_guard guard(big_value_mu_);

// Locking this never preempts. See snapshot.cc for why we need it.
auto* blocking_counter = db_slice_->BlockingCounter();
std::lock_guard blocking_counter_guard(*blocking_counter);

WriteBucket(it);
});

Expand Down Expand Up @@ -281,7 +287,6 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {

void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
if (it.GetVersion() < snapshot_version_) {
FiberAtomicGuard fg;
it.SetVersion(snapshot_version_);
string key_buffer; // we can reuse it
for (; !it.is_done(); ++it) {
Expand All @@ -302,6 +307,7 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
}

void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
std::lock_guard guard(big_value_mu_);
DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";

PrimeTable* table = db_slice_->GetTables(0).first;
Expand All @@ -319,8 +325,12 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req

// Serializes a single entry (key `key` with key object `pk`, value `pv` and expiry
// `expire_ms`) through CmdSerializer and forwards everything it produces to Write().
//
// The serializer is constructed with ServerState::tlocal()->serialization_max_chunk_size
// (presumably the upper bound for each emitted piece of a large value — confirm against
// CmdSerializer), so the callback may run several times for one entry.
void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                 uint64_t expire_ms) {
  // Exactly one serializer is declared. Its callback writes each emitted string and then
  // runs the throttling check, so throttling is evaluated after every piece rather than
  // only once per entry.
  CmdSerializer serializer(
      [&](std::string s) {
        Write(std::move(s));
        ThrottleIfNeeded();
      },
      ServerState::tlocal()->serialization_max_chunk_size);
  serializer.SerializeEntry(key, pk, pv, expire_ms);
}

Expand Down
1 change: 1 addition & 0 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class RestoreStreamer : public JournalStreamer {
cluster::SlotSet my_slots_;
bool fiber_cancelled_ = false;
bool snapshot_finished_ = false;
ThreadLocalMutex big_value_mu_;
};

} // namespace dfly
81 changes: 78 additions & 3 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
from redis.cluster import RedisCluster
from redis.cluster import ClusterNode
from .proxy import Proxy
from .seeder import SeederBase
from .seeder import StaticSeeder
from .seeder import Seeder, SeederBase, StaticSeeder

from . import dfly_args

Expand All @@ -33,6 +32,11 @@ def monotonically_increasing_port_number():
next_port = monotonically_increasing_port_number()


async def get_memory(client, field):
    """Return one field from the ``memory`` section of the server's INFO output.

    Args:
        client: async client exposing ``info(section)``; the awaited result is
            indexed with ``field``.
        field: name of the entry to read, e.g. ``"used_memory_rss"``.

    Returns:
        Whatever value the ``memory`` section holds under ``field``.

    Raises:
        Whatever indexing the returned section raises when ``field`` is absent
        (``KeyError`` if it is a plain dict).
    """
    info = await client.info("memory")
    return info[field]


class RedisClusterNode:
def __init__(self, port):
self.port = port
Expand Down Expand Up @@ -1981,6 +1985,7 @@ async def node1size0():

@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@pytest.mark.asyncio
@pytest.mark.opt_only
async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
instances = [
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
Expand All @@ -1995,7 +2000,7 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)

logging.debug("Generating huge containers")
seeder = StaticSeeder(
key_target=10,
key_target=100,
data_size=10_000_000,
collection_size=10_000,
variance=1,
Expand All @@ -2005,6 +2010,8 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
await seeder.run(nodes[0].client)
source_data = await StaticSeeder.capture(nodes[0].client)

mem_before = await get_memory(nodes[0].client, "used_memory_rss")

nodes[0].migrations = [
MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
]
Expand All @@ -2017,6 +2024,74 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
target_data = await StaticSeeder.capture(nodes[1].client)
assert source_data == target_data

# Get peak memory, because migration removes the data
mem_after = await get_memory(nodes[0].client, "used_memory_peak_rss")
logging.debug(f"Memory before {mem_before} after {mem_after}")
assert mem_after < mem_before * 1.1


@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@pytest.mark.parametrize("chunk_size", [1_000_000, 30])
@pytest.mark.asyncio
async def test_cluster_migration_while_seeding(
df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory, chunk_size
):
instances = [
df_factory.create(
port=next(next_port),
admin_port=next(next_port),
serialization_max_chunk_size=chunk_size,
)
for _ in range(2)
]
df_factory.start_all(instances)

nodes = [await create_node_info(instance) for instance in instances]
nodes[0].slots = [(0, 16383)]
nodes[1].slots = []
client0 = nodes[0].client
client1 = nodes[1].client

await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

logging.debug("Seeding cluster")
seeder = df_seeder_factory.create(
keys=10_000, port=instances[0].port, cluster_mode=True, mirror_to_fake_redis=True
)
await seeder.run(target_deviation=0.1)

seed = asyncio.create_task(seeder.run())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I run the test I also see that RestoreStreamer::OnDbChange and the journal calls are not executed.
Only after adding a sleep right after this line did I start seeing them executed.
Please add this sleep. We should also add stats, print them to the log, and check them after running the test, but let's do that in another PR.

await asyncio.sleep(1)

nodes[0].migrations = [
MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
]
logging.debug("Migrating slots")
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

logging.debug("Waiting for migration to finish")
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=300)
logging.debug("Migration finished")

logging.debug("Finalizing migration")
nodes[0].slots = []
nodes[1].slots = [(0, 16383)]
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

await asyncio.sleep(1) # Let seeder feed dest before migration finishes

seeder.stop()
await seed
logging.debug("Seeding finished")

assert (
await get_memory(client0, "used_memory_peak_rss")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran the test on your branch, and the check does not fail if chunk_size=0.
Maybe the data pushed by the seeder is not big enough to trigger this check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you mean that you ran this test on the main branch?

anyway, yeah, this could be possible. I added this check back when we had huge values in the test. I think it's worthwhile to keep it though, for if/when we will add huge values to the v1 seeder

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran it on your branch.
So do we have any test where we can add this check to make sure we actually reduce the memory usage with your changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was the original test that I wrote :)
I filled with custom (huge) data, verified consistency, and checked that memory did not increase too much.
We can't properly test that we did not increase in memory if we don't migrate huge values... but we can do that in another test, that does not migrate while seeding. I'll do that here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, these are 2 different checks: that we don't increase RSS too much, and that the migration works correctly while traffic is being sent during the migration.
They do not have to be in the same test.

< await get_memory(client0, "used_memory_rss") * 1.1
)

capture = await seeder.capture_fake_redis()
assert await seeder.compare(capture, instances[1].port)


def parse_lag(replication_info: str):
lags = re.findall("lag=([0-9]+)\r\n", replication_info)
Expand Down
1 change: 1 addition & 0 deletions tests/dragonfly/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ pytest-emoji==0.2.0
pytest-icdiff==0.8
pytest-timeout==2.2.0
asyncio==3.4.3
fakeredis[json]==2.26.2
10 changes: 6 additions & 4 deletions tests/dragonfly/seeder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,16 @@ async def run(self, client: aioredis.Redis, target_ops=None, target_deviation=No
]

sha = await client.script_load(Seeder._load_script("generate"))
await asyncio.gather(
*(self._run_unit(client, sha, unit, using_stopkey, args) for unit in self.units)
)
for unit in self.units:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove these changes, right? We are not using seeder v2 in cluster now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's true that we don't, but it took me forever to understand that this is where the bug is, and we might use it in the future, so I think it's a good idea to keep it as is.

# Must be serial, otherwise cluster clients throws an exception
await self._run_unit(client, sha, unit, using_stopkey, args)

async def stop(self, client: aioredis.Redis):
    """Request the seeder to stop if it is running without a target.

    Sets every unit's stop key to "X" through ``client``. The future returned
    from start() must still be awaited afterwards.
    """
    # The stop keys are set one at a time: issuing the SETs concurrently makes
    # cluster clients throw an exception.
    for unit in self.units:
        await client.set(unit.stop_key, "X")

def change_key_target(self, target: int):
"""Change key target, applied only on succeeding runs"""
Expand Down
21 changes: 21 additions & 0 deletions tests/dragonfly/seeder_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from redis import asyncio as aioredis
from . import dfly_args
from .seeder import Seeder, StaticSeeder
from .instance import DflyInstanceFactory, DflyInstance
from .utility import *


@dfly_args({"proactor_threads": 4})
Expand Down Expand Up @@ -114,3 +116,22 @@ async def set_data():
# Do another change
await async_client.spop("set1")
assert capture != await Seeder.capture(async_client)


@pytest.mark.asyncio
@dfly_args({"proactor_threads": 2})
async def test_seeder_fake_redis(
df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory
):
instance = df_factory.create()
df_factory.start_all([instance])

seeder = df_seeder_factory.create(
keys=100, port=instance.port, unsupported_types=[ValueType.JSON], mirror_to_fake_redis=True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not json?

Copy link
Contributor Author

@chakaz chakaz Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FakeRedis doesn't work with JSON in my setup, I don't know why..
Since JSON is disabled in cluster anyway, I thought that it wouldn't matter too much, see:

unsupported_types.append(ValueType.JSON) # Cluster aio client doesn't support JSON

)

await seeder.run(target_ops=5_000)

capture = await seeder.capture_fake_redis()

assert await seeder.compare(capture, instance.port)
34 changes: 28 additions & 6 deletions tests/dragonfly/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import subprocess
import pytest
import os
import fakeredis
from typing import Iterable, Union
from enum import Enum

Expand Down Expand Up @@ -271,7 +272,7 @@ def gen_shrink_cmd(self):
("LPUSH {k} {val}", ValueType.LIST),
("LPOP {k}", ValueType.LIST),
("SADD {k} {val}", ValueType.SET),
("SPOP {k}", ValueType.SET),
# ("SPOP {k}", ValueType.SET), # Disabled because it is inconsistent
("HSETNX {k} v0 {val}", ValueType.HSET),
("HINCRBY {k} v1 1", ValueType.HSET),
("ZPOPMIN {k} 1", ValueType.ZSET),
Expand Down Expand Up @@ -423,6 +424,7 @@ def __init__(
unsupported_types=[],
stop_on_failure=True,
cluster_mode=False,
mirror_to_fake_redis=False,
):
if cluster_mode:
max_multikey = 1
Expand All @@ -436,11 +438,16 @@ def __init__(
self.multi_transaction_probability = multi_transaction_probability
self.stop_flag = False
self.stop_on_failure = stop_on_failure
self.fake_redis = None

self.log_file = log_file
if self.log_file is not None:
open(self.log_file, "w").close()

if mirror_to_fake_redis:
logging.debug("Creating FakeRedis instance")
self.fake_redis = fakeredis.FakeAsyncRedis()

async def run(self, target_ops=None, target_deviation=None):
"""
Run a seeding cycle on all dbs either until stop(), a fixed number of commands (target_ops)
Expand Down Expand Up @@ -474,6 +481,14 @@ def reset(self):
"""Reset internal state. Needs to be called after flush or restart"""
self.gen.reset()

async def capture_fake_redis(self):
    """Create a DataCapture from the mirrored FakeRedis instance.

    Requires a FakeRedis mirror (``self.fake_redis`` is only set when the
    seeder was created with ``mirror_to_fake_redis=True``) and a single
    database.

    Returns:
        A one-element list holding the DataCapture built from the mirror's
        entries for the generator's keys, taken in sorted order.

    Raises:
        AssertionError: if ``self.dbcount`` is not 1 or no mirror exists.
    """
    keys = sorted(self.gen.keys_and_types())
    # TODO: support multiple databases
    assert self.dbcount == 1
    # Identity check: does not depend on how the client object implements equality.
    assert self.fake_redis is not None
    capture = DataCapture(await self._capture_entries(self.fake_redis, keys))
    return [capture]

async def capture(self, port=None):
"""Create DataCapture for all dbs"""

Expand Down Expand Up @@ -588,12 +603,19 @@ async def _executor_task(self, db, queue):
queue.task_done()
break

pipe = client.pipeline(transaction=tx_data[1])
for cmd in tx_data[0]:
pipe.execute_command(*cmd)

try:
await pipe.execute()
if self.fake_redis is None:
pipe = client.pipeline(transaction=tx_data[1])
for cmd in tx_data[0]:
pipe.execute_command(*cmd)
await pipe.execute()
else:
# To mirror consistently to Fake Redis we must only send to it successful
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not so good that we don't test the multi flow, but it's better than nothing.
I will keep thinking about whether there is a better approach to testing migration correctness so that we don't have this limitation.

# commands. We can't use pipes because they might succeed partially.
for cmd in tx_data[0]:
dfly_resp = await client.execute_command(*cmd)
fake_resp = await self.fake_redis.execute_command(*cmd)
assert dfly_resp == fake_resp
except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e:
if self.stop_on_failure:
await self._close_client(client)
Expand Down
Loading