Keep synchronizing slots when others are lagging on the primary.
Instead of blocking indefinitely waiting for a replication slot to become
syncable, introduce a new GUC, pg_failover_slots.sync_timeout, after which
we move on to the next slot.

To avoid waiting from scratch on every run, we create the replication slots
as temporary ones instead of ephemeral ones, allowing them to keep their
state between runs. When a slot is finally synced, we persist it to disk.

Since we no longer block in a waiting state, we need to clean up the
inconsistent slots after promotion.
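
As a usage illustration (not part of the commit itself), the new GUC could be
set in postgresql.conf roughly as follows; it is declared in milliseconds
(GUC_UNIT_MS), defaults to -1 (wait forever, the previous behaviour), and is
reloadable via SIGHUP (PGC_SIGHUP), so "SELECT pg_reload_conf();" picks up a
changed value without a restart:

    # hypothetical tuning sketch, not a shipped default
    pg_failover_slots.sync_timeout = '30s'  # give up on a lagging slot after 30s and move on
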
rdunklau committed Sep 20, 2024
1 parent e26870b commit 60644bb
Showing 1 changed file with 146 additions and 76 deletions.
222 changes: 146 additions & 76 deletions pg_failover_slots.c
@@ -110,6 +110,7 @@ char *pg_failover_maintenance_db;
/* Slots to sync */
char *pg_failover_slots_dsn;
char *pg_failover_slot_names;
int pg_failover_slots_sync_timeout;
static char *pg_failover_slot_names_str = NULL;
static List *pg_failover_slot_names_list = NIL;
static bool pg_failover_slots_drop = true;
@@ -119,6 +120,12 @@ char *pg_failover_slots_version_str;
void _PG_init(void);
PGDLLEXPORT void pg_failover_slots_main(Datum main_arg);

typedef enum SlotCatchupState {
CatchupSlotDrop = 0, /* remote slot vanished or we were promoted: drop the local slot */
CatchupSlotSucceeded = 1, /* remote position is satisfiable locally: safe to persist */
CatchupSlotDirty = 2, /* still lagging or timed out: keep the temporary slot for the next run */
} SlotCatchupState;

static bool
check_failover_slot_names(char **newval, void **extra, GucSource source)
{
@@ -534,14 +541,16 @@ remote_connect(const char *connstr, const char *appname)
* relies on us having already reserved the WAL for the old position of
* `remote_slot` so `slot` can't continue to advance.
*/
static bool
static SlotCatchupState
wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
{
List *slots;
PGconn *conn;
StringInfoData connstr;
TimestampTz cb_wait_start =
0; /* first invocation should happen immediately */
TimestampTz wait_start = GetCurrentTimestamp();
TimestampTz now = 0;

elog(
LOG,
@@ -584,7 +593,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
"replication slot sync wait for slot %s interrupted by promotion",
remote_slot->name)));
PQfinish(conn);
return false;
return CatchupSlotDrop;
}

filter->key = FAILOVERSLOT_FILTER_NAME;
@@ -595,7 +604,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
{
/* Slot on provider vanished */
PQfinish(conn);
return false;
return CatchupSlotDrop;
}

receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
@@ -616,14 +625,30 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
remote_slot->confirmed_lsn = new_slot->confirmed_lsn;
remote_slot->catalog_xmin = new_slot->catalog_xmin;
PQfinish(conn);
return true;
return CatchupSlotSucceeded;
}

/*
* Invoke any callbacks that will help move the slots along
*/
now = GetCurrentTimestamp();
if (pg_failover_slots_sync_timeout >= 0 && TimestampDifferenceExceeds(
wait_start, now, pg_failover_slots_sync_timeout))
{
elog(
LOG,
"Give up on waiting for remote slot %s lsn (%X/%X) and catalog xmin (%u) to pass local slot lsn (%X/%X) and catalog xmin (%u)",
remote_slot->name, (uint32) (new_slot->restart_lsn >> 32),
(uint32) (new_slot->restart_lsn), new_slot->catalog_xmin,
(uint32) (slot->data.restart_lsn >> 32),
(uint32) (slot->data.restart_lsn),
slot->data.catalog_xmin);
PQfinish(conn);
return CatchupSlotDirty;
}

if (TimestampDifferenceExceeds(
cb_wait_start, GetCurrentTimestamp(),
cb_wait_start, now,
Min(wal_retrieve_retry_interval * 5, PG_WAIT_EXTENSION)))
{
if (cb_wait_start > 0)
@@ -639,6 +664,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
cb_wait_start = GetCurrentTimestamp();
}


rc =
WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
wal_retrieve_retry_interval, PG_WAIT_EXTENSION);
@@ -648,6 +674,16 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)


ResetLatch(MyLatch);

/*
* The user may have changed pg_failover_slots.sync_timeout, so reload the config if needed.
*/
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}

}
}

@@ -669,6 +705,7 @@ synchronize_one_slot(RemoteSlot *remote_slot)
{
int i;
bool found = false;
SlotCatchupState slot_state = CatchupSlotDirty;

if (!RecoveryInProgress())
{
@@ -714,42 +751,10 @@ synchronize_one_slot(remote_slot)
if (found)
{
ReplicationSlotAcquire(remote_slot->name, true);

/*
* We can't satisfy this remote slot's requirements with our known-safe
* local restart_lsn, catalog_xmin and xmin.
*
* This shouldn't happen for existing slots unless someone else messed
* with our physical replication slot on the master.
*/
if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
TransactionIdPrecedes(remote_slot->catalog_xmin,
MyReplicationSlot->data.catalog_xmin))
{
elog(
WARNING,
"not synchronizing slot %s; synchronization would move it backward",
remote_slot->name);

ReplicationSlotRelease();
PopActiveSnapshot();
CommitTransactionCommand();
return;
}

LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
remote_slot->catalog_xmin);
LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
remote_slot->restart_lsn);
ReplicationSlotMarkDirty();
ReplicationSlotSave();

elog(
DEBUG2,
"synchronized existing slot %s to lsn (%X/%X) and catalog xmin (%u)",
remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32),
(uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin);
if (MyReplicationSlot->data.persistency == RS_PERSISTENT)
slot_state = CatchupSlotSucceeded;
else
slot_state = CatchupSlotDirty;
}
/*
* Otherwise create the local slot and initialize it to the state of the
@@ -767,13 +772,13 @@ synchronize_one_slot(RemoteSlot *remote_slot)
* don't want it to persist if we fail.
*/
#if PG_VERSION_NUM >= 170000
ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL,
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
remote_slot->two_phase, false, false);
#elif PG_VERSION_NUM >= 140000
ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL,
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
remote_slot->two_phase);
#else
ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL);
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY);
#endif
slot = MyReplicationSlot;

@@ -796,56 +801,83 @@ synchronize_one_slot(RemoteSlot *remote_slot)
slot->data.catalog_xmin = xmin_horizon;
ReplicationSlotsComputeRequiredXmin(true);
LWLockRelease(ProcArrayLock);
slot_state = CatchupSlotDirty;
}

if (slot_state == CatchupSlotSucceeded)
{
/*
* Our xmin and/or catalog_xmin may be > that required by one or more
* of the slots we are trying to sync from the master, and/or we don't
* have enough retained WAL for the slot's restart_lsn.
*
* If we persist the slot locally in that state it'll make a false
* promise we can't satisfy.
*
* This can happen if this replica is fairly new or has only recently
* started failover slot sync.
* We can't satisfy this remote slot's requirements with our known-safe
* local restart_lsn, catalog_xmin and xmin.
*
* TODO: Don't stop synchronization of other slots for this, we can't
* add timeout because that could result in some slots never being
* synchronized as they will always be behind the physical slot.
* This shouldn't happen for existing slots unless someone else messed
* with our physical replication slot on the master.
*/
if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
TransactionIdPrecedes(remote_slot->catalog_xmin,
MyReplicationSlot->data.catalog_xmin))
{
if (!wait_for_primary_slot_catchup(MyReplicationSlot, remote_slot))
elog(
WARNING,
"not synchronizing slot %s; synchronization would move it backward",
remote_slot->name);
ReplicationSlotRelease();
PopActiveSnapshot();
CommitTransactionCommand();
return;
}

LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
remote_slot->catalog_xmin);
LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
remote_slot->restart_lsn);
ReplicationSlotMarkDirty();
ReplicationSlotSave();
elog(
DEBUG2,
"synchronized existing slot %s to lsn (%X/%X) and catalog xmin (%u)",
remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32),
(uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin);
} else {
if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
TransactionIdPrecedes(remote_slot->catalog_xmin,
MyReplicationSlot->data.catalog_xmin))
{
slot_state = wait_for_primary_slot_catchup(MyReplicationSlot, remote_slot);
if (slot_state == CatchupSlotDrop)
{
/* Provider slot didn't catch up to locally reserved position
*/
ReplicationSlotRelease();
ReplicationSlotDrop(remote_slot->name, false);
PopActiveSnapshot();
CommitTransactionCommand();
return;
}
} else {
slot_state = CatchupSlotSucceeded;
}

/*
* We can locally satisfy requirements of remote slot's current
* position now. Apply the new position if any and make it persistent.
*/
LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
remote_slot->catalog_xmin);
LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
remote_slot->restart_lsn);
ReplicationSlotMarkDirty();

ReplicationSlotPersist();

if (slot_state == CatchupSlotSucceeded)
{
LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
remote_slot->catalog_xmin);
LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
remote_slot->restart_lsn);
ReplicationSlotMarkDirty();
ReplicationSlotPersist();
}
elog(DEBUG1,
"synchronized new slot %s to lsn (%X/%X) and catalog xmin (%u)",
remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32),
(uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin);
}

ReplicationSlotRelease();
PopActiveSnapshot();
CommitTransactionCommand();
@@ -944,7 +976,7 @@ synchronize_failover_slots(long sleep_time)
bool active;
bool found = false;

active = (s->active_pid != 0);
active = (s->active_pid != 0 && s->active_pid != MyProcPid);

/* Only check inactive slots. */
if (!s->in_use || active)
@@ -1053,13 +1085,6 @@ synchronize_failover_slots(long sleep_time)
if (remote_slot->confirmed_lsn > receivePtr)
remote_slot->confirmed_lsn = receivePtr;

/*
* For simplicity we always move restart_lsn of all slots to the
* restart_lsn needed by the furthest-behind master slot.
*/
if (remote_slot->restart_lsn > lsn)
remote_slot->restart_lsn = lsn;

synchronize_one_slot(remote_slot);
}

@@ -1073,6 +1098,41 @@ synchronize_failover_slots(long sleep_time)
return sleep_time;
}


/*
* After a promotion, we need to clean up the replication slots we created
* while in recovery but never persisted: they are still in an inconsistent state.
*/
static void
cleanup_failover_slots_after_promotion(void)
{
int i;
for (;;)
{
char * dropslot = NULL;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

if (s->active_pid == MyProcPid && s->data.persistency == RS_TEMPORARY)
{
dropslot = pstrdup(NameStr(s->data.name));
break;
}
}
LWLockRelease(ReplicationSlotControlLock);
if (dropslot)
{
elog(WARNING, "dropping inconsistent replication slot after promotion \"%s\"", dropslot);
ReplicationSlotDrop(dropslot, false);
pfree(dropslot);
}
else
break;
}
}

void
pg_failover_slots_main(Datum main_arg)
{
@@ -1102,7 +1162,10 @@ pg_failover_slots_main(Datum main_arg)
if (RecoveryInProgress())
sleep_time = synchronize_failover_slots(worker_nap_time);
else
{
cleanup_failover_slots_after_promotion();
sleep_time = worker_nap_time * 10;
}

rc =
WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
@@ -1493,6 +1556,13 @@ _PG_init(void)
&pg_failover_maintenance_db, "postgres", PGC_SIGHUP, GUC_SUPERUSER_ONLY,
NULL, NULL, NULL);

DefineCustomIntVariable(
"pg_failover_slots.sync_timeout",
"timeout when waiting for a slot to be persisted",
"If set to -1 (the default), we wait forever meaning we could hang up"
"up on one slot while other slots are ok to be synced.",
&pg_failover_slots_sync_timeout, -1, -1, INT_MAX, PGC_SIGHUP,
GUC_UNIT_MS, NULL, NULL, NULL);

if (IsBinaryUpgrade)
return;
@@ -1505,7 +1575,7 @@ _PG_init(void)
snprintf(bgw.bgw_library_name, BGW_MAXLEN, EXTENSION_NAME);
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "pg_failover_slots_main");
snprintf(bgw.bgw_name, BGW_MAXLEN, "pg_failover_slots worker");
bgw.bgw_restart_time = 60;
bgw.bgw_restart_time = 10;

RegisterBackgroundWorker(&bgw);
