Skip to content

Commit 3703f3c

Browse files
MMeent, tristan957
authored and committed
[PG15] Feature/replicas (#279)
* Recovery requirements: Add condition variable for WAL recovery; allowing backends to wait for recovery up to some record pointer. * Fix issues w.r.t. WAL when LwLsn is initiated and when recovery starts. This fixes some test failures that showed up after updating Neon code to do more precise handling of replica's get_page_at_lsn's request_lsn lsns. --------- Co-authored-by: Matthias van de Meent <boekewurm+postgres@gmail.com>
1 parent f8fc4b5 commit 3703f3c

File tree

4 files changed

+81
-5
lines changed

4 files changed

+81
-5
lines changed

src/backend/access/transam/xlog.c

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5303,6 +5303,14 @@ StartupXLOG(void)
53035303
RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
53045304
doPageWrites = lastFullPageWrites;
53055305

5306+
/*
5307+
* Setup last written lsn cache, max written LSN.
5308+
* Starting from here, we could be modifying pages through REDO, which requires
5309+
* the existence of maxLwLsn + LwLsn LRU.
5310+
*/
5311+
XLogCtl->maxLastWrittenLsn = RedoRecPtr;
5312+
dlist_init(&XLogCtl->lastWrittenLsnLRU);
5313+
53065314
/* REDO */
53075315
if (InRecovery)
53085316
{
@@ -5671,8 +5679,6 @@ StartupXLOG(void)
56715679

56725680
XLogCtl->LogwrtRqst.Write = EndOfLog;
56735681
XLogCtl->LogwrtRqst.Flush = EndOfLog;
5674-
XLogCtl->maxLastWrittenLsn = EndOfLog;
5675-
dlist_init(&XLogCtl->lastWrittenLsnLRU);
56765682

56775683
/*
56785684
* Preallocate additional log files, if wanted.
@@ -8148,11 +8154,14 @@ xlog_redo(XLogReaderState *record)
81488154
continue;
81498155
}
81508156
result = XLogReadBufferForRedo(record, block_id, &buffer);
8151-
if (result == BLK_DONE && !IsUnderPostmaster)
8157+
if (result == BLK_DONE && (!IsUnderPostmaster || StandbyMode))
81528158
{
81538159
/*
8154-
* In the special WAL process, blocks that are being ignored
8155-
* return BLK_DONE. Accept that.
8160+
* NEON: In the special WAL redo process, blocks that are being
8161+
* ignored return BLK_DONE. Accept that.
8162+
* Additionally, in standby mode, blocks that are not present
8163+
* in shared buffers are ignored during replay, so we also
8164+
* ignore those blocks.
81568165
*/
81578166
}
81588167
else if (result != BLK_RESTORED)

src/backend/access/transam/xlogrecovery.c

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ typedef struct XLogRecoveryCtlData
336336
XLogRecPtr lastReplayedReadRecPtr; /* start position */
337337
XLogRecPtr lastReplayedEndRecPtr; /* end+1 position */
338338
TimeLineID lastReplayedTLI; /* timeline */
339+
ConditionVariable replayProgressCV; /* CV for waiters */
339340

340341
/*
341342
* When we're currently replaying a record, ie. in a redo function,
@@ -465,6 +466,7 @@ XLogRecoveryShmemInit(void)
465466

466467
SpinLockInit(&XLogRecoveryCtl->info_lck);
467468
InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
469+
ConditionVariableInit(&XLogRecoveryCtl->replayProgressCV);
468470
ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
469471
}
470472

@@ -486,6 +488,64 @@ EnableStandbyMode(void)
486488
disable_startup_progress_timeout();
487489
}
488490

491+
/*
492+
* Wait for recovery to complete replaying all WAL up to and including
493+
* redoEndRecPtr.
494+
*
495+
* This gets woken up for every WAL record replayed, so make sure you're not
496+
* trying to wait an LSN that is too far in the future.
497+
*/
498+
void
499+
XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr)
500+
{
501+
static XLogRecPtr replayRecPtr = 0;
502+
503+
if (!RecoveryInProgress())
504+
return;
505+
506+
/*
507+
* Check the backend-local variable first, we may be able to skip accessing
508+
* shared memory (which requires locking)
509+
*/
510+
if (redoEndRecPtr <= replayRecPtr)
511+
return;
512+
513+
replayRecPtr = GetXLogReplayRecPtr(NULL);
514+
515+
/*
516+
* Check again if we're going to need to wait, now that we've updated
517+
* the local cached variable.
518+
*/
519+
if (redoEndRecPtr <= replayRecPtr)
520+
return;
521+
522+
/*
523+
* We need to wait for the variable, so prepare for that.
524+
*
525+
* Note: This wakes up every time a WAL record is replayed, so this can
526+
* be expensive.
527+
*/
528+
ConditionVariablePrepareToSleep(&XLogRecoveryCtl->replayProgressCV);
529+
530+
while (redoEndRecPtr > replayRecPtr)
531+
{
532+
bool timeout;
533+
timeout = ConditionVariableTimedSleep(&XLogRecoveryCtl->replayProgressCV,
534+
10000000, /* 10 seconds */
535+
WAIT_EVENT_RECOVERY_WAL_STREAM);
536+
537+
replayRecPtr = GetXLogReplayRecPtr(NULL);
538+
539+
if (timeout)
540+
ereport(LOG,
541+
(errmsg("Waiting for recovery to catch up to %X/%X (currently %X/%X)",
542+
LSN_FORMAT_ARGS(redoEndRecPtr),
543+
LSN_FORMAT_ARGS(replayRecPtr))));
544+
}
545+
546+
ConditionVariableCancelSleep();
547+
}
548+
489549
/*
490550
* Prepare the system for WAL recovery, if needed.
491551
*
@@ -2051,6 +2111,8 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
20512111
/* Reset the prefetcher. */
20522112
XLogPrefetchReconfigure();
20532113
}
2114+
2115+
ConditionVariableBroadcast(&XLogRecoveryCtl->replayProgressCV);
20542116
}
20552117

20562118
/*

src/include/access/xlogrecovery.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ extern void ShutdownWalRecovery(void);
135135
extern void RemovePromoteSignalFiles(void);
136136

137137
extern bool HotStandbyActive(void);
138+
extern void XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr);
138139
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
139140
extern RecoveryPauseState GetRecoveryPauseState(void);
140141
extern void SetRecoveryPause(bool recoveryPause);

src/include/access/xlogutils.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ typedef struct ReadLocalXLogPageNoWaitPrivate
8181
bool end_of_wal; /* true, when end of WAL is reached */
8282
} ReadLocalXLogPageNoWaitPrivate;
8383

84+
/*
85+
* Returns true if we shouldn't do REDO on that block in record indicated by
86+
* block_id; false otherwise.
87+
*/
8488
extern bool (*redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
8589

8690
extern XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record,

0 commit comments

Comments
 (0)