Skip to content

Commit aee72b7

Browse files
authored
[PG15] Feature/replicas (#279)
* Recovery requirements: Add a condition variable for WAL recovery, allowing backends to wait for recovery up to some record pointer. * Fix issues w.r.t. WAL when LwLsn is initiated and when recovery starts. This fixes some test failures that showed up after updating Neon code to do more precise handling of the request_lsn used in a replica's get_page_at_lsn requests. --------- Co-authored-by: Matthias van de Meent <boekewurm+postgres@gmail.com>
1 parent 4ad87b0 commit aee72b7

File tree

4 files changed

+81
-5
lines changed

4 files changed

+81
-5
lines changed

src/backend/access/transam/xlog.c

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5303,6 +5303,14 @@ StartupXLOG(void)
53035303
RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
53045304
doPageWrites = lastFullPageWrites;
53055305

5306+
/*
5307+
* Setup last written lsn cache, max written LSN.
5308+
* Starting from here, we could be modifying pages through REDO, which requires
5309+
* the existence of maxLwLsn + LwLsn LRU.
5310+
*/
5311+
XLogCtl->maxLastWrittenLsn = RedoRecPtr;
5312+
dlist_init(&XLogCtl->lastWrittenLsnLRU);
5313+
53065314
/* REDO */
53075315
if (InRecovery)
53085316
{
@@ -5671,8 +5679,6 @@ StartupXLOG(void)
56715679

56725680
XLogCtl->LogwrtRqst.Write = EndOfLog;
56735681
XLogCtl->LogwrtRqst.Flush = EndOfLog;
5674-
XLogCtl->maxLastWrittenLsn = EndOfLog;
5675-
dlist_init(&XLogCtl->lastWrittenLsnLRU);
56765682

56775683
/*
56785684
* Preallocate additional log files, if wanted.
@@ -8144,11 +8150,14 @@ xlog_redo(XLogReaderState *record)
81448150
continue;
81458151
}
81468152
result = XLogReadBufferForRedo(record, block_id, &buffer);
8147-
if (result == BLK_DONE && !IsUnderPostmaster)
8153+
if (result == BLK_DONE && (!IsUnderPostmaster || StandbyMode))
81488154
{
81498155
/*
8150-
* In the special WAL process, blocks that are being ignored
8151-
* return BLK_DONE. Accept that.
8156+
* NEON: In the special WAL redo process, blocks that are being
8157+
* ignored return BLK_DONE. Accept that.
8158+
* Additionally, in standby mode, blocks that are not present
8159+
* in shared buffers are ignored during replay, so we also
8160+
* ignore those blocks.
81528161
*/
81538162
}
81548163
else if (result != BLK_RESTORED)

src/backend/access/transam/xlogrecovery.c

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ typedef struct XLogRecoveryCtlData
336336
XLogRecPtr lastReplayedReadRecPtr; /* start position */
337337
XLogRecPtr lastReplayedEndRecPtr; /* end+1 position */
338338
TimeLineID lastReplayedTLI; /* timeline */
339+
ConditionVariable replayProgressCV; /* CV for waiters */
339340

340341
/*
341342
* When we're currently replaying a record, ie. in a redo function,
@@ -464,9 +465,68 @@ XLogRecoveryShmemInit(void)
464465

465466
SpinLockInit(&XLogRecoveryCtl->info_lck);
466467
InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
468+
ConditionVariableInit(&XLogRecoveryCtl->replayProgressCV);
467469
ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
468470
}
469471

472+
/*
473+
* Wait for recovery to complete replaying all WAL up to and including
474+
* redoEndRecPtr.
475+
*
476+
* This gets woken up for every WAL record replayed, so make sure you're not
477+
* trying to wait an LSN that is too far in the future.
478+
*/
479+
void
480+
XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr)
481+
{
482+
static XLogRecPtr replayRecPtr = 0;
483+
484+
if (!RecoveryInProgress())
485+
return;
486+
487+
/*
488+
* Check the backend-local variable first, we may be able to skip accessing
489+
* shared memory (which requires locking)
490+
*/
491+
if (redoEndRecPtr <= replayRecPtr)
492+
return;
493+
494+
replayRecPtr = GetXLogReplayRecPtr(NULL);
495+
496+
/*
497+
* Check again if we're going to need to wait, now that we've updated
498+
* the local cached variable.
499+
*/
500+
if (redoEndRecPtr <= replayRecPtr)
501+
return;
502+
503+
/*
504+
* We need to wait for the variable, so prepare for that.
505+
*
506+
* Note: This wakes up every time a WAL record is replayed, so this can
507+
* be expensive.
508+
*/
509+
ConditionVariablePrepareToSleep(&XLogRecoveryCtl->replayProgressCV);
510+
511+
while (redoEndRecPtr > replayRecPtr)
512+
{
513+
bool timeout;
514+
timeout = ConditionVariableTimedSleep(&XLogRecoveryCtl->replayProgressCV,
515+
10000000, /* 10 seconds */
516+
WAIT_EVENT_RECOVERY_WAL_STREAM);
517+
518+
replayRecPtr = GetXLogReplayRecPtr(NULL);
519+
520+
if (timeout)
521+
ereport(LOG,
522+
(errmsg("Waiting for recovery to catch up to %X/%X (currently %X/%X)",
523+
LSN_FORMAT_ARGS(redoEndRecPtr),
524+
LSN_FORMAT_ARGS(replayRecPtr))));
525+
}
526+
527+
ConditionVariableCancelSleep();
528+
}
529+
470530
/*
471531
* Prepare the system for WAL recovery, if needed.
472532
*
@@ -2032,6 +2092,8 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
20322092
/* Reset the prefetcher. */
20332093
XLogPrefetchReconfigure();
20342094
}
2095+
2096+
ConditionVariableBroadcast(&XLogRecoveryCtl->replayProgressCV);
20352097
}
20362098

20372099
/*

src/include/access/xlogrecovery.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ extern void ShutdownWalRecovery(void);
135135
extern void RemovePromoteSignalFiles(void);
136136

137137
extern bool HotStandbyActive(void);
138+
extern void XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr);
138139
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
139140
extern RecoveryPauseState GetRecoveryPauseState(void);
140141
extern void SetRecoveryPause(bool recoveryPause);

src/include/access/xlogutils.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ typedef struct ReadLocalXLogPageNoWaitPrivate
8181
bool end_of_wal; /* true, when end of WAL is reached */
8282
} ReadLocalXLogPageNoWaitPrivate;
8383

84+
/*
85+
* Returns true if we shouldn't do REDO on that block in record indicated by
86+
* block_id; false otherwise.
87+
*/
8488
extern bool (*redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
8589

8690
extern XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record,

0 commit comments

Comments
 (0)