Skip to content

Commit 6c8e956

Browse files
MMeent authored and tristan957 committed
[PG14] Feature/replicas (#278)
* Recovery requirements: Add condition variable for WAL recovery; allowing backends to wait for recovery up to some record pointer. * Fix issues w.r.t. WAL when LwLsn is initiated and when recovery starts. This fixes some test failures that showed up after updating Neon code to do more precise handling of replica's get_page_at_lsn's request_lsn lsns. --------- Co-authored-by: Matthias van de Meent <boekewurm+postgres@gmail.com>
1 parent a9c9a2e commit 6c8e956

File tree

3 files changed

+80
-5
lines changed

3 files changed

+80
-5
lines changed

src/backend/access/transam/xlog.c

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,7 @@ typedef struct XLogCtlData
754754
TimeLineID lastReplayedTLI;
755755
XLogRecPtr replayEndRecPtr;
756756
TimeLineID replayEndTLI;
757+
ConditionVariable replayProgressCV;
757758
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
758759
TimestampTz recoveryLastXTime;
759760

@@ -5342,9 +5343,67 @@ XLOGShmemInit(void)
53425343
SpinLockInit(&XLogCtl->info_lck);
53435344
SpinLockInit(&XLogCtl->ulsn_lck);
53445345
InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
5346+
ConditionVariableInit(&XLogCtl->replayProgressCV);
53455347
ConditionVariableInit(&XLogCtl->recoveryNotPausedCV);
53465348
}
53475349

5350+
/*
5351+
* Wait for recovery to complete replaying all WAL up to and including
5352+
* redoEndRecPtr.
5353+
*
5354+
* This gets woken up for every WAL record replayed, so make sure you're not
5355+
* trying to wait an LSN that is too far in the future.
5356+
*/
5357+
void
5358+
XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr)
5359+
{
5360+
static XLogRecPtr replayRecPtr = 0;
5361+
5362+
if (!RecoveryInProgress())
5363+
return;
5364+
5365+
/*
5366+
* Check the backend-local variable first, we may be able to skip accessing
5367+
* shared memory (which requires locking)
5368+
*/
5369+
if (redoEndRecPtr <= replayRecPtr)
5370+
return;
5371+
5372+
replayRecPtr = GetXLogReplayRecPtr(NULL);
5373+
5374+
/*
5375+
* Check again if we're going to need to wait, now that we've updated
5376+
* the local cached variable.
5377+
*/
5378+
if (redoEndRecPtr <= replayRecPtr)
5379+
return;
5380+
5381+
/*
5382+
* We need to wait for the variable, so prepare for that.
5383+
*
5384+
* Note: This wakes up every time a WAL record is replayed, so this can
5385+
* be expensive.
5386+
*/
5387+
ConditionVariablePrepareToSleep(&XLogCtl->replayProgressCV);
5388+
5389+
while (redoEndRecPtr > replayRecPtr)
5390+
{
5391+
bool timeout;
5392+
timeout = ConditionVariableTimedSleep(&XLogCtl->replayProgressCV,
5393+
10000000,
5394+
WAIT_EVENT_RECOVERY_WAL_STREAM);
5395+
5396+
if (timeout)
5397+
ereport(LOG,
5398+
(errmsg("Waiting for recovery to catch up to %X/%X",
5399+
LSN_FORMAT_ARGS(redoEndRecPtr))));
5400+
else
5401+
replayRecPtr = GetXLogReplayRecPtr(NULL);
5402+
}
5403+
5404+
ConditionVariableCancelSleep();
5405+
}
5406+
53485407
/*
53495408
* This func must be called ONCE on system install. It creates pg_control
53505409
* and the initial XLOG segment.
@@ -7267,6 +7326,14 @@ StartupXLOG(void)
72677326
abortedRecPtr = InvalidXLogRecPtr;
72687327
missingContrecPtr = InvalidXLogRecPtr;
72697328

7329+
/*
7330+
* Set up the last-written LSN cache and the max last-written LSN.
7331+
* Starting from here, we could be modifying pages through REDO, which requires
7332+
* the existence of maxLwLsn + LwLsn LRU.
7333+
*/
7334+
XLogCtl->maxLastWrittenLsn = RedoRecPtr;
7335+
dlist_init(&XLogCtl->lastWrittenLsnLRU);
7336+
72707337
/* REDO */
72717338
if (InRecovery)
72727339
{
@@ -7774,6 +7841,8 @@ StartupXLOG(void)
77747841
WalSndWakeup();
77757842
}
77767843

7844+
ConditionVariableBroadcast(&XLogCtl->replayProgressCV);
7845+
77777846
/* Exit loop if we reached inclusive recovery target */
77787847
if (recoveryStopsAfter(xlogreader))
77797848
{
@@ -8169,8 +8238,6 @@ StartupXLOG(void)
81698238

81708239
XLogCtl->LogwrtRqst.Write = EndOfLog;
81718240
XLogCtl->LogwrtRqst.Flush = EndOfLog;
8172-
XLogCtl->maxLastWrittenLsn = EndOfLog;
8173-
dlist_init(&XLogCtl->lastWrittenLsnLRU);
81748241

81758242
LocalSetXLogInsertAllowed();
81768243

@@ -10980,11 +11047,14 @@ xlog_redo(XLogReaderState *record)
1098011047
XLogRedoAction result;
1098111048

1098211049
result = XLogReadBufferForRedo(record, block_id, &buffer);
10983-
if (result == BLK_DONE && !IsUnderPostmaster)
11050+
if (result == BLK_DONE && (!IsUnderPostmaster || StandbyMode))
1098411051
{
1098511052
/*
10986-
* In the special WAL process, blocks that are being ignored
10987-
* return BLK_DONE. Accept that.
11053+
* NEON: In the special WAL redo process, blocks that are being
11054+
* ignored return BLK_DONE. Accept that.
11055+
* Additionally, in standby mode, blocks that are not present
11056+
* in shared buffers are ignored during replay, so we also
11057+
* ignore those blocks.
1098811058
*/
1098911059
}
1099011060
else if (result != BLK_RESTORED)

src/include/access/xlog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ extern bool HotStandbyActive(void);
323323
extern bool HotStandbyActiveInReplay(void);
324324
extern bool XLogInsertAllowed(void);
325325
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
326+
extern void XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr);
326327
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
327328
extern XLogRecPtr GetXLogInsertRecPtr(void);
328329
extern XLogRecPtr GetXLogWriteRecPtr(void);

src/include/access/xlogutils.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ typedef enum
3333
* need to be replayed) */
3434
} XLogRedoAction;
3535

36+
/*
37+
* Returns true if we shouldn't do REDO on that block in record indicated by
38+
* block_id; false otherwise.
39+
*/
3640
extern bool (*redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
3741

3842
extern XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record,

0 commit comments

Comments
 (0)