Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scoped RDB loading context and immediate abort flag #1173

Merged
merged 18 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revert "Add ASAP abort flag to provisional primary for safer replicat…
…ion disconnection handling"

This reverts commit b873d41.

Signed-off-by: naglera <anagler123@gmail.com>
  • Loading branch information
naglera committed Oct 29, 2024
commit f89b7161d2e0490b9d384ef328003e8bc0f10842
3 changes: 1 addition & 2 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -2921,7 +2921,7 @@ void stopSaving(int success) {

/* Track loading progress in order to serve client's from time to time
and if needed calculate rdb checksum */
int rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (server.rdb_checksum) rioGenericUpdateChecksum(r, buf, len);
if (server.loading_process_events_interval_bytes &&
(r->processed_bytes + len) / server.loading_process_events_interval_bytes >
Expand All @@ -2938,7 +2938,6 @@ int rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) {
server.stat_net_repl_input_bytes += len;
}
return 0;
}

/* Save the given functions_ctx to the rdb.
Expand Down
2 changes: 0 additions & 2 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2701,7 +2701,6 @@ static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) {
server.repl_provisional_primary.reploff = reploffset;
server.repl_provisional_primary.read_reploff = reploffset;
server.repl_provisional_primary.dbid = dbid;
server.repl_provisional_primary.close_asap = 0;

/* Now that we have the snapshot end-offset, we can ask for psync from that offset. Prepare the
* main connection accordingly.*/
Expand Down Expand Up @@ -2834,7 +2833,6 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t
}
if (nread == 0) {
serverLog(LL_VERBOSE, "Provisional primary closed connection");
server.repl_provisional_primary.close_asap = 1;
cancelReplicationHandshake(1);
return C_ERR;
}
Expand Down
3 changes: 1 addition & 2 deletions src/rio.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,8 @@ void rioFreeFd(rio *r) {

/* This function can be installed both in memory and file streams when checksum
* computation is needed. */
int rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
r->cksum = crc64(r->cksum, buf, len);
return 1;
}

/* Set the file-based rio object to auto-fsync every 'bytes' file written.
Expand Down
10 changes: 4 additions & 6 deletions src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,8 @@ struct _rio {
* all the data that was read or written so far. The method should be
* designed so that can be called with the current checksum, and the buf
* and len fields pointing to the new block of data to add to the checksum
* computation.
* The method should return -1 to indicate that the rio operation should be
* terminated, or a non-negative value to continue processing. */
int (*update_cksum)(struct _rio *, const void *buf, size_t len);
* computation. */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);

/* The current checksum and flags (see RIO_FLAG_*) */
uint64_t cksum, flags;
Expand Down Expand Up @@ -142,7 +140,7 @@ static inline size_t rioRead(rio *r, void *buf, size_t len) {
r->flags |= RIO_FLAG_READ_ERROR;
return 0;
}
if (r->update_cksum && r->update_cksum(r, buf, bytes_to_read) < 0) return 0;
if (r->update_cksum) r->update_cksum(r, buf, bytes_to_read);
buf = (char *)buf + bytes_to_read;
len -= bytes_to_read;
r->processed_bytes += bytes_to_read;
Expand Down Expand Up @@ -190,7 +188,7 @@ size_t rioWriteBulkDouble(rio *r, double d);
struct serverObject;
int rioWriteBulkObject(rio *r, struct serverObject *obj);

int rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
void rioSetAutoSync(rio *r, off_t bytes);
void rioSetReclaimCache(rio *r, int enabled);
uint8_t rioCheckType(rio *r);
Expand Down
1 change: 0 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2056,7 +2056,6 @@ struct valkeyServer {
long long reploff;
long long read_reploff;
int dbid;
uint64_t close_asap : 1;
} repl_provisional_primary;
client *cached_primary; /* Cached primary to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
Expand Down
2 changes: 1 addition & 1 deletion src/valkey-check-rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include <sys/stat.h>

void createSharedObjects(void);
int rdbLoadProgressCallback(rio *r, const void *buf, size_t len);
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len);
int rdbCheckMode = 0;

struct {
Expand Down