Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scoped RDB loading context and immediate abort flag #1173

Merged
merged 18 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add ASAP abort flag to provisional primary for safer replication disconnection handling

Introduces a dedicated flag in the provisional primary struct to signal an
immediate abort, preventing potential use-after-free scenarios during replication
disconnection in dual-channel load. This ensures proper termination of
rdbLoadRioWithLoadingCtx when replication is cancelled due to connection loss
on the main connection.

Fixes #1152

Signed-off-by: naglera <anagler123@gmail.com>
  • Loading branch information
naglera committed Oct 29, 2024
commit dec88f05ce05956254770d29958cca9d7af52407
7 changes: 6 additions & 1 deletion src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -2921,7 +2921,7 @@ void stopSaving(int success) {

/* Track loading progress in order to serve clients from time to time
and, if needed, calculate the RDB checksum. */
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
int rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (server.rdb_checksum) rioGenericUpdateChecksum(r, buf, len);
if (server.loading_process_events_interval_bytes &&
(r->processed_bytes + len) / server.loading_process_events_interval_bytes >
Expand All @@ -2934,6 +2934,11 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) {
server.stat_net_repl_input_bytes += len;
}
if (server.repl_provisional_primary.close_asap == 1) {
serverLog(LL_WARNING, "Primary main connection dropped during RDB load callback");
return -1;
}
return 0;
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
}

/* Save the given functions_ctx to the rdb.
Expand Down
2 changes: 2 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2701,6 +2701,7 @@ static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) {
server.repl_provisional_primary.reploff = reploffset;
server.repl_provisional_primary.read_reploff = reploffset;
server.repl_provisional_primary.dbid = dbid;
server.repl_provisional_primary.close_asap = 0;

/* Now that we have the snapshot end-offset, we can ask for psync from that offset. Prepare the
* main connection accordingly.*/
Expand Down Expand Up @@ -2833,6 +2834,7 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t
}
if (nread == 0) {
serverLog(LL_VERBOSE, "Provisional primary closed connection");
server.repl_provisional_primary.close_asap = 1;
cancelReplicationHandshake(1);
return C_ERR;
}
Expand Down
3 changes: 2 additions & 1 deletion src/rio.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,9 @@ void rioFreeFd(rio *r) {

/* This function can be installed both in memory and file streams when checksum
* computation is needed. */
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
int rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
    /* Fold the 'len' bytes at 'buf' into the running CRC64 stored in
     * r->cksum, seeding the computation with the current checksum. */
    uint64_t updated_cksum = crc64(r->cksum, buf, len);
    r->cksum = updated_cksum;

    /* Always report a non-negative value: this callback never asks the
     * caller to terminate the rio operation. */
    return 1;
}

/* Set the file-based rio object to auto-fsync every 'bytes' file written.
Expand Down
10 changes: 6 additions & 4 deletions src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ struct _rio {
* all the data that was read or written so far. The method should be
* designed so that can be called with the current checksum, and the buf
* and len fields pointing to the new block of data to add to the checksum
* computation. */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
* computation.
madolson marked this conversation as resolved.
Show resolved Hide resolved
* The method should return -1 to indicate that the rio operation should be
* terminated, or a non-negative value to continue processing. */
int (*update_cksum)(struct _rio *, const void *buf, size_t len);

/* The current checksum and flags (see RIO_FLAG_*) */
uint64_t cksum, flags;
Expand Down Expand Up @@ -140,7 +142,7 @@ static inline size_t rioRead(rio *r, void *buf, size_t len) {
r->flags |= RIO_FLAG_READ_ERROR;
return 0;
}
if (r->update_cksum) r->update_cksum(r, buf, bytes_to_read);
if (r->update_cksum && r->update_cksum(r, buf, bytes_to_read) < 0) return 0;
buf = (char *)buf + bytes_to_read;
len -= bytes_to_read;
r->processed_bytes += bytes_to_read;
Expand Down Expand Up @@ -188,7 +190,7 @@ size_t rioWriteBulkDouble(rio *r, double d);
struct serverObject;
int rioWriteBulkObject(rio *r, struct serverObject *obj);

void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
int rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
void rioSetAutoSync(rio *r, off_t bytes);
void rioSetReclaimCache(rio *r, int enabled);
uint8_t rioCheckType(rio *r);
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2056,6 +2056,7 @@ struct valkeyServer {
long long reploff;
long long read_reploff;
int dbid;
uint64_t close_asap : 1;
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
} repl_provisional_primary;
client *cached_primary; /* Cached primary to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
Expand Down
2 changes: 1 addition & 1 deletion src/valkey-check-rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include <sys/stat.h>

void createSharedObjects(void);
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len);
int rdbLoadProgressCallback(rio *r, const void *buf, size_t len);
int rdbCheckMode = 0;

struct {
Expand Down