Skip to content

Commit 10e36f4

Browse files
MMeent, tristan957
authored and committed
[PG15] Feature/replicas (#279)
* Recovery requirements: Add condition variable for WAL recovery; allowing backends to wait for recovery up to some record pointer. * Fix issues w.r.t. WAL when LwLsn is initiated and when recovery starts. This fixes some test failures that showed up after updating Neon code to do more precise handling of replica's get_page_at_lsn's request_lsn lsns. --------- Co-authored-by: Matthias van de Meent <boekewurm+postgres@gmail.com>
1 parent 1d98a6c commit 10e36f4

File tree

4 files changed

+81
-5
lines changed

4 files changed

+81
-5
lines changed

src/backend/access/transam/xlog.c

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5305,6 +5305,14 @@ StartupXLOG(void)
53055305
RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
53065306
doPageWrites = lastFullPageWrites;
53075307

5308+
/*
5309+
* Setup last written lsn cache, max written LSN.
5310+
* Starting from here, we could be modifying pages through REDO, which requires
5311+
* the existence of maxLwLsn + LwLsn LRU.
5312+
*/
5313+
XLogCtl->maxLastWrittenLsn = RedoRecPtr;
5314+
dlist_init(&XLogCtl->lastWrittenLsnLRU);
5315+
53085316
/* REDO */
53095317
if (InRecovery)
53105318
{
@@ -5673,8 +5681,6 @@ StartupXLOG(void)
56735681

56745682
XLogCtl->LogwrtRqst.Write = EndOfLog;
56755683
XLogCtl->LogwrtRqst.Flush = EndOfLog;
5676-
XLogCtl->maxLastWrittenLsn = EndOfLog;
5677-
dlist_init(&XLogCtl->lastWrittenLsnLRU);
56785684

56795685
/*
56805686
* Preallocate additional log files, if wanted.
@@ -8150,11 +8156,14 @@ xlog_redo(XLogReaderState *record)
81508156
continue;
81518157
}
81528158
result = XLogReadBufferForRedo(record, block_id, &buffer);
8153-
if (result == BLK_DONE && !IsUnderPostmaster)
8159+
if (result == BLK_DONE && (!IsUnderPostmaster || StandbyMode))
81548160
{
81558161
/*
8156-
* In the special WAL process, blocks that are being ignored
8157-
* return BLK_DONE. Accept that.
8162+
* NEON: In the special WAL redo process, blocks that are being
8163+
* ignored return BLK_DONE. Accept that.
8164+
* Additionally, in standby mode, blocks that are not present
8165+
* in shared buffers are ignored during replay, so we also
8166+
* ignore those blocks.
81588167
*/
81598168
}
81608169
else if (result != BLK_RESTORED)

src/backend/access/transam/xlogrecovery.c

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ typedef struct XLogRecoveryCtlData
336336
XLogRecPtr lastReplayedReadRecPtr; /* start position */
337337
XLogRecPtr lastReplayedEndRecPtr; /* end+1 position */
338338
TimeLineID lastReplayedTLI; /* timeline */
339+
ConditionVariable replayProgressCV; /* CV for waiters */
339340

340341
/*
341342
* When we're currently replaying a record, ie. in a redo function,
@@ -465,6 +466,7 @@ XLogRecoveryShmemInit(void)
465466

466467
SpinLockInit(&XLogRecoveryCtl->info_lck);
467468
InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
469+
ConditionVariableInit(&XLogRecoveryCtl->replayProgressCV);
468470
ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
469471
}
470472

@@ -486,6 +488,64 @@ EnableStandbyMode(void)
486488
disable_startup_progress_timeout();
487489
}
488490

491+
/*
492+
* Wait for recovery to complete replaying all WAL up to and including
493+
* redoEndRecPtr.
494+
*
495+
* This gets woken up for every WAL record replayed, so make sure you're not
496+
* trying to wait an LSN that is too far in the future.
497+
*/
498+
void
499+
XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr)
500+
{
501+
static XLogRecPtr replayRecPtr = 0;
502+
503+
if (!RecoveryInProgress())
504+
return;
505+
506+
/*
507+
* Check the backend-local variable first, we may be able to skip accessing
508+
* shared memory (which requires locking)
509+
*/
510+
if (redoEndRecPtr <= replayRecPtr)
511+
return;
512+
513+
replayRecPtr = GetXLogReplayRecPtr(NULL);
514+
515+
/*
516+
* Check again if we're going to need to wait, now that we've updated
517+
* the local cached variable.
518+
*/
519+
if (redoEndRecPtr <= replayRecPtr)
520+
return;
521+
522+
/*
523+
* We need to wait for the variable, so prepare for that.
524+
*
525+
* Note: This wakes up every time a WAL record is replayed, so this can
526+
* be expensive.
527+
*/
528+
ConditionVariablePrepareToSleep(&XLogRecoveryCtl->replayProgressCV);
529+
530+
while (redoEndRecPtr > replayRecPtr)
531+
{
532+
bool timeout;
533+
timeout = ConditionVariableTimedSleep(&XLogRecoveryCtl->replayProgressCV,
534+
10000000, /* 10 seconds */
535+
WAIT_EVENT_RECOVERY_WAL_STREAM);
536+
537+
replayRecPtr = GetXLogReplayRecPtr(NULL);
538+
539+
if (timeout)
540+
ereport(LOG,
541+
(errmsg("Waiting for recovery to catch up to %X/%X (currently %X/%X)",
542+
LSN_FORMAT_ARGS(redoEndRecPtr),
543+
LSN_FORMAT_ARGS(replayRecPtr))));
544+
}
545+
546+
ConditionVariableCancelSleep();
547+
}
548+
489549
/*
490550
* Prepare the system for WAL recovery, if needed.
491551
*
@@ -2077,6 +2137,8 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
20772137
/* Reset the prefetcher. */
20782138
XLogPrefetchReconfigure();
20792139
}
2140+
2141+
ConditionVariableBroadcast(&XLogRecoveryCtl->replayProgressCV);
20802142
}
20812143

20822144
/*

src/include/access/xlogrecovery.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ extern void ShutdownWalRecovery(void);
135135
extern void RemovePromoteSignalFiles(void);
136136

137137
extern bool HotStandbyActive(void);
138+
extern void XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr);
138139
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
139140
extern RecoveryPauseState GetRecoveryPauseState(void);
140141
extern void SetRecoveryPause(bool recoveryPause);

src/include/access/xlogutils.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ typedef struct ReadLocalXLogPageNoWaitPrivate
8181
bool end_of_wal; /* true, when end of WAL is reached */
8282
} ReadLocalXLogPageNoWaitPrivate;
8383

84+
/*
85+
* Returns true if we shouldn't do REDO on that block in record indicated by
86+
* block_id; false otherwise.
87+
*/
8488
extern bool (*redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
8589

8690
extern XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record,

0 commit comments

Comments
 (0)