Refactor return and goto statements #945

Merged: 15 commits, Oct 15, 2024

Contributor

@naglera naglera commented Aug 26, 2024

Consolidate the cleanup of local variables to a single point within the method, ensuring proper resource management and preventing memory leaks or double-free issues.

Previously discussed here:
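
For illustration, a minimal sketch of the single-cleanup-point pattern the description refers to. It assumes plain `malloc`/`free` rather than the server's own allocators, and `combined_length` with its two buffers is a hypothetical example, not code from this PR:

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: copies two strings into heap buffers and reports
 * their combined length. Every exit path funnels through the `cleanup`
 * label, so each buffer is freed exactly once. */
int combined_length(const char *a, const char *b, size_t *out_len) {
    int ret = -1;        /* pessimistic default; set to 0 only on success */
    char *copy_a = NULL; /* NULL-initialised so free() is always safe */
    char *copy_b = NULL;

    if (a == NULL || b == NULL || out_len == NULL) goto cleanup;

    copy_a = malloc(strlen(a) + 1);
    if (copy_a == NULL) goto cleanup;
    strcpy(copy_a, a);

    copy_b = malloc(strlen(b) + 1);
    if (copy_b == NULL) goto cleanup;
    strcpy(copy_b, b);

    *out_len = strlen(copy_a) + strlen(copy_b);
    ret = 0;

cleanup:
    free(copy_a); /* free(NULL) is a no-op */
    free(copy_b);
    return ret;
}
```

Replacing each early `return` with `goto cleanup` keeps the release logic in one place, which is the property the refactor is after.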

@naglera naglera requested a review from PingXie August 26, 2024 07:36

codecov bot commented Aug 26, 2024

Codecov Report

Attention: Patch coverage is 76.47059% with 40 lines in your changes missing coverage. Please review.

Project coverage is 70.64%. Comparing base (36d438b) to head (bbc498f).
Report is 7 commits behind head on unstable.

Files with missing lines Patch % Lines
src/replication.c 76.47% 40 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##           unstable     #945      +/-   ##
============================================
- Coverage     70.69%   70.64%   -0.05%     
============================================
  Files           114      114              
  Lines         61693    61735      +42     
============================================
+ Hits          43611    43613       +2     
- Misses        18082    18122      +40     
Files with missing lines Coverage Δ
src/server.h 100.00% <ø> (ø)
src/replication.c 87.45% <76.47%> (-0.56%) ⬇️

... and 10 files with indirect coverage changes

@@ -3541,7 +3531,7 @@ void syncWithPrimary(connection *conn) {
clearFailoverState();
} else {
abortFailover("Failover target rejected psync request");
- return;
+ goto cleanup;
Member

no cleanup needed in line 3524?

Contributor Author

right

Member

@PingXie PingXie left a comment

I think we should consider refactoring this code further by breaking out the different if paths so each function, including fullSyncWithPrimary, is dealing with a single concern. What do you think about a skeleton function like this?

static void fullSyncWithPrimary(connection *conn) {
    char *err = NULL;

    serverAssert(conn == server.repl_rdb_transfer_s);

    if (server.repl_state == REPL_STATE_NONE) goto error;
    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_WARNING, "Error condition on socket for dual channel replication: %s", connGetLastError(conn));
        goto error;
    }

    switch (server.repl_rdb_channel_state) {
        case REPL_DUAL_CHANNEL_SEND_HANDSHAKE:
            handleHandshake(conn, &err); /* TO BE FILLED OUT */
            break;
        case REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY:
            if (server.primary_auth) {
                handleAuthReply(conn, &err);  /* TO BE FILLED OUT */
            } else {
                server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY;
            }
            break;
        case REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY:
            handleReplConfReply(conn, &err);  /* TO BE FILLED OUT */
            break;
        case REPL_DUAL_CHANNEL_RECEIVE_ENDOFF:
            handleEndOffsetResponse(conn, &err);  /* TO BE FILLED OUT */
            break;
        default:
            break;
    }

    if (err) goto error;
    return;

error:
    if (err) sdsfree(err);
    connClose(conn);
    server.repl_transfer_s = NULL;
    if (server.repl_rdb_transfer_s) {
        connClose(server.repl_rdb_transfer_s);
        server.repl_rdb_transfer_s = NULL;
    }
    if (server.repl_transfer_fd != -1) close(server.repl_transfer_fd);
    server.repl_transfer_fd = -1;
    server.repl_state = REPL_STATE_CONNECT;
    replicationAbortDualChannelSyncTransfer();
}

Contributor Author

naglera commented Sep 1, 2024

Started with setupMainConnForPsync and fullSyncWithPrimary. Do we also want to refactor syncWithPrimary?

Member

@PingXie PingXie left a comment

LGTM overall. My feedback is all on fit-n-finish.

int ret;

switch (server.repl_state) {
case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplicationMainConnSendHandshake(conn, &err); break;
Member

I like this naming scheme but can we shorten it a bit? It is a bit of a mouthful :)

Suggested change
- case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplicationMainConnSendHandshake(conn, &err); break;
+ case REPL_STATE_SEND_HANDSHAKE: ret = dualChanReplMainConnSendHandshake(conn, &err); break;

/* Replication: Replica side.
* This connection handler fires after rdb-connection was initialized. We use it
* to adjust the replica main for loading incremental changes into the local buffer. */
void setupMainConnForPsync(connection *conn) {
Member

What do you think about prefixing this function with dualChan as well?

Suggested change
- void setupMainConnForPsync(connection *conn) {
+ void dualChanSetupMainConnForPsync(connection *conn) {

Contributor Author

Right, I just don't like the abbreviation Channel => Chan, I don't think it is very clean.

switch (server.repl_state) {
case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplicationMainConnSendHandshake(conn, &err); break;
case REPL_STATE_RECEIVE_CAPA_REPLY:
ret = dualChannelReplicationMainConnReceiveCapaReply(conn, &err);
Member

Suggested change
- ret = dualChannelReplicationMainConnReceiveCapaReply(conn, &err);
+ ret = dualChanReplMainConnRecvCapaReply(conn, &err);

sdsfree(err);
err = NULL;
/* fall through */
case REPL_STATE_SEND_PSYNC: ret = dualChannelReplicationMainConnSendPsync(conn, &err); break;
Member

Suggested change
- case REPL_STATE_SEND_PSYNC: ret = dualChannelReplicationMainConnSendPsync(conn, &err); break;
+ case REPL_STATE_SEND_PSYNC: ret = dualChanReplMainConnSendPsync(conn, &err); break;

err = NULL;
/* fall through */
case REPL_STATE_SEND_PSYNC: ret = dualChannelReplicationMainConnSendPsync(conn, &err); break;
case REPL_STATE_RECEIVE_PSYNC_REPLY: ret = dualChannelReplicationMainConnReceivePsyncReply(conn, &err); break;
Member

Suggested change
- case REPL_STATE_RECEIVE_PSYNC_REPLY: ret = dualChannelReplicationMainConnReceivePsyncReply(conn, &err); break;
+ case REPL_STATE_RECEIVE_PSYNC_REPLY: ret = dualChanReplMainConnRecvPsyncReply(conn, &err); break;

@@ -55,6 +55,7 @@ int cancelReplicationHandshake(int reconnect);
void replicationSteadyStateInit(void);
void setupMainConnForPsync(connection *conn);
void dualChannelSyncHandleRdbLoadCompletion(void);
static void fullSyncWithPrimary(connection *conn);
Member

This function is used in the dual-chan replication path only so I would think it makes more sense to prefix it too

Suggested change
- static void fullSyncWithPrimary(connection *conn);
+ static void dualChanFullSyncWithPrimary(connection *conn);

}

int dualChannelReplicationMainConnReceivePsyncReply(connection *conn, char **err) {
UNUSED(err);
Member

if this function can't fail, I would suggest we null out err so we stick to a more intuitive contract, which is null err means success. Let's not leave the err state undefined (or decided by the caller).

Contributor Author

I don't like the idea of relying on the error string. From my observation, it is not maintained for every error.

Member

Looks like this is another inconsistency in the existing code. Functions like sendCommand return a non-NULL error string to indicate failures already. I am fine with treating an error string returned via an outgoing parameter as "informational" and for "logging purpose only" hence optional. This should help ensure consistency.
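
A minimal sketch of the contract settled on in this thread, where the return code decides success and the error string is optional and used for logging only. It assumes plain C strings in place of `sds`; `run_step`, `run_and_log` and the `STEP_*` codes are hypothetical names, not functions from `replication.c`:

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define STEP_OK 0
#define STEP_ERR -1

/* Hypothetical worker: fails on negative input. On failure it MAY set *err
 * to a heap-allocated message; on success it leaves *err untouched. */
int run_step(int input, char **err) {
    if (input < 0) {
        if (input != -2) { /* -2 simulates an error path that sets no message */
            const char *msg = "negative input";
            *err = malloc(strlen(msg) + 1);
            if (*err) strcpy(*err, msg);
        }
        return STEP_ERR;
    }
    return STEP_OK;
}

/* The caller decides success from the return code only. The error string is
 * logged when present and always freed. */
int run_and_log(int input) {
    char *err = NULL;
    int ret = run_step(input, &err);
    if (ret == STEP_ERR) fprintf(stderr, "step failed: %s\n", err ? err : "(no details)");
    free(err); /* free(NULL) is a no-op */
    return ret;
}
```

With this shape, a worker that forgets to fill in the message still fails correctly, and a worker that cannot fail never has to touch `err`.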

Member

PingXie commented Sep 1, 2024

Started with setupMainConnForPsync and fullSyncWithPrimary. Do we also want to refactor syncWithPrimary?

+1 on refactoring syncWithPrimary as well but maybe do it in a separate PR? I don't have a strong opinion though (between two PRs or a combined PR for both syncWithPrimary and fullSyncWithPrimary).

Member

@PingXie PingXie left a comment

Thanks @naglera! I think we are very close.

serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err);
return C_ERR;
}
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY;
Member

I would like to see us pulling all state changes into the driver function, i.e., dualChannelFullSyncWithPrimary in this case. This way it is very easy to see all the state transitions together. I will suggest some changes in dualChannelFullSyncWithPrimary next

Contributor Author

Ok nice

int ret;

switch (server.repl_state) {
case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplMainConnSendHandshake(conn, &err); break;
Member

This is what I meant by consolidating state manipulations in the "driver" function and then all the "worker" functions (such as dualChannelReplMainConnRecvCapaReply) become "stateless".

Suggested change
- case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplMainConnSendHandshake(conn, &err); break;
+ case REPL_STATE_SEND_HANDSHAKE:
+     ret = dualChannelReplMainConnSendHandshake(conn, &err);
+     if (ret != C_ERR) server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
+     /* else server.repl_state = XXX; // if it makes sense */
+     break;

Contributor Author

sounds good
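
The driver/worker split agreed on above can be sketched roughly as follows. This is a self-contained toy, not the PR's code: the `ST_*` states, `W_OK`/`W_ERR` codes and the `link_up` flag are assumptions chosen only to show the driver as the single owner of state transitions:

```c
typedef enum { ST_SEND_HANDSHAKE, ST_RECEIVE_REPLY, ST_DONE } state_t;

#define W_OK 0
#define W_ERR -1

/* Stateless workers: they report success or failure and never touch the
 * state variable. */
int send_handshake(int link_up) { return link_up ? W_OK : W_ERR; }
int receive_reply(int link_up) { return link_up ? W_OK : W_ERR; }

/* Driver: the only place where the state changes, and only after the
 * worker for the current state has succeeded. */
int drive(state_t *state, int link_up) {
    int ret = W_ERR;
    switch (*state) {
    case ST_SEND_HANDSHAKE:
        ret = send_handshake(link_up);
        if (ret == W_OK) *state = ST_RECEIVE_REPLY;
        break;
    case ST_RECEIVE_REPLY:
        ret = receive_reply(link_up);
        if (ret == W_OK) *state = ST_DONE;
        break;
    case ST_DONE:
        ret = W_OK;
        break;
    }
    return ret;
}
```

Reading `drive` alone is enough to see every transition, which is the readability benefit the review asks for.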

@@ -3415,7 +3447,7 @@ void syncWithPrimary(connection *conn) {
if (err) goto write_error;

server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
- return;
+ goto cleanup;
Member

Do you want to move these changes to the PR that refactors syncWithPrimary too?

Contributor Author

yes, I will revert it

}

int dualChannelReplicationMainConnReceivePsyncReply(connection *conn, char **err) {
UNUSED(err);
return C_OK;
}

#define C_RETRY -2
Member

can we move this to where C_OK/C_ERR is defined?


error:
sdsfree(err);
connClose(conn);
server.repl_transfer_s = NULL;
Member

This looks like a bug to me. It seems the two connection objects are getting mixed up. We should zero out the RDB channel connection here, i.e., server.repl_rdb_transfer_s = NULL. Then on L2726/L2747, we should be testing the normal channel connection instead, i.e., if (server.repl_transfer_s)

Contributor Author

Yeah we are trying to close repl_transfer_s twice 😅

Member

Can we fix #1088 in a separate PR first? I think we will need to backport it to Valkey 8.0
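
One common way to make this kind of double close harder to write is a close-and-clear helper that always pairs the close with nulling the same slot. The sketch below is an assumption-laden illustration: `conn_t`, `conn_open`, `conn_close` and `close_and_clear` are hypothetical stand-ins and not the server's connection API:

```c
#include <stdlib.h>

typedef struct { int open; } conn_t; /* hypothetical stand-in for a connection */

int close_calls = 0; /* counts real closes, for illustration only */

conn_t *conn_open(void) {
    conn_t *c = malloc(sizeof *c);
    if (c) c->open = 1;
    return c;
}

void conn_close(conn_t *c) {
    close_calls++;
    free(c);
}

/* Closes the connection stored in *slot, if any, and clears that same slot,
 * so calling it again on the slot is a harmless no-op. */
void close_and_clear(conn_t **slot) {
    if (*slot == NULL) return;
    conn_close(*slot);
    *slot = NULL;
}
```

Because the pointer that is closed and the pointer that is cleared are the same argument, the mix-up between the main and RDB channel slots described above cannot happen inside the helper.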

}
switch (server.repl_rdb_channel_state) {
case REPL_DUAL_CHANNEL_SEND_HANDSHAKE:
ret = dualChannelReplHandleHandshake(conn, &err);
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY;
Member

I think we should only update the state upon successful returns.

Contributor Author

It shouldn't matter because in case of C_ERR, we will reset the state, but I agree that it makes more sense for the reader. I'll fix it.


if (connSetReadHandler(conn, dualChannelFullSyncWithPrimary) == C_ERR) {
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", *err,
Member

this seems incorrect. *err is not assigned at this point. we should revert back to strerror(errno) I think.

Contributor Author

right err was not assigned in this case. will fix
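
A small sketch of the fix being discussed: when the failing call does not fill in an error string, the log message takes its reason from `strerror(errno)` instead. `register_handler` and `try_register` are hypothetical, and the sketch assumes the failing call sets `errno`, which is an assumption about the real handler registration:

```c
#include <errno.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical registration step that fails for a negative descriptor and
 * reports the cause through errno. */
int register_handler(int fd) {
    if (fd < 0) {
        errno = EBADF;
        return -1;
    }
    return 0;
}

/* On failure, formats the reason from errno into buf and returns -1.
 * On success, leaves buf empty and returns 0. buflen must be non-zero. */
int try_register(int fd, char *buf, size_t buflen) {
    if (register_handler(fd) == -1) {
        snprintf(buf, buflen, "Can't create readable event: %s", strerror(errno));
        return -1;
    }
    buf[0] = '\0';
    return 0;
}
```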

sdsfree(err);
connClose(conn);
server.repl_transfer_s = NULL;
if (server.repl_transfer_s) {
Member

consider adding sdsfree(err) after the error: label as well so that if we ever goto error in the future with a non-NULL err, we won't leak memory accidentally.

/* AUTH with the primary if required. */
switch (server.repl_rdb_channel_state) {
case REPL_DUAL_CHANNEL_SEND_HANDSHAKE:
ret = dualChannelReplHandleHandshake(conn, &err);
Member

Just realized that we never consume err in dualChannelFullSyncWithPrimary, whose only job when it comes to err is to free the memory. Was a logging statement missed?

Contributor Author

You're correct, we used to log it in case of an error, but after the refactor we're already logging it inside the helper functions, so there's no need to log it inside dualChannelFullSyncWithPrimary anymore. That being said, I believe we should retain the error deallocation logic within dualChannelFullSyncWithPrimary for the sake of maintaining simplicity in our code structure.

break;
default:
serverLog(LL_WARNING, "Unexpected replication state: %d", server.repl_state);
ret = C_ERR;
Member

missing an assignment to err? This branch if hit will break the assumption on L3298.

Contributor Author

Right, it will break the log at L3298, we should be able to handle an empty err.

naglera and others added 12 commits October 10, 2024 10:38
Consolidate the cleanup of local variables to a single point within the method, ensuring proper resource management and preventing memory leaks or double-free issues.

Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
renaming and refactor

Signed-off-by: naglera <anagler123@gmail.com>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Signed-off-by: naglera <anagler123@gmail.com>
all worker functions should be stateless
err should be type sds
endoffset response can be empty

Signed-off-by: naglera <anagler123@gmail.com>
This reverts commit 3c0933497fea5d84860d24f2864a9c14ffa991bf.

Signed-off-by: naglera <anagler123@gmail.com>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
Member

@PingXie PingXie left a comment

I think I am running out of comments ... if you could resolve the last few comments, let's merge this PR. It is great refactoring.

ret = dualChannelReplHandleEndOffsetResponse(conn, &err);
if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RDB_LOAD;
break;
default: break;
Member

if this is unreachable, we should serverPanic
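
As a rough illustration of failing loudly on a branch that should be unreachable, the sketch below uses a hypothetical `PANIC` macro built on `abort()` as a stand-in for the server's panic facility; the `CH_*` states and `ch_state_name` are likewise invented for the example:

```c
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-in for a server panic: report the location and abort. */
#define PANIC(msg)                                                          \
    do {                                                                    \
        fprintf(stderr, "PANIC: %s (%s:%d)\n", (msg), __FILE__, __LINE__);  \
        abort();                                                            \
    } while (0)

typedef enum { CH_SEND_HANDSHAKE, CH_RECEIVE_REPLY, CH_LOAD } ch_state_t;

/* Maps each known state to a name. An unknown value means the state machine
 * is corrupted, so the default branch aborts instead of silently continuing. */
const char *ch_state_name(ch_state_t s) {
    switch (s) {
    case CH_SEND_HANDSHAKE: return "send-handshake";
    case CH_RECEIVE_REPLY: return "receive-reply";
    case CH_LOAD: return "load";
    default: PANIC("unexpected channel state");
    }
    return NULL; /* not reached; keeps compilers that cannot see abort() quiet */
}
```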

Comment on lines 2757 to 2758
sdsfree(err);
if (ret == C_ERR) goto error;
Member

I think we should give the error handling logic a chance to examine the err string. whether or not the error handling logic needs to consume the string is a separate concern.

Suggested change
- sdsfree(err);
- if (ret == C_ERR) goto error;
+ if (ret == C_ERR) goto error;
+ sdsfree(err);

Contributor Author

Right so we should change the order to

    if (ret == C_ERR) goto error;
    sdsfree(err);

Also, maybe we'd better log the error string here as well, just in case we didn't log it earlier (better to log twice than to miss it).
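
The ordering point can be shown with a small sketch in which the error path still sees a valid string because the free happens after the jump. It assumes plain `malloc`/`free` strings rather than `sds`; `finish_step`, `dup_msg` and `last_logged` are hypothetical names used only for this example:

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

char last_logged[64]; /* records what the error path saw, for illustration */

/* Returns a heap-allocated copy of s, or NULL if allocation fails. */
char *dup_msg(const char *s) {
    char *copy = malloc(strlen(s) + 1);
    if (copy) strcpy(copy, s);
    return copy;
}

/* Hypothetical step: when `fail` is non-zero it produces an error string
 * and a failing return code. */
int finish_step(int fail) {
    char *err = fail ? dup_msg("bad reply") : NULL;
    int ret = fail ? -1 : 0;

    if (ret == -1) goto error; /* err is still valid when we jump */
    free(err);
    return 0;

error:
    snprintf(last_logged, sizeof last_logged, "%s", err ? err : "(no details)");
    free(err); /* freed exactly once, after its last use */
    return -1;
}
```

Had `free(err)` come before the `goto`, the `error:` block would be reading freed memory.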

naglera and others added 3 commits October 14, 2024 16:48
Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
Member

@PingXie PingXie left a comment

LGTM. Thanks @naglera!

@PingXie PingXie added the run-extra-tests Run extra tests on this PR (Runs all tests from daily except valgrind and RESP) label Oct 15, 2024
@PingXie PingXie merged commit b0f23df into valkey-io:unstable Oct 15, 2024
47 checks passed
eifrah-aws pushed a commit to eifrah-aws/valkey that referenced this pull request Oct 20, 2024
Consolidate the cleanup of local variables to a single point within the
method, ensuring proper resource management and preventing memory leaks
or double-free issues.

Previously discussed here:
- valkey-io#60 (comment)
- valkey-io#60 (comment)

---------

Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Co-authored-by: Ping Xie <pingxie@outlook.com>
tezc added a commit to redis/redis that referenced this pull request Jan 13, 2025
This PR is based on:

#12109
valkey-io/valkey#60

Closes: #11678

**Motivation**

During a full sync, when master is delivering RDB to the replica,
incoming write commands are kept in a replication buffer in order to be
sent to the replica once RDB delivery is completed. If RDB delivery
takes a long time, it might create memory pressure on master. Also, once
a replica connection accumulates replication data which is larger than
output buffer limits, master will kill replica connection. This may
cause a replication failure.

The main benefit of the rdb channel replication is streaming incoming
commands in parallel to the RDB delivery. This approach shifts
replication stream buffering to the replica and reduces load on master.
We do this by opening another connection for RDB delivery. The main
channel on replica will be receiving replication stream while rdb
channel is receiving the RDB.

This feature also helps to reduce master's main process CPU load. By
opening a dedicated connection for the RDB transfer, the bgsave process
has access to the new connection and it will stream RDB directly to the
replicas. Before this change, due to TLS connection restriction, the
bgsave process was writing RDB bytes to a pipe and the main process was
forwarding it to the replica. This is no longer necessary, the main process can
avoid these expensive socket read/write syscalls. It also means RDB
delivery to replica will be faster as it avoids this step.

In summary, replication will be faster and master's performance during
full syncs will improve.


**Implementation steps**

1. When replica connects to the master, it sends 'rdb-channel-repl' as
part of capability exchange to let master know that replica supports rdb
channel.
2. When replica lacks sufficient data for PSYNC, master sends
+RDBCHANNELSYNC reply with replica's client id. As the next step, the
replica opens a new connection (rdb-channel) and configures it against
the master with the appropriate capabilities and requirements. It also
sends given client id back to master over rdbchannel, so that master can
associate these channels. (initial replica connection will be referred
as main-channel) Then, replica requests fullsync using the RDB channel.
3. Prior to forking, master attaches the replica's main channel to the
replication backlog to deliver replication stream starting at the
snapshot end offset.
4. The master main process sends replication stream via the main
channel, while the bgsave process sends the RDB directly to the replica
via the rdb-channel. Replica accumulates replication stream in a local
buffer, while the RDB is being loaded into the memory.
5. Once the replica completes loading the rdb, it drops the rdb channel
and streams the accumulated replication stream into the db. Sync is
completed.

**Some details**
- Currently, rdbchannel replication is supported only if
`repl-diskless-sync` is enabled on master. Otherwise, replication will
happen over a single connection as before.
- On replica, there is a limit to replication stream buffering. Replica
uses a new config `replica-full-sync-buffer-limit` to limit number of
bytes to accumulate. If it is not set, replica inherits
`client-output-buffer-limit <replica>` hard limit config. If we reach
this limit, replica stops accumulating. This is not a failure scenario
though. Further accumulation will happen on master side. Depending on
the configured limits on master, master may kill the replica connection.
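
A minimal sketch of the limit selection described in the bullets above, assuming that a configured value of 0 stands for "not set" and that sizes are plain byte counts. Both function names are hypothetical and this is not the code from the PR:

```c
#include <stddef.h>

/* Picks the limit for replica-side buffering: the dedicated config when it
 * is set, otherwise the replica client-output-buffer hard limit.
 * Assumption: 0 means "not set". */
size_t effective_full_sync_buffer_limit(size_t configured, size_t replica_cob_hard_limit) {
    return configured != 0 ? configured : replica_cob_hard_limit;
}

/* Returns 1 while the replica may keep accumulating and 0 once the limit is
 * reached. Reaching the limit is not a failure: buffering simply continues
 * on the master side from that point on. */
int may_accumulate(size_t buffered_bytes, size_t limit) {
    return buffered_bytes < limit;
}
```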

**API changes in INFO output:**

1. New replica state: `send_bulk_and_stream`. Indicates full sync is
still in progress for this replica. It is receiving replication stream
and rdb in parallel.
```
slave0:ip=127.0.0.1,port=5002,state=send_bulk_and_stream,offset=0,lag=0
```
Replica state changes in steps:
- First, replica sends psync and receives +RDBCHANNELSYNC:
`state=wait_bgsave`
- After replica connects with rdbchannel and delivery starts:
`state=send_bulk_and_stream`
- After full sync: `state=online`

2. On replica side, replication stream buffering metrics:
- replica_full_sync_buffer_size: Currently accumulated replication
stream data in bytes.
- replica_full_sync_buffer_peak: Peak number of bytes that this instance
accumulated in the lifetime of the process.

```
replica_full_sync_buffer_size:20485             
replica_full_sync_buffer_peak:1048560
```
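
For monitoring scripts, the new state can be read out of a `slaveN:` line like the one shown above. The helper below is a hypothetical sketch that assumes the line keeps the `key=value,key=value` shape from the example:

```c
#include <string.h>

/* Copies the value of `key` from a line such as
 * "slave0:ip=...,state=...,lag=0" into out.
 * Returns 0 on success, -1 if the key is missing or the value does not fit. */
int info_field(const char *line, const char *key, char *out, size_t outlen) {
    size_t klen = strlen(key);
    const char *p = line;
    const char *colon = strchr(line, ':'); /* skip the "slave0:" prefix */
    if (colon) p = colon + 1;

    while (*p) {
        const char *end = strchr(p, ',');
        size_t flen = end ? (size_t)(end - p) : strlen(p);
        if (flen > klen && strncmp(p, key, klen) == 0 && p[klen] == '=') {
            size_t vlen = flen - klen - 1;
            if (vlen >= outlen) return -1;
            memcpy(out, p + klen + 1, vlen);
            out[vlen] = '\0';
            return 0;
        }
        if (!end) break;
        p = end + 1;
    }
    return -1;
}
```

Calling `info_field(line, "state", buf, sizeof buf)` on the example line would leave `send_bulk_and_stream` in `buf`.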

**API changes in CLIENT LIST**

In `client list` output, rdbchannel clients will have 'C' flag in
addition to 'S' replica flag:
```
id=11 addr=127.0.0.1:39108 laddr=127.0.0.1:5001 fd=14 name= age=5 idle=5 flags=SC db=0 sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=0 argv-mem=0 multi-mem=0 rbs=1024 rbp=0 obl=0 oll=0 omem=0 tot-mem=1920 events=r cmd=psync user=default redir=-1 resp=2 lib-name= lib-ver= io-thread=0
```
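
A short sketch of how a tool might spot rdbchannel clients in this output. It assumes the space-separated `key=value` layout shown above and checks that the `flags=` field contains both 'S' and 'C'; `is_rdb_channel_client` is a hypothetical helper:

```c
#include <string.h>

/* Returns 1 if the client line has a flags= field containing both 'S'
 * (replica) and 'C' (rdbchannel), otherwise 0. */
int is_rdb_channel_client(const char *line) {
    const char *p = line;
    while (*p) {
        size_t len = strcspn(p, " "); /* length of the current token */
        if (len > 6 && strncmp(p, "flags=", 6) == 0) {
            int has_s = memchr(p + 6, 'S', len - 6) != NULL;
            int has_c = memchr(p + 6, 'C', len - 6) != NULL;
            return has_s && has_c;
        }
        p += len;
        while (*p == ' ') p++; /* skip separators */
    }
    return 0;
}
```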

**Config changes:**
- `replica-full-sync-buffer-limit`: Controls how much replication data
replica can accumulate during rdbchannel replication. If it is not set
(a value of 0), replica inherits the `client-output-buffer-limit
<replica>` hard limit config to limit accumulated data.
- `repl-rdb-channel` config is added as a hidden config. This is mostly
for testing as we need to support both rdbchannel replication and the
older single connection replication (to keep compatibility with older
versions and rdbchannel replication will not be enabled if
repl-diskless-sync is not enabled). it affects both the master (not to
respond to rdb channel requests), and the replica (not to declare
capability)

**Internal API changes:**
Changes that were introduced to Redis replication:
- New replication capability is added to replconf command: `capa
rdb-channel-repl`. Indicates replica is capable of rdb channel
replication. Replica sends it when it connects to master along with
other capabilities.
- If replica needs fullsync, master replies `+RDBCHANNELSYNC
<client-id>` to the replica's PSYNC request.
- When replica opens rdbchannel connection, as part of replconf command,
it sends `rdb-channel 1` to let master know this is rdb channel. Also,
it sends `main-ch-client-id <client-id>` as part of replconf command so
master can associate channels.
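
On the replica side, the `+RDBCHANNELSYNC <client-id>` reply has to be recognised and the id extracted before the rdb channel can be opened. The sketch below shows one way to do that; it assumes the id is an unsigned decimal number and `parse_rdbchannelsync` is a hypothetical helper, not the function used in the PR:

```c
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/* Parses "+RDBCHANNELSYNC <client-id>" with an optional trailing CR/LF.
 * Returns 0 and stores the id on success, -1 for any other reply. */
int parse_rdbchannelsync(const char *reply, unsigned long long *client_id) {
    static const char prefix[] = "+RDBCHANNELSYNC ";
    size_t plen = sizeof(prefix) - 1;
    const char *num;
    char *end = NULL;
    unsigned long long id;

    if (strncmp(reply, prefix, plen) != 0) return -1;
    num = reply + plen;
    if (*num < '0' || *num > '9') return -1; /* reject signs and blanks */

    errno = 0;
    id = strtoull(num, &end, 10);
    if (errno != 0) return -1; /* out of range */
    if (*end != '\0' && *end != '\r' && *end != '\n') return -1;

    *client_id = id;
    return 0;
}
```

The parsed id is what the replica would then send back over the rdb channel as `main-ch-client-id <client-id>`.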
  
**Testing:**
As rdbchannel replication is enabled by default, we run whole test suite
with it. Though, as we need to support both rdbchannel and single
connection replication, we'll be running some tests twice with
`repl-rdb-channel yes/no` config.

**Replica state diagram**
```
* * Replica state machine *
 *
 * Main channel state
 * ┌───────────────────┐
 * │RECEIVE_PING_REPLY │
 * └────────┬──────────┘
 *          │ +PONG
 * ┌────────▼──────────┐
 * │SEND_HANDSHAKE     │                     RDB channel state
 * └────────┬──────────┘            ┌───────────────────────────────┐
 *          │+OK                ┌───► RDB_CH_SEND_HANDSHAKE         │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_AUTH_REPLY │        │    REPLCONF main-ch-client-id <clientid>
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │ RDB_CH_RECEIVE_AUTH_REPLY     │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_PORT_REPLY │        │                  │ +OK
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │  RDB_CH_RECEIVE_REPLCONF_REPLY│
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_IP_REPLY   │        │                  │ +OK
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │ RDB_CH_RECEIVE_FULLRESYNC     │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_CAPA_REPLY │        │                  │+FULLRESYNC
 * └────────┬──────────┘        │                  │Rdb delivery
 *          │                   │   ┌──────────────▼────────────────┐
 * ┌────────▼──────────┐        │   │ RDB_CH_RDB_LOADING            │
 * │SEND_PSYNC         │        │   └──────────────┬────────────────┘
 * └─┬─────────────────┘        │                  │ Done loading
 *   │PSYNC (use cached-master) │                  │
 * ┌─▼─────────────────┐        │                  │
 * │RECEIVE_PSYNC_REPLY│        │    ┌────────────►│ Replica streams replication
 * └─┬─────────────────┘        │    │             │ buffer into memory
 *   │                          │    │             │
 *   │+RDBCHANNELSYNC client-id │    │             │
 *   ├──────┬───────────────────┘    │             │
 *   │      │ Main channel           │             │
 *   │      │ accumulates repl data  │             │
 *   │   ┌──▼────────────────┐       │     ┌───────▼───────────┐
 *   │   │ REPL_TRANSFER     ├───────┘     │    CONNECTED      │
 *   │   └───────────────────┘             └────▲───▲──────────┘
 *   │                                          │   │
 *   │                                          │   │
 *   │  +FULLRESYNC    ┌───────────────────┐    │   │
 *   ├────────────────► REPL_TRANSFER      ├────┘   │
 *   │                 └───────────────────┘        │
 *   │  +CONTINUE                                   │
 *   └──────────────────────────────────────────────┘
 */
 ```
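
The right-hand column of the diagram can be read as a linear sequence, which lends itself to a next-state table. The sketch below is illustrative only: the enum names are taken from the diagram labels and are assumptions about naming, and `rdb_ch_advance` is hypothetical:

```c
typedef enum {
    RDB_CH_SEND_HANDSHAKE,
    RDB_CH_RECEIVE_AUTH_REPLY,
    RDB_CH_RECEIVE_REPLCONF_REPLY,
    RDB_CH_RECEIVE_FULLRESYNC,
    RDB_CH_RDB_LOADING,
    RDB_CH_STATE_COUNT
} rdb_ch_state;

/* Next state after a successful step, following the diagram top to bottom.
 * The loading state maps to itself because what follows it happens on the
 * main channel. */
static const rdb_ch_state rdb_ch_next[RDB_CH_STATE_COUNT] = {
    RDB_CH_RECEIVE_AUTH_REPLY,     /* from RDB_CH_SEND_HANDSHAKE */
    RDB_CH_RECEIVE_REPLCONF_REPLY, /* from RDB_CH_RECEIVE_AUTH_REPLY */
    RDB_CH_RECEIVE_FULLRESYNC,     /* from RDB_CH_RECEIVE_REPLCONF_REPLY */
    RDB_CH_RDB_LOADING,            /* from RDB_CH_RECEIVE_FULLRESYNC */
    RDB_CH_RDB_LOADING             /* from RDB_CH_RDB_LOADING */
};

/* Advances only when the current step succeeded; otherwise stays put. */
rdb_ch_state rdb_ch_advance(rdb_ch_state s, int step_ok) {
    if (!step_ok || (unsigned)s >= RDB_CH_STATE_COUNT) return s;
    return rdb_ch_next[s];
}
```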
 -----
 This PR also contains changes and ideas from: 
valkey-io/valkey#837
valkey-io/valkey#1173
valkey-io/valkey#804
valkey-io/valkey#945
valkey-io/valkey#989
---------

Co-authored-by: Yuan Wang <wangyuancode@163.com>
Co-authored-by: debing.sun <debing.sun@redis.com>
Co-authored-by: Moti Cohen <moticless@gmail.com>
Co-authored-by: naglera <anagler123@gmail.com>
Co-authored-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Binbin <binloveplay1314@qq.com>
Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com>
Co-authored-by: xbasel <103044017+xbasel@users.noreply.github.com>
YaacovHazan pushed a commit to redis/redis that referenced this pull request Jan 14, 2025
This PR is based on:

#12109
valkey-io/valkey#60

Closes: #11678

**Motivation**

During a full sync, when master is delivering RDB to the replica,
incoming write commands are kept in a replication buffer in order to be
sent to the replica once RDB delivery is completed. If RDB delivery
takes a long time, it might create memory pressure on master. Also, once
a replica connection accumulates replication data which is larger than
output buffer limits, master will kill replica connection. This may
cause a replication failure.

The main benefit of the rdb channel replication is streaming incoming
commands in parallel to the RDB delivery. This approach shifts
replication stream buffering to the replica and reduces load on master.
We do this by opening another connection for RDB delivery. The main
channel on replica will be receiving replication stream while rdb
channel is receiving the RDB.

This feature also helps to reduce master's main process CPU load. By
opening a dedicated connection for the RDB transfer, the bgsave process
has access to the new connection and it will stream RDB directly to the
replicas. Before this change, due to a TLS connection restriction, the
bgsave process was writing RDB bytes to a pipe and the main process was
forwarding them to the replica. This is no longer necessary, so the main
process can avoid these expensive socket read/write syscalls. It also
means RDB delivery to the replica will be faster, as it skips this step.

In summary, replication will be faster and master's performance during
full syncs will improve.


**Implementation steps**

1. When the replica connects to the master, it sends `rdb-channel-repl` as
part of the capability exchange to let the master know that the replica
supports rdb channel replication.
2. When replica lacks sufficient data for PSYNC, master sends
+RDBCHANNELSYNC reply with replica's client id. As the next step, the
replica opens a new connection (rdb-channel) and configures it against
the master with the appropriate capabilities and requirements. It also
sends given client id back to master over rdbchannel, so that master can
associate these channels (the initial replica connection is referred to
as the main channel). Then, replica requests fullsync using the rdb channel.
3. Prior to forking, master attaches the replica's main channel to the
replication backlog to deliver replication stream starting at the
snapshot end offset.
4. The master main process sends replication stream via the main
channel, while the bgsave process sends the RDB directly to the replica
via the rdb-channel. Replica accumulates replication stream in a local
buffer, while the RDB is being loaded into the memory.
5. Once the replica completes loading the rdb, it drops the rdb channel
and streams the accumulated replication stream into the db. Sync is
completed.

**Some details**
- Currently, rdbchannel replication is supported only if
`repl-diskless-sync` is enabled on master. Otherwise, replication will
happen over a single connection as before.
- On replica, there is a limit to replication stream buffering. Replica
uses a new config `replica-full-sync-buffer-limit` to limit number of
bytes to accumulate. If it is not set, replica inherits
`client-output-buffer-limit <replica>` hard limit config. If we reach
this limit, replica stops accumulating. This is not a failure scenario
though. Further accumulation will happen on master side. Depending on
the configured limits on master, master may kill the replica connection.
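
The limit rule above can be sketched as follows. This is only an illustration, not the code from the PR: `full_sync_buffer`, `effective_full_sync_limit` and `full_sync_buffer_reserve` are hypothetical names, treating an inherited limit of 0 as "no limit" is an assumption, and the sketch refuses a chunk that would cross the limit rather than stopping exactly when the limit is reached.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the replica's local replication buffer accounting. */
typedef struct {
    size_t used;  /* bytes currently accumulated */
    size_t limit; /* effective limit in bytes; 0 is treated as "no limit" (assumption) */
} full_sync_buffer;

/* configured models replica-full-sync-buffer-limit (0 = not set);
 * cob_hard_limit models the hard limit of client-output-buffer-limit <replica>. */
size_t effective_full_sync_limit(size_t configured, size_t cob_hard_limit) {
    return configured != 0 ? configured : cob_hard_limit;
}

/* Try to account for `incoming` more bytes. Returns false when the limit
 * would be exceeded; the caller then stops reading from the main channel,
 * so further data accumulates on the master side instead. */
bool full_sync_buffer_reserve(full_sync_buffer *b, size_t incoming) {
    assert(b != NULL);
    if (b->limit != 0) {
        if (b->used >= b->limit) return false;
        if (incoming > b->limit - b->used) return false; /* overflow-safe check */
    }
    b->used += incoming;
    return true;
}
```

A `false` return is not an error here. It only means the replica stops accumulating, which matches the "not a failure scenario" note above.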

**API changes in INFO output:**

1. New replica state: `send_bulk_and_stream`. Indicates full sync is
still in progress for this replica. It is receiving replication stream
and rdb in parallel.
```
slave0:ip=127.0.0.1,port=5002,state=send_bulk_and_stream,offset=0,lag=0
```
Replica state changes in steps:
- First, replica sends psync and receives +RDBCHANNELSYNC:
`state=wait_bgsave`
- After replica connects with rdbchannel and delivery starts:
`state=send_bulk_and_stream`
- After full sync: `state=online`

2. On replica side, replication stream buffering metrics:
- replica_full_sync_buffer_size: Currently accumulated replication
stream data in bytes.
- replica_full_sync_buffer_peak: Peak number of bytes that this instance
accumulated in the lifetime of the process.

```
replica_full_sync_buffer_size:20485
replica_full_sync_buffer_peak:1048560
```
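
For tooling that watches these INFO fields, a minimal sketch of pulling one `key=value` pair out of a `slaveN:` line might look like this. `info_field` is a hypothetical helper written for this description, not part of Redis, and it assumes the comma-separated layout shown in the sample line above.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Copy the value of `key` from a line such as
 * "slave0:ip=...,state=send_bulk_and_stream,..." into `out`.
 * Returns 1 if the key was found, 0 otherwise. Values longer than
 * outlen - 1 bytes are truncated. */
int info_field(const char *line, const char *key, char *out, size_t outlen) {
    assert(line != NULL && key != NULL && out != NULL);
    const char *p = strchr(line, ':');
    size_t klen = strlen(key);
    if (p == NULL || outlen == 0) return 0;
    p++; /* skip the ':' that ends the "slaveN" prefix */
    while (*p != '\0') {
        size_t flen = strcspn(p, ",\r\n"); /* length of the current "k=v" field */
        if (flen > klen && strncmp(p, key, klen) == 0 && p[klen] == '=') {
            size_t vlen = flen - klen - 1;
            if (vlen >= outlen) vlen = outlen - 1;
            memcpy(out, p + klen + 1, vlen);
            out[vlen] = '\0';
            return 1;
        }
        p += flen;
        if (*p != ',') break; /* end of line */
        p++;
    }
    return 0;
}
```

Comparing the extracted `state` value against `send_bulk_and_stream` then tells a monitoring script that the full sync is still in progress for that replica.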

**API changes in CLIENT LIST**

In `client list` output, rdbchannel clients will have 'C' flag in
addition to 'S' replica flag:
```
id=11 addr=127.0.0.1:39108 laddr=127.0.0.1:5001 fd=14 name= age=5 idle=5 flags=SC db=0 sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=0 argv-mem=0 multi-mem=0 rbs=1024 rbp=0 obl=0 oll=0 omem=0 tot-mem=1920 events=r cmd=psync user=default redir=-1 resp=2 lib-name= lib-ver= io-thread=0
```
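
As an illustration of how a script could spot rdbchannel clients in this output, the sketch below checks the `flags=` field for both letters. `client_has_flag` and `is_rdb_channel_client` are hypothetical helpers, and the sketch assumes the space-separated `key=value` layout of the sample line above.

```c
#include <assert.h>
#include <string.h>

/* Return 1 if the flags= field of a CLIENT LIST line contains `flag`. */
int client_has_flag(const char *line, char flag) {
    assert(line != NULL);
    const char *p = line;
    while ((p = strstr(p, "flags=")) != NULL) {
        /* Only accept a real field start: beginning of line or after a space. */
        if (p == line || p[-1] == ' ') {
            for (p += 6; *p != '\0' && *p != ' ' && *p != '\n'; p++) {
                if (*p == flag) return 1;
            }
            return 0;
        }
        p += 6;
    }
    return 0;
}

/* An rdbchannel client carries 'C' in addition to the 'S' replica flag. */
int is_rdb_channel_client(const char *line) {
    return client_has_flag(line, 'S') && client_has_flag(line, 'C');
}
```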

**Config changes:**
- `replica-full-sync-buffer-limit`: Controls how much replication data
replica can accumulate during rdbchannel replication. If it is not set
(a value of 0), replica inherits the `client-output-buffer-limit
<replica>` hard limit to cap the accumulated data.
- `repl-rdb-channel` config is added as a hidden config. This is mostly
for testing, as we need to support both rdbchannel replication and the
older single connection replication (to keep compatibility with older
versions, and because rdbchannel replication will not be enabled if
`repl-diskless-sync` is not enabled). It affects both the master (which
then does not respond to rdb channel requests) and the replica (which
then does not declare the capability).

**Internal API changes:**
Changes that were introduced to Redis replication:
- New replication capability is added to replconf command: `capa
rdb-channel-repl`. Indicates replica is capable of rdb channel
replication. Replica sends it when it connects to master along with
other capabilities.
- If replica needs fullsync, master replies `+RDBCHANNELSYNC
<client-id>` to the replica's PSYNC request.
- When replica opens rdbchannel connection, as part of replconf command,
it sends `rdb-channel 1` to let master know this is rdb channel. Also,
it sends `main-ch-client-id <client-id>` as part of replconf command so
master can associate channels.
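
A rough sketch of the replica side of this exchange: parse the `+RDBCHANNELSYNC <client-id>` reply, then build the REPLCONF options for the rdb channel. The function names are hypothetical, `unsigned long long` for the client id is an assumption, and whether the two options travel in one REPLCONF call or in separate calls is not stated here; the sketch joins them in one inline command purely for readability.

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Parse "+RDBCHANNELSYNC <client-id>". Returns 1 and stores the id on success. */
int parse_rdbchannelsync(const char *reply, unsigned long long *client_id) {
    static const char prefix[] = "+RDBCHANNELSYNC ";
    const size_t plen = sizeof(prefix) - 1;
    char *end;
    assert(reply != NULL && client_id != NULL);
    if (strncmp(reply, prefix, plen) != 0) return 0;
    /* strtoull accepts leading spaces and signs, so require a digit first. */
    if (reply[plen] < '0' || reply[plen] > '9') return 0;
    errno = 0;
    *client_id = strtoull(reply + plen, &end, 10);
    if (errno != 0) return 0;
    return *end == '\0' || *end == '\r' || *end == '\n';
}

/* Format the options sent on the rdb channel. Returns 1 if `buf` was large enough. */
int format_rdb_channel_replconf(char *buf, size_t buflen, unsigned long long client_id) {
    int n = snprintf(buf, buflen, "REPLCONF rdb-channel 1 main-ch-client-id %llu", client_id);
    return n >= 0 && (size_t)n < buflen;
}
```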
  
**Testing:**
As rdbchannel replication is enabled by default, we run the whole test
suite with it. However, since we need to support both rdbchannel and
single connection replication, we'll be running some tests twice, once
with `repl-rdb-channel yes` and once with `repl-rdb-channel no`.

**Replica state diagram**
```
* * Replica state machine *
 *
 * Main channel state
 * ┌───────────────────┐
 * │RECEIVE_PING_REPLY │
 * └────────┬──────────┘
 *          │ +PONG
 * ┌────────▼──────────┐
 * │SEND_HANDSHAKE     │                     RDB channel state
 * └────────┬──────────┘            ┌───────────────────────────────┐
 *          │+OK                ┌───► RDB_CH_SEND_HANDSHAKE         │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_AUTH_REPLY │        │    REPLCONF main-ch-client-id <clientid>
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │ RDB_CH_RECEIVE_AUTH_REPLY     │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_PORT_REPLY │        │                  │ +OK
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │  RDB_CH_RECEIVE_REPLCONF_REPLY│
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_IP_REPLY   │        │                  │ +OK
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │ RDB_CH_RECEIVE_FULLRESYNC     │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_CAPA_REPLY │        │                  │+FULLRESYNC
 * └────────┬──────────┘        │                  │Rdb delivery
 *          │                   │   ┌──────────────▼────────────────┐
 * ┌────────▼──────────┐        │   │ RDB_CH_RDB_LOADING            │
 * │SEND_PSYNC         │        │   └──────────────┬────────────────┘
 * └─┬─────────────────┘        │                  │ Done loading
 *   │PSYNC (use cached-master) │                  │
 * ┌─▼─────────────────┐        │                  │
 * │RECEIVE_PSYNC_REPLY│        │    ┌────────────►│ Replica streams replication
 * └─┬─────────────────┘        │    │             │ buffer into memory
 *   │                          │    │             │
 *   │+RDBCHANNELSYNC client-id │    │             │
 *   ├──────┬───────────────────┘    │             │
 *   │      │ Main channel           │             │
 *   │      │ accumulates repl data  │             │
 *   │   ┌──▼────────────────┐       │     ┌───────▼───────────┐
 *   │   │ REPL_TRANSFER     ├───────┘     │    CONNECTED      │
 *   │   └───────────────────┘             └────▲───▲──────────┘
 *   │                                          │   │
 *   │                                          │   │
 *   │  +FULLRESYNC    ┌───────────────────┐    │   │
 *   ├────────────────► REPL_TRANSFER      ├────┘   │
 *   │                 └───────────────────┘        │
 *   │  +CONTINUE                                   │
 *   └──────────────────────────────────────────────┘
 */
 ```
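
The rdb channel column of the diagram can be read as a small transition function. The sketch below is only a reading aid: the state names are copied from the diagram, `RDB_CH_ERROR` and `rdb_ch_next` are hypothetical additions, and the real handshake in `replication.c` is event-driven and handles more cases than this.

```c
#include <assert.h>
#include <string.h>

typedef enum {
    RDB_CH_SEND_HANDSHAKE,
    RDB_CH_RECEIVE_AUTH_REPLY,
    RDB_CH_RECEIVE_REPLCONF_REPLY,
    RDB_CH_RECEIVE_FULLRESYNC,
    RDB_CH_RDB_LOADING,
    RDB_CH_ERROR /* hypothetical: any unexpected reply ends up here */
} rdb_ch_state;

static int starts_with(const char *s, const char *prefix) {
    assert(prefix != NULL);
    return s != NULL && strncmp(s, prefix, strlen(prefix)) == 0;
}

/* Advance the rdb channel state given the master's reply.
 * In RDB_CH_SEND_HANDSHAKE the replica only sends data, so `reply` is ignored. */
rdb_ch_state rdb_ch_next(rdb_ch_state s, const char *reply) {
    switch (s) {
    case RDB_CH_SEND_HANDSHAKE:
        return RDB_CH_RECEIVE_AUTH_REPLY;
    case RDB_CH_RECEIVE_AUTH_REPLY:
        return starts_with(reply, "+OK") ? RDB_CH_RECEIVE_REPLCONF_REPLY : RDB_CH_ERROR;
    case RDB_CH_RECEIVE_REPLCONF_REPLY:
        return starts_with(reply, "+OK") ? RDB_CH_RECEIVE_FULLRESYNC : RDB_CH_ERROR;
    case RDB_CH_RECEIVE_FULLRESYNC:
        return starts_with(reply, "+FULLRESYNC") ? RDB_CH_RDB_LOADING : RDB_CH_ERROR;
    default:
        /* Loading finishes outside this function; errors are terminal. */
        return s;
    }
}
```

Once loading is done, the replica streams the accumulated buffer into memory and the main channel moves to CONNECTED, as shown on the right side of the diagram.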
 -----
 This PR also contains changes and ideas from: 
valkey-io/valkey#837
valkey-io/valkey#1173
valkey-io/valkey#804
valkey-io/valkey#945
valkey-io/valkey#989
---------

Co-authored-by: Yuan Wang <wangyuancode@163.com>
Co-authored-by: debing.sun <debing.sun@redis.com>
Co-authored-by: Moti Cohen <moticless@gmail.com>
Co-authored-by: naglera <anagler123@gmail.com>
Co-authored-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Binbin <binloveplay1314@qq.com>
Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com>
Co-authored-by: xbasel <103044017+xbasel@users.noreply.github.com>
Labels
run-extra-tests Run extra tests on this PR (Runs all tests from daily except valgrind and RESP)
Projects
Status: Done
4 participants