Skip to content

Commit a54a771

Browse files
committed
Add GetReplayXlogPtrHook allowing extensions to adjust replay LSN
Useful when extension performs some async operations in the WAL record redo function.
1 parent 57589a8 commit a54a771

File tree

5 files changed

+31
-3
lines changed

5 files changed

+31
-3
lines changed

src/backend/access/transam/xlogfuncs.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ pg_last_wal_replay_lsn(PG_FUNCTION_ARGS)
363363
{
364364
XLogRecPtr recptr;
365365

366-
recptr = GetXLogReplayRecPtr(NULL);
366+
recptr = GetEffectiveXlogReplayRecPtr();
367367

368368
if (recptr == 0)
369369
PG_RETURN_NULL();

src/backend/access/transam/xlogrecovery.c

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4504,6 +4504,28 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI)
45044504
return recptr;
45054505
}
45064506

4507+
GetReplayXlogPtrHookType GetReplayXlogPtrHook = NULL;
4508+
4509+
/*
4510+
* Get effective latest redo apply position.
4511+
*
4512+
* Can be tuned by extensions processing WAL records asyncronously.
4513+
*/
4514+
XLogRecPtr
4515+
GetEffectiveXlogReplayRecPtr(void)
4516+
{
4517+
XLogRecPtr recptr = InvalidXLogRecPtr;
4518+
4519+
SpinLockAcquire(&XLogRecoveryCtl->info_lck);
4520+
if (GetReplayXlogPtrHook)
4521+
recptr = GetReplayXlogPtrHook();
4522+
if (recptr == InvalidXLogRecPtr)
4523+
recptr = XLogRecoveryCtl->lastReplayedEndRecPtr;
4524+
SpinLockRelease(&XLogRecoveryCtl->info_lck);
4525+
4526+
return recptr;
4527+
}
4528+
45074529

45084530
/*
45094531
* Get position of last applied, or the record being applied.

src/backend/replication/walreceiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
11301130
/* Construct a new message */
11311131
writePtr = LogstreamResult.Write;
11321132
flushPtr = LogstreamResult.Flush;
1133-
applyPtr = GetXLogReplayRecPtr(NULL);
1133+
applyPtr = GetEffectiveXlogReplayRecPtr();
11341134

11351135
resetStringInfo(&reply_message);
11361136
pq_sendbyte(&reply_message, 'r');

src/backend/replication/walreceiverfuncs.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ GetReplicationApplyDelay(void)
373373
receivePtr = walrcv->flushedUpto;
374374
SpinLockRelease(&walrcv->mutex);
375375

376-
replayPtr = GetXLogReplayRecPtr(NULL);
376+
replayPtr = GetEffectiveXlogReplayRecPtr();
377377

378378
if (receivePtr == replayPtr)
379379
return 0;

src/include/access/xlogrecovery.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ typedef enum RecoveryPauseState
4848
RECOVERY_PAUSED /* recovery is paused */
4949
} RecoveryPauseState;
5050

51+
typedef XLogRecPtr (*GetReplayXlogPtrHookType) (void);
52+
5153
/* User-settable GUC parameters */
5254
extern PGDLLIMPORT bool recoveryTargetInclusive;
5355
extern PGDLLIMPORT int recoveryTargetAction;
@@ -76,6 +78,9 @@ extern PGDLLIMPORT bool reachedConsistency;
7678
/* Are we currently in standby mode? */
7779
extern PGDLLIMPORT bool StandbyMode;
7880

81+
/* Hook for extensions to tune replay xlog pointer */
82+
extern PGDLLIMPORT GetReplayXlogPtrHookType GetReplayXlogPtrHook;
83+
7984
extern Size XLogRecoveryShmemSize(void);
8085
extern void XLogRecoveryShmemInit(void);
8186

@@ -137,6 +142,7 @@ extern void RemovePromoteSignalFiles(void);
137142

138143
extern bool HotStandbyActive(void);
139144
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
145+
extern XLogRecPtr GetEffectiveXlogReplayRecPtr(void);
140146
extern RecoveryPauseState GetRecoveryPauseState(void);
141147
extern void SetRecoveryPause(bool recoveryPause);
142148
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);

0 commit comments

Comments
 (0)