Refactor return and goto statements #945
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##           unstable     #945      +/-   ##
============================================
- Coverage     70.69%   70.64%   -0.05%
============================================
  Files           114      114
  Lines         61693    61735      +42
============================================
+ Hits          43611    43613       +2
- Misses        18082    18122      +40
src/replication.c
Outdated
@@ -3541,7 +3531,7 @@ void syncWithPrimary(connection *conn) {
             clearFailoverState();
         } else {
             abortFailover("Failover target rejected psync request");
-            return;
+            goto cleanup;
no cleanup needed in line 3524?
right
I think we should consider refactoring this code further by breaking out the different `if` paths so each function, including `fullSyncWithPrimary`, is dealing with a single concern. What do you think about a skeleton function like this?
static void fullSyncWithPrimary(connection *conn) {
    char *err = NULL;
    serverAssert(conn == server.repl_rdb_transfer_s);
    if (server.repl_state == REPL_STATE_NONE) goto error;
    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_WARNING, "Error condition on socket for dual channel replication: %s", connGetLastError(conn));
        goto error;
    }
    switch (server.repl_rdb_channel_state) {
    case REPL_DUAL_CHANNEL_SEND_HANDSHAKE:
        handleHandshake(conn, &err); /* TO BE FILLED OUT */
        break;
    case REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY:
        if (server.primary_auth) {
            handleAuthReply(conn, &err); /* TO BE FILLED OUT */
        } else {
            server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY;
        }
        break;
    case REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY:
        handleReplConfReply(conn, &err); /* TO BE FILLED OUT */
        break;
    case REPL_DUAL_CHANNEL_RECEIVE_ENDOFF:
        handleEndOffsetResponse(conn, &err); /* TO BE FILLED OUT */
        break;
    default:
        break;
    }
    if (err) goto error;
    return;

error:
    if (err) sdsfree(err);
    connClose(conn);
    server.repl_transfer_s = NULL;
    if (server.repl_rdb_transfer_s) {
        connClose(server.repl_rdb_transfer_s);
        server.repl_rdb_transfer_s = NULL;
    }
    if (server.repl_transfer_fd != -1) close(server.repl_transfer_fd);
    server.repl_transfer_fd = -1;
    server.repl_state = REPL_STATE_CONNECT;
    replicationAbortDualChannelSyncTransfer();
}
Started with |
LGTM overall. My feedback is all on fit-n-finish.
src/replication.c
Outdated
    int ret;

    switch (server.repl_state) {
    case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplicationMainConnSendHandshake(conn, &err); break;
I like this naming scheme but can we shorten it a bit? It is a bit of a mouthful :)
- case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplicationMainConnSendHandshake(conn, &err); break;
+ case REPL_STATE_SEND_HANDSHAKE: ret = dualChanReplMainConnSendHandshake(conn, &err); break;
src/replication.c
Outdated
/* Replication: Replica side.
 * This connection handler fires after rdb-connection was initialized. We use it
 * to adjust the replica main for loading incremental changes into the local buffer. */
void setupMainConnForPsync(connection *conn) {
What do you think about prefixing this function with `dualChan` as well?
- void setupMainConnForPsync(connection *conn) {
+ void dualChanSetupMainConnForPsync(connection *conn) {
Right, I just don't like the abbreviation Channel => Chan, I don't think it is very clean.
src/replication.c
Outdated
    switch (server.repl_state) {
    case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplicationMainConnSendHandshake(conn, &err); break;
    case REPL_STATE_RECEIVE_CAPA_REPLY:
        ret = dualChannelReplicationMainConnReceiveCapaReply(conn, &err);
- ret = dualChannelReplicationMainConnReceiveCapaReply(conn, &err);
+ ret = dualChanReplMainConnRecvCapaReply(conn, &err);
src/replication.c
Outdated
        sdsfree(err);
        err = NULL;
    /* fall through */
    case REPL_STATE_SEND_PSYNC: ret = dualChannelReplicationMainConnSendPsync(conn, &err); break;
- case REPL_STATE_SEND_PSYNC: ret = dualChannelReplicationMainConnSendPsync(conn, &err); break;
+ case REPL_STATE_SEND_PSYNC: ret = dualChanReplMainConnSendPsync(conn, &err); break;
src/replication.c
Outdated
        err = NULL;
    /* fall through */
    case REPL_STATE_SEND_PSYNC: ret = dualChannelReplicationMainConnSendPsync(conn, &err); break;
    case REPL_STATE_RECEIVE_PSYNC_REPLY: ret = dualChannelReplicationMainConnReceivePsyncReply(conn, &err); break;
- case REPL_STATE_RECEIVE_PSYNC_REPLY: ret = dualChannelReplicationMainConnReceivePsyncReply(conn, &err); break;
+ case REPL_STATE_RECEIVE_PSYNC_REPLY: ret = dualChanReplMainConnRecvPsyncReply(conn, &err); break;
src/replication.c
Outdated
@@ -55,6 +55,7 @@ int cancelReplicationHandshake(int reconnect);
 void replicationSteadyStateInit(void);
 void setupMainConnForPsync(connection *conn);
 void dualChannelSyncHandleRdbLoadCompletion(void);
+static void fullSyncWithPrimary(connection *conn);
This function is used in the dual-chan replication path only, so I would think it makes more sense to prefix it too.
- static void fullSyncWithPrimary(connection *conn);
+ static void dualChanFullSyncWithPrimary(connection *conn);
src/replication.c
Outdated
}

int dualChannelReplicationMainConnReceivePsyncReply(connection *conn, char **err) {
    UNUSED(err);
if this function can't fail, I would suggest we null out `err` so we stick to a more intuitive contract, which is null `err` means success. Let's not leave the `err` state undefined (or decided by the caller).
I don't like the idea of relying on the error string. From my observation, it is not maintained for every error.
Looks like this is another inconsistency in the existing code. Functions like `sendCommand` return a non-NULL error string to indicate failures already. I am fine with treating an error string returned via an outgoing parameter as "informational" and for "logging purpose only" hence optional. This should help ensure consistency.
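The contract this thread settles on (the return code decides success, while the error string is optional and only feeds logging) can be sketched as follows. This is a minimal illustration, not the actual Valkey code: `worker_check_reply`, `handle_reply` and `dup_string` are hypothetical names, plain `char *` stands in for `sds`, and `C_OK`/`C_ERR` are redefined locally.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define C_OK 0
#define C_ERR -1

/* Hypothetical helper: heap-allocated copy of a string (stand-in for an sds constructor). */
static char *dup_string(const char *s) {
    size_t len = strlen(s) + 1;
    char *copy = malloc(len);
    if (copy) memcpy(copy, s, len);
    return copy;
}

/* Hypothetical worker. The return code alone signals success or failure;
 * *err is optional detail for logging and may stay NULL even on failure. */
static int worker_check_reply(const char *reply, char **err) {
    *err = NULL;                     /* never leave the out-parameter undefined */
    if (reply == NULL) return C_ERR; /* failure without any detail */
    if (reply[0] == '-') {
        *err = dup_string(reply);    /* failure with detail */
        return C_ERR;
    }
    return C_OK;
}

/* Caller: branches on the return code and tolerates a NULL err. */
static int handle_reply(const char *reply) {
    char *err = NULL;
    int ret = worker_check_reply(reply, &err);
    if (ret == C_ERR) fprintf(stderr, "reply rejected: %s\n", err ? err : "(no detail)");
    free(err); /* free(NULL) is a no-op */
    return ret;
}
```

With this shape, a worker that cannot fail still sets `*err = NULL`, so the caller never has to guess what the out-parameter holds.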
+1 on refactoring |
Thanks @naglera! I think we are very close.
src/replication.c
Outdated
        serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err);
        return C_ERR;
    }
    server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY;
I would like to see us pulling all state changes into the driver function, i.e., `dualChannelFullSyncWithPrimary` in this case. This way it is very easy to see all the state transitions together. I will suggest some changes in `dualChannelFullSyncWithPrimary` next
Ok nice
src/replication.c
Outdated
    int ret;

    switch (server.repl_state) {
    case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplMainConnSendHandshake(conn, &err); break;
This is what I meant by consolidating state manipulations in the "driver" function and then all the "worker" functions (such as `dualChannelReplMainConnRecvCapaReply`) become "stateless".
- case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplMainConnSendHandshake(conn, &err); break;
+ case REPL_STATE_SEND_HANDSHAKE:
+     ret = dualChannelReplMainConnSendHandshake(conn, &err);
+     if (ret != C_ERR) server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
+     /* else server.repl_state = XXX; // if it makes sense */
+     break;
sounds good
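The driver/worker split agreed on above can be illustrated with a small standalone state machine. This is only a sketch under stated assumptions: the `ST_*` states, the stub workers and `drive` are hypothetical names, and the real functions take a connection and do network I/O rather than an `io_ok` flag.

```c
#define C_OK 0
#define C_ERR -1

/* Hypothetical states, loosely modeled on the handshake states in the diff. */
typedef enum {
    ST_SEND_HANDSHAKE,
    ST_RECEIVE_CAPA_REPLY,
    ST_SEND_PSYNC,
    ST_RECEIVE_PSYNC_REPLY,
    ST_FAILED
} sync_state;

/* Stateless workers (I/O stubbed out): they report a result but never
 * read or write the state variable. */
static int send_handshake(int io_ok) { return io_ok ? C_OK : C_ERR; }
static int receive_capa_reply(int io_ok) { return io_ok ? C_OK : C_ERR; }
static int send_psync(int io_ok) { return io_ok ? C_OK : C_ERR; }

/* Driver: the only place where a state transition is decided, and only
 * after the worker reported success. */
static sync_state drive(sync_state state, int io_ok) {
    int ret;
    switch (state) {
    case ST_SEND_HANDSHAKE:
        ret = send_handshake(io_ok);
        return ret == C_OK ? ST_RECEIVE_CAPA_REPLY : ST_FAILED;
    case ST_RECEIVE_CAPA_REPLY:
        ret = receive_capa_reply(io_ok);
        return ret == C_OK ? ST_SEND_PSYNC : ST_FAILED;
    case ST_SEND_PSYNC:
        ret = send_psync(io_ok);
        return ret == C_OK ? ST_RECEIVE_PSYNC_REPLY : ST_FAILED;
    default:
        return ST_FAILED; /* no transition defined from this state */
    }
}
```

Because every transition sits in `drive`, the whole state graph can be read from one `switch` instead of being pieced together from the workers.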
src/replication.c
Outdated
@@ -3415,7 +3447,7 @@ void syncWithPrimary(connection *conn) {
         if (err) goto write_error;

         server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
-        return;
+        goto cleanup;
Do you want to move these changes to the PR that refactors `syncWithPrimary` too?
yes, I will revert it
src/replication.c
Outdated
    return C_OK;
}

#define C_RETRY -2
can we move this to where C_OK/C_ERR is defined?
src/replication.c
Outdated
error:
    sdsfree(err);
    connClose(conn);
    server.repl_transfer_s = NULL;
This looks like a bug to me. It seems the two connection objects are getting mixed up. We should zero out the RDB channel connection here, i.e., `server.repl_rdb_transfer_s = NULL`. Then on L2726/L2747, we should be testing the normal channel connection instead, i.e., `if (server.repl_transfer_s)`
Yeah we are trying to close `repl_transfer_s` twice 😅
Can we fix #1088 in a separate PR first? I think we will need to backport it to Valkey 8.0
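One way to make this kind of mix-up harder is to close each connection only through the slot that owns it and to clear that same slot right away. The sketch below is a standalone illustration, not the fix that was applied: `conn`, `repl_conns`, `conn_close`, `close_and_clear` and `abort_sync` are hypothetical stand-ins for the real connection type and server fields.

```c
#include <stddef.h>

/* Hypothetical stand-in for a connection object; close_count lets a caller
 * observe how many times it was closed. */
typedef struct conn {
    int close_count;
} conn;

/* Hypothetical stand-in for the two server-level connection slots. */
struct repl_conns {
    conn *main_conn; /* plays the role of repl_transfer_s */
    conn *rdb_conn;  /* plays the role of repl_rdb_transfer_s */
};

static void conn_close(conn *c) { c->close_count++; }

/* Close the connection held in *slot at most once, then clear that slot. */
static void close_and_clear(conn **slot) {
    if (*slot == NULL) return;
    conn_close(*slot);
    *slot = NULL;
}

/* Error path: each slot is closed and cleared through the same pointer,
 * so the two connections cannot be confused and a repeat call is harmless. */
static void abort_sync(struct repl_conns *s) {
    close_and_clear(&s->rdb_conn);
    close_and_clear(&s->main_conn);
}
```

Calling `abort_sync` twice on the same structure closes each connection once, because the second call finds both slots already cleared.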
src/replication.c
Outdated
    }
    switch (server.repl_rdb_channel_state) {
    case REPL_DUAL_CHANNEL_SEND_HANDSHAKE:
        ret = dualChannelReplHandleHandshake(conn, &err);
        server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY;
I think we should only update the state upon successful returns.
It shouldn't matter because in case of C_ERR, we will reset the state, but I agree that it makes more sense for the reader. I'll fix it.
003291f to 5dfb259
src/replication.c
Outdated
    if (connSetReadHandler(conn, dualChannelFullSyncWithPrimary) == C_ERR) {
        char conninfo[CONN_INFO_LEN];
        serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", *err,
this seems incorrect. `*err` is not assigned at this point. we should revert back to `strerror(errno)` I think.
right err was not assigned in this case. will fix
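The point of the comment above is that the log line should draw on something that is guaranteed to be set at that moment. A minimal sketch, assuming (as the review comment does) that the failing call leaves a meaningful `errno`; `install_read_handler` and `setup_read_handler` are hypothetical stand-ins, not Valkey functions:

```c
#include <errno.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for a handler-registration call that fails with errno set. */
static int install_read_handler(int should_fail) {
    if (should_fail) {
        errno = EBADF;
        return -1;
    }
    return 0;
}

/* Formats the failure from errno; no caller-owned error string is involved,
 * so there is nothing unassigned to dereference. */
static int setup_read_handler(int should_fail, char *logbuf, size_t logbuf_len) {
    if (install_read_handler(should_fail) == -1) {
        snprintf(logbuf, logbuf_len, "Can't create readable event for SYNC: %s", strerror(errno));
        return -1;
    }
    return 0;
}
```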
    sdsfree(err);
    connClose(conn);
    server.repl_transfer_s = NULL;
    if (server.repl_transfer_s) {
consider adding `sdsfree(err)` after the `error:` label as well so that if we ever `goto error` in the future with a non-NULL `err`, we won't leak memory accidentally.
    /* AUTH with the primary if required. */
    switch (server.repl_rdb_channel_state) {
    case REPL_DUAL_CHANNEL_SEND_HANDSHAKE:
        ret = dualChannelReplHandleHandshake(conn, &err);
Just realized that we never consume `err` in `dualChannelFullSyncWithPrimary`, whose only job when it comes to `err` is to free the memory. Was a logging statement missed?
You're correct, we used to log it in case of an error, but after the refactor we're already logging it inside the helper functions, so there's no need to log it inside `dualChannelFullSyncWithPrimary` anymore. That being said, I believe we should retain the error deallocation logic within `dualChannelFullSyncWithPrimary` for the sake of maintaining simplicity in our code structure.
src/replication.c
Outdated
        break;
    default:
        serverLog(LL_WARNING, "Unexpected replication state: %d", server.repl_state);
        ret = C_ERR;
missing an assignment to `err`? This branch, if hit, will break the assumption on L3298.
Right, it will break the log at L3298, we should be able to handle an empty `err`.
Consolidate the cleanup of local variables to a single point within the method, ensuring proper resource management and preventing memory leaks or double-free issues. Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
renaming and refactor Signed-off-by: naglera <anagler123@gmail.com>
Co-authored-by: Ping Xie <pingxie@outlook.com> Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com> Signed-off-by: naglera <anagler123@gmail.com>
all worker functions should be stateless err should be type sds endoffset response can be empty Signed-off-by: naglera <anagler123@gmail.com>
This reverts commit 3c0933497fea5d84860d24f2864a9c14ffa991bf. Signed-off-by: naglera <anagler123@gmail.com>
Co-authored-by: Ping Xie <pingxie@outlook.com> Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com> Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
I think I am running out of comments ... if you could resolve the last few comments, let's merge this PR. It is great refactoring.
src/replication.c
Outdated
        ret = dualChannelReplHandleEndOffsetResponse(conn, &err);
        if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RDB_LOAD;
        break;
    default: break;
if this is unreachable, we should serverPanic
src/replication.c
Outdated
    sdsfree(err);
    if (ret == C_ERR) goto error;
I think we should give the error handling logic a chance to examine the err string. Whether or not the error handling logic needs to consume the string is a separate concern.
- sdsfree(err);
- if (ret == C_ERR) goto error;
+ if (ret == C_ERR) goto error;
+ sdsfree(err);
Right so we should change the order to
if (ret == C_ERR) goto error;
sdsfree(err);
Also maybe we better log the error string here as well just in case we didn't log it earlier (better log twice than miss it).
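The ordering discussed here, where the error path still sees the string and every path frees it exactly once, can be sketched in isolation. This is an illustration under stated assumptions rather than the merged code: `step` and `run_step` are hypothetical, `char *` with `malloc`/`free` stands in for `sds`, and the log goes into a caller-supplied buffer so it can be inspected.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define C_OK 0
#define C_ERR -1

/* Hypothetical worker: on failure it returns C_ERR and hands back a
 * heap-allocated message through *err. */
static int step(int fail, char **err) {
    static const char msg[] = "handshake failed";
    *err = NULL;
    if (!fail) return C_OK;
    *err = malloc(sizeof msg);
    if (*err) memcpy(*err, msg, sizeof msg);
    return C_ERR;
}

/* Hypothetical driver: branch to the error label first, free afterwards. */
static int run_step(int fail, char *logbuf, size_t logbuf_len) {
    char *err = NULL;
    int ret = step(fail, &err);
    if (ret == C_ERR) goto error; /* err is still alive for the error path */
    free(err);                    /* success path: release (NULL here) */
    return C_OK;

error:
    /* Log defensively: err may be NULL if the worker had no detail. */
    snprintf(logbuf, logbuf_len, "sync aborted: %s", err ? err : "(no detail)");
    free(err); /* single release on the error path, so no leak and no double free */
    return C_ERR;
}
```

Freeing before the `goto` would leave the error path with a dangling pointer if it ever tried to log the message, which is the hazard the suggested reordering avoids.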
Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Co-authored-by: Ping Xie <pingxie@outlook.com> Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com> Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <anagler123@gmail.com>
LGTM. Thanks @naglera!
Consolidate the cleanup of local variables to a single point within the method, ensuring proper resource management and preventing memory leaks or double-free issues.
Previously discussed here:
- valkey-io#60 (comment)
- valkey-io#60 (comment)
---------
Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Co-authored-by: Ping Xie <pingxie@outlook.com>
This PR is based on:

#12109
valkey-io/valkey#60

Closes: #11678

**Motivation**

During a full sync, when master is delivering RDB to the replica, incoming write commands are kept in a replication buffer in order to be sent to the replica once RDB delivery is completed. If RDB delivery takes a long time, it might create memory pressure on master. Also, once a replica connection accumulates replication data which is larger than output buffer limits, master will kill replica connection. This may cause a replication failure.

The main benefit of the rdb channel replication is streaming incoming commands in parallel to the RDB delivery. This approach shifts replication stream buffering to the replica and reduces load on master. We do this by opening another connection for RDB delivery. The main channel on replica will be receiving replication stream while rdb channel is receiving the RDB.

This feature also helps to reduce master's main process CPU load. By opening a dedicated connection for the RDB transfer, the bgsave process has access to the new connection and it will stream RDB directly to the replicas. Before this change, due to TLS connection restriction, the bgsave process was writing RDB bytes to a pipe and the main process was forwarding it to the replica. This is no longer necessary, the main process can avoid these expensive socket read/write syscalls. It also means RDB delivery to replica will be faster as it avoids this step.

In summary, replication will be faster and master's performance during full syncs will improve.

**Implementation steps**

1. When replica connects to the master, it sends 'rdb-channel-repl' as part of capability exchange to let master know replica supports rdb channel.
2. When replica lacks sufficient data for PSYNC, master sends +RDBCHANNELSYNC reply with replica's client id. As the next step, the replica opens a new connection (rdb-channel) and configures it against the master with the appropriate capabilities and requirements. It also sends given client id back to master over rdbchannel, so that master can associate these channels. (initial replica connection will be referred to as main-channel) Then, replica requests fullsync using the RDB channel.
3. Prior to forking, master attaches the replica's main channel to the replication backlog to deliver replication stream starting at the snapshot end offset.
4. The master main process sends replication stream via the main channel, while the bgsave process sends the RDB directly to the replica via the rdb-channel. Replica accumulates replication stream in a local buffer, while the RDB is being loaded into the memory.
5. Once the replica completes loading the rdb, it drops the rdb channel and streams the accumulated replication stream into the db. Sync is completed.

**Some details**

- Currently, rdbchannel replication is supported only if `repl-diskless-sync` is enabled on master. Otherwise, replication will happen over a single connection as before.
- On replica, there is a limit to replication stream buffering. Replica uses a new config `replica-full-sync-buffer-limit` to limit number of bytes to accumulate. If it is not set, replica inherits `client-output-buffer-limit <replica>` hard limit config. If we reach this limit, replica stops accumulating. This is not a failure scenario though. Further accumulation will happen on master side. Depending on the configured limits on master, master may kill the replica connection.

**API changes in INFO output:**

1. New replica state: `send_bulk_and_stream`. Indicates full sync is still in progress for this replica. It is receiving replication stream and rdb in parallel.
```
slave0:ip=127.0.0.1,port=5002,state=send_bulk_and_stream,offset=0,lag=0
```
Replica state changes in steps:
- First, replica sends psync and receives +RDBCHANNELSYNC: `state=wait_bgsave`
- After replica connects with rdbchannel and delivery starts: `state=send_bulk_and_stream`
- After full sync: `state=online`

2. On replica side, replication stream buffering metrics:
- replica_full_sync_buffer_size: Currently accumulated replication stream data in bytes.
- replica_full_sync_buffer_peak: Peak number of bytes that this instance accumulated in the lifetime of the process.
```
replica_full_sync_buffer_size:20485
replica_full_sync_buffer_peak:1048560
```

**API changes in CLIENT LIST**

In `client list` output, rdbchannel clients will have 'C' flag in addition to 'S' replica flag:
```
id=11 addr=127.0.0.1:39108 laddr=127.0.0.1:5001 fd=14 name= age=5 idle=5 flags=SC db=0 sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=0 argv-mem=0 multi-mem=0 rbs=1024 rbp=0 obl=0 oll=0 omem=0 tot-mem=1920 events=r cmd=psync user=default redir=-1 resp=2 lib-name= lib-ver= io-thread=0
```

**Config changes:**

- `replica-full-sync-buffer-limit`: Controls how much replication data replica can accumulate during rdbchannel replication. If it is not set (a value of 0), replica will inherit `client-output-buffer-limit <replica>` hard limit config to limit accumulated data.
- `repl-rdb-channel` config is added as a hidden config. This is mostly for testing as we need to support both rdbchannel replication and the older single connection replication (to keep compatibility with older versions and rdbchannel replication will not be enabled if repl-diskless-sync is not enabled). It affects both the master (not to respond to rdb channel requests), and the replica (not to declare capability).

**Internal API changes:**

Changes that were introduced to Redis replication:
- New replication capability is added to replconf command: `capa rdb-channel-repl`. Indicates replica is capable of rdb channel replication. Replica sends it when it connects to master along with other capabilities.
- If replica needs fullsync, master replies `+RDBCHANNELSYNC <client-id>` to the replica's PSYNC request.
- When replica opens rdbchannel connection, as part of replconf command, it sends `rdb-channel 1` to let master know this is rdb channel. Also, it sends `main-ch-client-id <client-id>` as part of replconf command so master can associate channels.

**Testing:**

As rdbchannel replication is enabled by default, we run whole test suite with it. Though, as we need to support both rdbchannel and single connection replication, we'll be running some tests twice with `repl-rdb-channel yes/no` config.

**Replica state diagram**
```
 *
 * Replica state machine *
 *
 * Main channel state
 * ┌───────────────────┐
 * │RECEIVE_PING_REPLY │
 * └────────┬──────────┘
 *          │ +PONG
 * ┌────────▼──────────┐
 * │SEND_HANDSHAKE     │                    RDB channel state
 * └────────┬──────────┘            ┌───────────────────────────────┐
 *          │+OK                ┌───► RDB_CH_SEND_HANDSHAKE         │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_AUTH_REPLY │        │    REPLCONF main-ch-client-id <clientid>
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │ RDB_CH_RECEIVE_AUTH_REPLY     │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_PORT_REPLY │        │                  │ +OK
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │  RDB_CH_RECEIVE_REPLCONF_REPLY│
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_IP_REPLY   │        │                  │ +OK
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │ RDB_CH_RECEIVE_FULLRESYNC     │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_CAPA_REPLY │        │                  │+FULLRESYNC
 * └────────┬──────────┘        │                  │Rdb delivery
 *          │                   │   ┌──────────────▼────────────────┐
 * ┌────────▼──────────┐        │   │ RDB_CH_RDB_LOADING            │
 * │SEND_PSYNC         │        │   └──────────────┬────────────────┘
 * └─┬─────────────────┘        │                  │ Done loading
 *   │PSYNC (use cached-master) │                  │
 * ┌─▼─────────────────┐        │                  │
 * │RECEIVE_PSYNC_REPLY│        │    ┌────────────►│ Replica streams replication
 * └─┬─────────────────┘        │    │             │ buffer into memory
 *   │                          │    │             │
 *   │+RDBCHANNELSYNC client-id │    │             │
 *   ├──────┬───────────────────┘    │             │
 *   │      │ Main channel           │             │
 *   │      │ accumulates repl data  │             │
 *   │   ┌──▼────────────────┐       │     ┌───────▼───────────┐
 *   │   │ REPL_TRANSFER     ├───────┘     │     CONNECTED     │
 *   │   └───────────────────┘             └────▲───▲──────────┘
 *   │                                          │   │
 *   │                                          │   │
 *   │ +FULLRESYNC     ┌───────────────────┐    │   │
 *   ├─────────────────► REPL_TRANSFER     ├────┘   │
 *   │                 └───────────────────┘        │
 *   │ +CONTINUE                                    │
 *   └──────────────────────────────────────────────┘
 */
```

-----

This PR also contains changes and ideas from:
valkey-io/valkey#837
valkey-io/valkey#1173
valkey-io/valkey#804
valkey-io/valkey#945
valkey-io/valkey#989

---------

Co-authored-by: Yuan Wang <wangyuancode@163.com>
Co-authored-by: debing.sun <debing.sun@redis.com>
Co-authored-by: Moti Cohen <moticless@gmail.com>
Co-authored-by: naglera <anagler123@gmail.com>
Co-authored-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Binbin <binloveplay1314@qq.com>
Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com>
Co-authored-by: xbasel <103044017+xbasel@users.noreply.github.com>
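The `replica-full-sync-buffer-limit` rule described in the commit message above (inherit the output-buffer hard limit when unset, stop accumulating at the limit without treating it as a failure) can be sketched as below. This is a rough illustration, not the Redis or Valkey implementation: `effective_buffer_limit` and `accumulate` are hypothetical helpers, and treating a final limit of 0 as "unlimited" is an assumption made for this sketch.

```c
#include <stddef.h>

/* 0 means "not set": fall back to the replica output-buffer hard limit. */
static size_t effective_buffer_limit(size_t full_sync_buffer_limit, size_t output_buffer_hard_limit) {
    return full_sync_buffer_limit != 0 ? full_sync_buffer_limit : output_buffer_hard_limit;
}

/* Adds up to `incoming` bytes to *buffered without exceeding `limit`.
 * Returns the number of bytes accepted; hitting the limit is not an error,
 * the remainder simply stays on the sender's side.
 * Assumption for this sketch: a limit of 0 here means "unlimited". */
static size_t accumulate(size_t *buffered, size_t limit, size_t incoming) {
    size_t room;
    if (limit == 0) {
        *buffered += incoming;
        return incoming;
    }
    room = *buffered < limit ? limit - *buffered : 0;
    if (incoming > room) incoming = room;
    *buffered += incoming;
    return incoming;
}
```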
This PR is based on:

#12109
valkey-io/valkey#60

Closes: #11678

**Motivation**

During a full sync, when master is delivering RDB to the replica, incoming write commands are kept in a replication buffer in order to be sent to the replica once RDB delivery is completed. If RDB delivery takes a long time, it might create memory pressure on master. Also, once a replica connection accumulates replication data which is larger than output buffer limits, master will kill replica connection. This may cause a replication failure.

The main benefit of the rdb channel replication is streaming incoming commands in parallel to the RDB delivery. This approach shifts replication stream buffering to the replica and reduces load on master. We do this by opening another connection for RDB delivery. The main channel on replica will be receiving replication stream while rdb channel is receiving the RDB.

This feature also helps to reduce master's main process CPU load. By opening a dedicated connection for the RDB transfer, the bgsave process has access to the new connection and it will stream RDB directly to the replicas. Before this change, due to TLS connection restriction, the bgsave process was writing RDB bytes to a pipe and the main process was forwarding it to the replica. This is no longer necessary, the main process can avoid these expensive socket read/write syscalls. It also means RDB delivery to replica will be faster as it avoids this step.

In summary, replication will be faster and master's performance during full syncs will improve.

**Implementation steps**

1. When replica connects to the master, it sends 'rdb-channel-repl' as part of capability exchange to let master to know replica supports rdb channel.
2. When replica lacks sufficient data for PSYNC, master sends +RDBCHANNELSYNC reply with replica's client id. As the next step, the replica opens a new connection (rdb-channel) and configures it against the master with the appropriate capabilities and requirements. It also sends given client id back to master over rdbchannel, so that master can associate these channels. (initial replica connection will be referred as main-channel) Then, replica requests fullsync using the RDB channel.
3. Prior to forking, master attaches the replica's main channel to the replication backlog to deliver replication stream starting at the snapshot end offset.
4. The master main process sends replication stream via the main channel, while the bgsave process sends the RDB directly to the replica via the rdb-channel. Replica accumulates replication stream in a local buffer, while the RDB is being loaded into the memory.
5. Once the replica completes loading the rdb, it drops the rdb channel and streams the accumulated replication stream into the db. Sync is completed.

**Some details**

- Currently, rdbchannel replication is supported only if `repl-diskless-sync` is enabled on master. Otherwise, replication will happen over a single connection as in before.
- On replica, there is a limit to replication stream buffering. Replica uses a new config `replica-full-sync-buffer-limit` to limit number of bytes to accumulate. If it is not set, replica inherits `client-output-buffer-limit <replica>` hard limit config. If we reach this limit, replica stops accumulating. This is not a failure scenario though. Further accumulation will happen on master side. Depending on the configured limits on master, master may kill the replica connection.

**API changes in INFO output:**

1. New replica state: `send_bulk_and_stream`. Indicates full sync is still in progress for this replica. It is receiving replication stream and rdb in parallel.

   ```
   slave0:ip=127.0.0.1,port=5002,state=send_bulk_and_stream,offset=0,lag=0
   ```

   Replica state changes in steps:
   - First, replica sends psync and receives +RDBCHANNELSYNC :`state=wait_bgsave`
   - After replica connects with rdbchannel and delivery starts: `state=send_bulk_and_stream`
   - After full sync: `state=online`

2. On replica side, replication stream buffering metrics:
   - replica_full_sync_buffer_size: Currently accumulated replication stream data in bytes.
   - replica_full_sync_buffer_peak: Peak number of bytes that this instance accumulated in the lifetime of the process.

   ```
   replica_full_sync_buffer_size:20485
   replica_full_sync_buffer_peak:1048560
   ```

**API changes in CLIENT LIST**

In `client list` output, rdbchannel clients will have 'C' flag in addition to 'S' replica flag:

```
id=11 addr=127.0.0.1:39108 laddr=127.0.0.1:5001 fd=14 name= age=5 idle=5 flags=SC db=0 sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=0 argv-mem=0 multi-mem=0 rbs=1024 rbp=0 obl=0 oll=0 omem=0 tot-mem=1920 events=r cmd=psync user=default redir=-1 resp=2 lib-name= lib-ver= io-thread=0
```

**Config changes:**

- `replica-full-sync-buffer-limit`: Controls how much replication data replica can accumulate during rdbchannel replication. If it is not set, a value of 0 means replica will inherit `client-output-buffer-limit <replica>` hard limit config to limit accumulated data.
- `repl-rdb-channel` config is added as a hidden config. This is mostly for testing as we need to support both rdbchannel replication and the older single connection replication (to keep compatibility with older versions and rdbchannel replication will not be enabled if repl-diskless-sync is not enabled). it affects both the master (not to respond to rdb channel requests), and the replica (not to declare capability)

**Internal API changes:**

Changes that were introduced to Redis replication:

- New replication capability is added to replconf command: `capa rdb-channel-repl`. Indicates replica is capable of rdb channel replication. Replica sends it when it connects to master along with other capabilities.
- If replica needs fullsync, master replies `+RDBCHANNELSYNC <client-id>` to the replica's PSYNC request.
- When replica opens rdbchannel connection, as part of replconf command, it sends `rdb-channel 1` to let master know this is rdb channel. Also, it sends `main-ch-client-id <client-id>` as part of replconf command so master can associate channels.

**Testing:**

As rdbchannel replication is enabled by default, we run whole test suite with it. Though, as we need to support both rdbchannel and single connection replication, we'll be running some tests twice with `repl-rdb-channel yes/no` config.

**Replica state diagram**

```
 *
 * Replica state machine
 *
 *
 * Main channel state
 * ┌───────────────────┐
 * │RECEIVE_PING_REPLY │
 * └────────┬──────────┘
 *          │ +PONG
 * ┌────────▼──────────┐
 * │SEND_HANDSHAKE     │                    RDB channel state
 * └────────┬──────────┘            ┌───────────────────────────────┐
 *          │+OK                ┌───► RDB_CH_SEND_HANDSHAKE         │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_AUTH_REPLY │        │    REPLCONF main-ch-client-id <clientid>
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │ RDB_CH_RECEIVE_AUTH_REPLY     │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_PORT_REPLY │        │                  │ +OK
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │  RDB_CH_RECEIVE_REPLCONF_REPLY│
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_IP_REPLY   │        │                  │ +OK
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │ RDB_CH_RECEIVE_FULLRESYNC     │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_CAPA_REPLY │        │                  │+FULLRESYNC
 * └────────┬──────────┘        │                  │Rdb delivery
 *          │                   │   ┌──────────────▼────────────────┐
 * ┌────────▼──────────┐        │   │ RDB_CH_RDB_LOADING            │
 * │SEND_PSYNC         │        │   └──────────────┬────────────────┘
 * └─┬─────────────────┘        │                  │ Done loading
 *   │PSYNC (use cached-master) │                  │
 * ┌─▼─────────────────┐        │                  │
 * │RECEIVE_PSYNC_REPLY│        │    ┌────────────►│ Replica streams replication
 * └─┬─────────────────┘        │    │             │ buffer into memory
 *   │                          │    │             │
 *   │+RDBCHANNELSYNC client-id │    │             │
 *   ├──────┬───────────────────┘    │             │
 *   │      │ Main channel           │             │
 *   │      │ accumulates repl data  │             │
 *   │   ┌──▼────────────────┐       │     ┌───────▼───────────┐
 *   │   │ REPL_TRANSFER     ├───────┘     │    CONNECTED      │
 *   │   └───────────────────┘             └────▲───▲──────────┘
 *   │                                          │   │
 *   │                                          │   │
 *   │  +FULLRESYNC    ┌───────────────────┐    │   │
 *   ├─────────────────► REPL_TRANSFER     ├────┘   │
 *   │                 └───────────────────┘        │
 *   │  +CONTINUE                                   │
 *   └──────────────────────────────────────────────┘
 */
```

-----

This PR also contains changes and ideas from:

valkey-io/valkey#837
valkey-io/valkey#1173
valkey-io/valkey#804
valkey-io/valkey#945
valkey-io/valkey#989

---------

Co-authored-by: Yuan Wang <wangyuancode@163.com>
Co-authored-by: debing.sun <debing.sun@redis.com>
Co-authored-by: Moti Cohen <moticless@gmail.com>
Co-authored-by: naglera <anagler123@gmail.com>
Co-authored-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Binbin <binloveplay1314@qq.com>
Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com>
Co-authored-by: xbasel <103044017+xbasel@users.noreply.github.com>
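The commit message above says the master answers PSYNC with `+RDBCHANNELSYNC <client-id>` and that the replica later echoes the id back via `main-ch-client-id`. A minimal sketch of how a replica might extract that id from the reply line is shown below. The function name `parse_rdbchannelsync_reply`, the tolerance for a trailing CRLF, and the strict digits-only check are assumptions made for illustration; this is not code from the Redis or Valkey sources.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper (an assumption for illustration, not a real
 * Redis/Valkey function). Parses a reply of the form
 * "+RDBCHANNELSYNC <client-id>", optionally followed by CRLF.
 * Returns 0 and stores the id on success, -1 on malformed input. */
int parse_rdbchannelsync_reply(const char *reply, uint64_t *client_id) {
    static const char prefix[] = "+RDBCHANNELSYNC ";
    const size_t prefix_len = sizeof(prefix) - 1;

    assert(client_id != NULL);
    if (reply == NULL || strncmp(reply, prefix, prefix_len) != 0) return -1;

    const char *p = reply + prefix_len;
    /* Require a digit first: rejects an empty id, signs and whitespace,
     * all of which strtoull() would otherwise accept or skip. */
    if (*p < '0' || *p > '9') return -1;

    char *end = NULL;
    errno = 0;
    unsigned long long id = strtoull(p, &end, 10);
    if (errno == ERANGE) return -1;

    /* Nothing may follow the id except an optional CRLF. */
    if (*end != '\0' && strcmp(end, "\r\n") != 0) return -1;

    *client_id = (uint64_t)id;
    return 0;
}
```

A caller would pass the raw reply line and, on a return value of 0, use the stored id when building the `REPLCONF main-ch-client-id <client-id>` command on the rdb channel.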
Consolidate the cleanup of local variables to a single point within the method, ensuring proper resource management and preventing memory leaks or double-free issues.
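For readers unfamiliar with the pattern, here is a small sketch of single-point cleanup in C. It is a generic illustration and not the code changed by this PR: `read_first_line` and its buffer size are hypothetical. Every resource starts as NULL, every failure path jumps to one `cleanup` label, and ownership of the result is handed over explicitly so the label never frees what the caller receives.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical example, unrelated to the actual replication.c code.
 * Returns the first line of the file as a newly allocated string
 * (the caller frees it), or NULL on any failure. */
char *read_first_line(const char *path) {
    enum { BUF_SIZE = 256 };
    FILE *fp = NULL;     /* resources start as NULL so cleanup is always safe */
    char *buf = NULL;
    char *result = NULL;

    fp = fopen(path, "r");
    if (fp == NULL) goto cleanup;

    buf = malloc(BUF_SIZE);
    if (buf == NULL) goto cleanup;

    if (fgets(buf, BUF_SIZE, fp) == NULL) goto cleanup;
    buf[strcspn(buf, "\n")] = '\0'; /* strip the trailing newline, if any */

    result = buf;
    buf = NULL; /* ownership moved to result, so cleanup must not free it */

cleanup:
    /* Single exit point: each resource is released at most once. */
    free(buf); /* free(NULL) is a no-op */
    if (fp != NULL) fclose(fp);
    return result;
}
```

Because early `return` statements are replaced by `goto cleanup`, adding a new resource only requires one extra release line at the label, rather than an update to every exit path.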
Previously discussed here: