Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,21 @@ replication-delay-bytes 16384
# Default: 16 updates
replication-delay-updates 16

# Maximum sequence lag allowed before disconnecting a slow replica.
# If a replica falls behind by more than this many sequences, the master will
# disconnect it to prevent WAL exhaustion. The replica can then reconnect and
# attempt partial sync (psync) if the sequence is still available.
# Set to 0 to disable this check.
# Default: 0 (disabled)
max-replication-lag 0

# Timeout in milliseconds for socket send operations to replicas.
# If sending data to a replica blocks for longer than this timeout,
# the connection will be dropped. This prevents the replication feed thread
# from blocking indefinitely on slow consumers.
# Default: 30000 (30 seconds)
replication-send-timeout-ms 30000

# TCP listen() backlog.
#
# In high requests-per-second environments you need a high backlog in order
Expand Down
38 changes: 32 additions & 6 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::
next_repl_seq_(next_repl_seq),
req_(srv),
max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes),
max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {}
max_delay_updates_(srv->GetConfig()->max_replication_delay_updates),
max_replication_lag_(srv->GetConfig()->max_replication_lag),
send_timeout_ms_(srv->GetConfig()->replication_send_timeout_ms) {}

Status FeedSlaveThread::Start() {
auto s = util::CreateThread("feed-replica", [this] {
Expand Down Expand Up @@ -184,6 +186,21 @@ void FeedSlaveThread::loop() {
while (!IsStopped()) {
auto curr_seq = next_repl_seq_.load();

// Check replication lag - disconnect slow consumers before WAL is exhausted
// Skip check if max_replication_lag_ is 0 (feature disabled)
if (max_replication_lag_ > 0) {
auto latest_seq = srv_->storage->LatestSeqNumber();
if (latest_seq > curr_seq) {
auto lag = static_cast<int64_t>(latest_seq - curr_seq);
if (lag > max_replication_lag_) {
ERROR("Replication lag {} exceeds max allowed {} for slave {}:{}, disconnecting to prevent WAL exhaustion",
lag, max_replication_lag_, conn_->GetAnnounceIP(), conn_->GetListeningPort());
Stop();
return;
}
}
}

if (!iter_ || !iter_->Valid()) {
if (iter_) INFO("WAL was rotated, would reopen again");
if (!srv_->storage->WALHasNewData(curr_seq) || !srv_->storage->GetWALIter(curr_seq, &iter_).IsOK()) {
Expand Down Expand Up @@ -221,10 +238,12 @@ void FeedSlaveThread::loop() {
batches_bulk += redis::BulkString("_getack");
}

// Send entire bulk which contain multiple batches
auto s = util::SockSend(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent());
// Send entire bulk which contain multiple batches with timeout
// This prevents blocking indefinitely on slow consumers
auto s = util::SockSendWithTimeout(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent(), send_timeout_ms_);
if (!s.IsOK()) {
ERROR("Write error while sending batch to slave: {}. batches: 0x{}", s.Msg(), util::StringToHex(batches_bulk));
ERROR("Write error while sending batch to slave {}:{}: {}. batch_size={}", conn_->GetAnnounceIP(),
conn_->GetListeningPort(), s.Msg(), batches_bulk.size());
Stop();
return;
}
Expand Down Expand Up @@ -260,9 +279,14 @@ void ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, int
}
if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
ERROR("[replication] connection error/eof, reconnect the master");
// Wait a bit and reconnect
// Wait with exponential backoff before reconnecting
constexpr int kMaxBackoffSeconds = 60;
constexpr int kMaxShiftBits = 6; // Cap shift to avoid UB; 2^6 = 64 then clamped to 60
repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
std::this_thread::sleep_for(std::chrono::seconds(1));
int attempts = repl_->reconnect_attempts_.fetch_add(1, std::memory_order_relaxed);
int backoff_secs = std::min(1 << std::min(attempts, kMaxShiftBits), kMaxBackoffSeconds);
WARN("[replication] waiting {} seconds before reconnecting (attempt {})", backoff_secs, attempts + 1);
std::this_thread::sleep_for(std::chrono::seconds(backoff_secs));
Stop();
Start();
}
Expand Down Expand Up @@ -634,6 +658,7 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
} else {
// PSYNC is OK, use IncrementBatchLoop
INFO("[replication] PSync is ok, start increment batch loop");
reconnect_attempts_.store(0, std::memory_order_relaxed); // Reset backoff counter on successful connection
return CBState::NEXT;
}
}
Expand Down Expand Up @@ -879,6 +904,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
return CBState::RESTART;
}
INFO("[replication] Succeeded restoring the backup, fullsync was finish");
reconnect_attempts_.store(0, std::memory_order_relaxed); // Reset backoff counter on successful fullsync
post_fullsync_cb_();

// It needs to reload namespaces from DB after the full sync is done,
Expand Down
3 changes: 3 additions & 0 deletions src/cluster/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class FeedSlaveThread {
// Configurable delay limits
size_t max_delay_bytes_;
size_t max_delay_updates_;
int64_t max_replication_lag_;
int send_timeout_ms_;

void loop();
void checkLivenessIfNeed();
Expand Down Expand Up @@ -166,6 +168,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
const bool replication_group_sync_ = false;
std::atomic<int64_t> last_io_time_secs_ = 0;
int64_t last_ack_time_secs_ = 0;
std::atomic<int> reconnect_attempts_ = 0; // For exponential backoff on reconnection
bool next_try_old_psync_ = false;
bool next_try_without_announce_ip_address_ = false;

Expand Down
97 changes: 97 additions & 0 deletions src/common/io_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
#include <poll.h>
#include <sys/types.h>

#include <chrono>

#include "fmt/ostream.h"
#include "scope_exit.h"
#include "server/tls_util.h"

#ifdef __linux__
Expand Down Expand Up @@ -468,6 +471,100 @@ Status SockSend(int fd, const std::string &data, [[maybe_unused]] bufferevent *b
#endif
}

// Send all of `data` on fd, giving up if the whole transfer has not completed
// within timeout_ms. Returns NotOK on timeout (message includes how many bytes
// were sent) or an errno-based status on a hard socket error.
// A non-positive timeout disables the deadline and delegates to SockSend().
Status SockSendWithTimeout(int fd, const std::string &data, int timeout_ms) {
  // Fall back to blocking send if timeout is non-positive (feature disabled)
  if (timeout_ms <= 0) {
    return SockSend(fd, data);
  }

  ssize_t n = 0;  // total bytes written so far
  auto start = std::chrono::steady_clock::now();

  while (n < static_cast<ssize_t>(data.size())) {
    // The deadline bounds the WHOLE send, not a single poll/write round,
    // so re-check elapsed time on every iteration.
    auto elapsed =
        std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start).count();
    if (elapsed >= timeout_ms) {
      return {Status::NotOK, fmt::format("send timeout after {} ms, sent {} of {} bytes", elapsed, n, data.size())};
    }

    // Wait for writability only for the remaining time budget
    int remaining_ms = timeout_ms - static_cast<int>(elapsed);
    int ready = AeWait(fd, AE_WRITABLE, remaining_ms);
    if (ready == 0) {
      return {Status::NotOK, fmt::format("send timeout waiting for socket, sent {} of {} bytes", n, data.size())};
    }
    if (ready < 0) {
      return Status::FromErrno("poll error while sending");
    }

    ssize_t nwritten = write(fd, data.data() + n, data.size() - n);
    if (nwritten == -1) {
      // EINTR: interrupted by a signal before anything was written - just retry.
      // EAGAIN/EWOULDBLOCK: the kernel buffer filled up between poll and
      // write - go back to waiting; the overall deadline still applies.
      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
        continue;
      }
      return Status::FromErrno();
    }
    n += nwritten;
  }
  return Status::OK();
}

// bufferevent-aware variant of SockSendWithTimeout. When the connection is
// TLS (ENABLE_OPENSSL and the bufferevent carries an SSL object) the data is
// sent through SSL_write with the same overall deadline; otherwise it
// delegates to the plain-socket overload. A non-positive timeout falls back
// to the blocking SockSend().
Status SockSendWithTimeout(int fd, const std::string &data, [[maybe_unused]] bufferevent *bev, int timeout_ms) {
  // Fall back to blocking send if timeout is non-positive (feature disabled)
  if (timeout_ms <= 0) {
    return SockSend(fd, data, bev);
  }

#ifdef ENABLE_OPENSSL
  auto ssl = bufferevent_openssl_get_ssl(bev);
  if (ssl) {
    // Save original flags and set socket to non-blocking for timeout support
    int orig_flags = fcntl(fd, F_GETFL);
    if (orig_flags == -1) return Status::FromErrno("fcntl(F_GETFL)");

    auto s = SockSetBlocking(fd, 0);
    if (!s.IsOK()) return s;

    // Restore original flags on scope exit
    auto restore_flags = MakeScopeExit([fd, orig_flags] { fcntl(fd, F_SETFL, orig_flags); });

    ssize_t n = 0;
    // SSL_write may legitimately demand a READ (e.g. during renegotiation),
    // so track which readiness event the next retry must wait for.
    int wait_event = AE_WRITABLE;
    auto start = std::chrono::steady_clock::now();

    while (n < static_cast<ssize_t>(data.size())) {
      auto elapsed =
          std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start).count();
      if (elapsed >= timeout_ms) {
        return {Status::NotOK,
                fmt::format("SSL send timeout after {} ms, sent {} of {} bytes", elapsed, n, data.size())};
      }

      int remaining_ms = timeout_ms - static_cast<int>(elapsed);
      int ready = AeWait(fd, wait_event, remaining_ms);
      if (ready == 0) {
        return {Status::NotOK, fmt::format("SSL send timeout waiting for socket, sent {} of {} bytes", n, data.size())};
      }
      if (ready < 0) {
        // Distinguish a poll failure from a genuine timeout
        return Status::FromErrno("poll error while sending");
      }

      int nwritten = SSL_write(ssl, data.data() + n, static_cast<int>(data.size() - n));
      if (nwritten <= 0) {
        int err = SSL_get_error(ssl, nwritten);
        if (err == SSL_ERROR_WANT_WRITE) {
          wait_event = AE_WRITABLE;
          continue;
        }
        if (err == SSL_ERROR_WANT_READ) {
          // Renegotiation in progress: the retry must wait for readability,
          // otherwise we would spin on a writable-but-blocked socket.
          wait_event = AE_READABLE;
          continue;
        }
        return {Status::NotOK, fmt::format("SSL_write error: {}", err)};
      }
      wait_event = AE_WRITABLE;  // progress made; next wait is a normal write wait
      n += nwritten;
    }
    return Status::OK();
  }
#endif
  return SockSendWithTimeout(fd, data, timeout_ms);
}

StatusOr<int> SockConnect(const std::string &host, uint32_t port, [[maybe_unused]] ssl_st *ssl, int conn_timeout,
int timeout) {
#ifdef ENABLE_OPENSSL
Expand Down
4 changes: 4 additions & 0 deletions src/common/io_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ Status Pwrite(int fd, const std::string &data, off_t offset);
Status SockSend(int fd, const std::string &data, ssl_st *ssl);
Status SockSend(int fd, const std::string &data, bufferevent *bev);

// Send with timeout - returns error if send would block for longer than timeout_ms
Status SockSendWithTimeout(int fd, const std::string &data, int timeout_ms);
Status SockSendWithTimeout(int fd, const std::string &data, bufferevent *bev, int timeout_ms);

Status SockSendFile(int out_fd, int in_fd, size_t size, ssl_st *ssl);
Status SockSendFile(int out_fd, int in_fd, size_t size, bufferevent *bev);

Expand Down
2 changes: 2 additions & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ Config::Config() {
{"replication-no-slowdown", false, new YesNoField(&replication_no_slowdown, true)},
{"replication-delay-bytes", false, new IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)},
{"replication-delay-updates", false, new IntField(&max_replication_delay_updates, 16, 1, INT_MAX)},
{"max-replication-lag", false, new Int64Field(&max_replication_lag, 0, 0, INT64_MAX)},
{"replication-send-timeout-ms", false, new IntField(&replication_send_timeout_ms, 30000, 1000, 300000)},
{"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
{"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)},
{"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
Expand Down
2 changes: 2 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ struct Config {
int replication_recv_timeout_ms = 3200;
int max_replication_delay_bytes = 16 * 1024; // 16KB default
int max_replication_delay_updates = 16; // 16 updates default
int64_t max_replication_lag = 0; // 0 = disabled, otherwise max sequences before disconnecting slow consumer
int replication_send_timeout_ms = 30000; // 30 second timeout for socket sends to replicas
int max_db_size = 0;
int max_replication_mb = 0;
int max_io_mb = 0;
Expand Down
2 changes: 2 additions & 0 deletions tests/cppunit/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ TEST(Config, GetAndSet) {
{"rocksdb.max_background_jobs", "4"},
{"rocksdb.compression_start_level", "2"},
{"rocksdb.sst_file_delete_rate_bytes_per_sec", "0"},
{"max-replication-lag", "50000000"},
{"replication-send-timeout-ms", "60000"},
};
std::vector<std::string> values;
for (const auto &iter : mutable_cases) {
Expand Down
7 changes: 4 additions & 3 deletions tests/gocase/integration/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,10 @@ func TestClusterReset(t *testing.T) {

t.Run("cannot reset cluster if the db is migrating the slot", func(t *testing.T) {
slotNum := 2
// slow down the migration speed to avoid breaking other test cases
require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "128").Err())
for i := 0; i < 1024; i++ {
// slow down the migration speed to ensure we can observe the "start" state
// before migration completes (especially on fast hardware like macOS ARM)
require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "64").Err())
for i := 0; i < 2048; i++ {
require.NoError(t, rdb0.RPush(ctx, "my-list", fmt.Sprintf("element%d", i)).Err())
}

Expand Down
85 changes: 85 additions & 0 deletions tests/gocase/integration/replication/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,3 +711,88 @@ func TestReplicationWatermark(t *testing.T) {
// The small command should be processed much faster than 1 second
require.Less(t, duration, 1*time.Second, "small command should be processed promptly")
}

func TestReplicationSlowConsumerConfig(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Exercises the two slow-consumer protection knobs:
	//   max-replication-lag         - lag threshold before a slow replica is dropped
	//   replication-send-timeout-ms - socket send timeout toward replicas
	srvMaster := util.StartServer(t, map[string]string{
		"max-replication-lag":         "100000000",
		"replication-send-timeout-ms": "30000",
	})
	defer srvMaster.Close()
	rdbMaster := srvMaster.NewClient()
	defer func() { require.NoError(t, rdbMaster.Close()) }()

	srvSlave := util.StartServer(t, map[string]string{})
	defer srvSlave.Close()
	rdbSlave := srvSlave.NewClient()
	defer func() { require.NoError(t, rdbSlave.Close()) }()

	t.Run("Slow consumer config options are readable and settable", func(t *testing.T) {
		// Startup values must round-trip through CONFIG GET.
		require.Equal(t, "100000000",
			rdbMaster.ConfigGet(ctx, "max-replication-lag").Val()["max-replication-lag"])
		require.Equal(t, "30000",
			rdbMaster.ConfigGet(ctx, "replication-send-timeout-ms").Val()["replication-send-timeout-ms"])

		// Both options are mutable at runtime via CONFIG SET.
		require.NoError(t, rdbMaster.ConfigSet(ctx, "max-replication-lag", "50000000").Err())
		require.Equal(t, "50000000",
			rdbMaster.ConfigGet(ctx, "max-replication-lag").Val()["max-replication-lag"])

		require.NoError(t, rdbMaster.ConfigSet(ctx, "replication-send-timeout-ms", "15000").Err())
		require.Equal(t, "15000",
			rdbMaster.ConfigGet(ctx, "replication-send-timeout-ms").Val()["replication-send-timeout-ms"])

		// With the options in effect, replication itself still works end to end.
		util.SlaveOf(t, rdbSlave, srvMaster)
		util.WaitForSync(t, rdbSlave)
		require.Equal(t, "slave", util.FindInfoEntry(rdbSlave, "role"))

		require.NoError(t, rdbMaster.Set(ctx, "test_key", "test_value", 0).Err())
		util.WaitForOffsetSync(t, rdbMaster, rdbSlave, 5*time.Second)
		require.Equal(t, "test_value", rdbSlave.Get(ctx, "test_key").Val())
	})
}

func TestReplicationExponentialBackoff(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	srvMaster := util.StartServer(t, map[string]string{})
	defer srvMaster.Close()
	rdbMaster := srvMaster.NewClient()
	defer func() { require.NoError(t, rdbMaster.Close()) }()

	srvSlave := util.StartServer(t, map[string]string{})
	defer srvSlave.Close()
	rdbSlave := srvSlave.NewClient()
	defer func() { require.NoError(t, rdbSlave.Close()) }()

	t.Run("Slave uses exponential backoff on reconnection", func(t *testing.T) {
		// Establish replication first so there is a link to break.
		util.SlaveOf(t, rdbSlave, srvMaster)
		util.WaitForSync(t, rdbSlave)

		// Break the link from the master side; the replica must go through its
		// reconnect path, which applies the backoff delay before retrying.
		_, err := rdbMaster.ClientKillByFilter(ctx, "type", "slave").Result()
		require.NoError(t, err)

		// The very first retry waits 1 second and logs that fact.
		require.Eventually(t, func() bool {
			return srvSlave.LogFileMatches(t, ".*waiting 1 seconds before reconnecting.*")
		}, 10*time.Second, 200*time.Millisecond, "slave should log backoff on first reconnection")

		// Despite the backoff, the link must eventually come back up.
		require.Eventually(t, func() bool {
			return util.FindInfoEntry(rdbSlave, "master_link_status") == "up"
		}, 15*time.Second, 500*time.Millisecond, "slave should reconnect with backoff")
	})
}
Loading
Loading