Skip to content

Commit 915aafe

Browse files
author
Amit Kapila
committed
Don't advance origin during apply failure.
We advance origin progress during abort on successful streaming and application of ROLLBACK in parallel streaming mode. But the origin shouldn't be advanced during an error or unsuccessful apply due to shutdown. Otherwise, it will result in a transaction loss as such a transaction won't be sent again by the server. Reported-by: Hou Zhijie Author: Hayato Kuroda and Shveta Malik Reviewed-by: Amit Kapila Backpatch-through: 16 Discussion: https://postgr.es/m/TYAPR01MB5692FAC23BE40C69DA8ED4AFF5B92@TYAPR01MB5692.jpnprd01.prod.outlook.com
1 parent 5effd59 commit 915aafe

File tree

4 files changed

+66
-1
lines changed

4 files changed

+66
-1
lines changed

src/backend/replication/logical/worker.c

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4639,6 +4639,17 @@ InitializeLogRepWorker(void)
46394639
CommitTransactionCommand();
46404640
}
46414641

4642+
/*
4643+
* Reset the origin state.
4644+
*/
4645+
static void
4646+
replorigin_reset(int code, Datum arg)
4647+
{
4648+
replorigin_session_origin = InvalidRepOriginId;
4649+
replorigin_session_origin_lsn = InvalidXLogRecPtr;
4650+
replorigin_session_origin_timestamp = 0;
4651+
}
4652+
46424653
/* Common function to setup the leader apply or tablesync worker. */
46434654
void
46444655
SetupApplyOrSyncWorker(int worker_slot)
@@ -4667,6 +4678,19 @@ SetupApplyOrSyncWorker(int worker_slot)
46674678

46684679
InitializeLogRepWorker();
46694680

4681+
/*
4682+
* Register a callback to reset the origin state before aborting any
4683+
* pending transaction during shutdown (see ShutdownPostgres()). This will
4684+
* avoid origin advancement for an in-complete transaction which could
4685+
* otherwise lead to its loss as such a transaction won't be sent by the
4686+
* server again.
4687+
*
4688+
* Note that even a LOG or DEBUG statement placed after setting the origin
4689+
* state may process a shutdown signal before committing the current apply
4690+
* operation. So, it is important to register such a callback here.
4691+
*/
4692+
before_shmem_exit(replorigin_reset, (Datum) 0);
4693+
46704694
/* Connect to the origin and start the replication. */
46714695
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
46724696
MySubscription->conninfo);
@@ -4893,12 +4917,23 @@ void
48934917
apply_error_callback(void *arg)
48944918
{
48954919
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
4920+
int elevel;
48964921

48974922
if (apply_error_callback_arg.command == 0)
48984923
return;
48994924

49004925
Assert(errarg->origin_name);
49014926

4927+
elevel = geterrlevel();
4928+
4929+
/*
4930+
* Reset the origin state to prevent the advancement of origin progress if
4931+
* we fail to apply. Otherwise, this will result in transaction loss as
4932+
* that transaction won't be sent again by the server.
4933+
*/
4934+
if (elevel >= ERROR)
4935+
replorigin_reset(0, (Datum) 0);
4936+
49024937
if (errarg->rel == NULL)
49034938
{
49044939
if (!TransactionIdIsValid(errarg->remote_xid))

src/backend/utils/error/elog.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1568,6 +1568,23 @@ geterrcode(void)
15681568
return edata->sqlerrcode;
15691569
}
15701570

1571+
/*
1572+
* geterrlevel --- return the currently set error level
1573+
*
1574+
* This is only intended for use in error callback subroutines, since there
1575+
* is no other place outside elog.c where the concept is meaningful.
1576+
*/
1577+
int
1578+
geterrlevel(void)
1579+
{
1580+
ErrorData *edata = &errordata[errordata_stack_depth];
1581+
1582+
/* we don't bother incrementing recursion_depth */
1583+
CHECK_STACK_DEPTH();
1584+
1585+
return edata->elevel;
1586+
}
1587+
15711588
/*
15721589
* geterrposition --- return the currently set error position (0 if none)
15731590
*

src/include/utils/elog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ extern int internalerrquery(const char *query);
226226
extern int err_generic_string(int field, const char *str);
227227

228228
extern int geterrcode(void);
229+
extern int geterrlevel(void);
229230
extern int geterrposition(void);
230231
extern int getinternalerrposition(void);
231232

src/test/subscription/t/021_twophase.pl

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
2424
$node_subscriber->init;
2525
$node_subscriber->append_conf('postgresql.conf',
26-
qq(max_prepared_transactions = 10));
26+
qq(max_prepared_transactions = 0));
2727
$node_subscriber->start;
2828

2929
# Create some pre-existing content on publisher
@@ -67,12 +67,24 @@
6767
# then COMMIT PREPARED
6868
###############################
6969

70+
# Save the log location, to see the failure of the application
71+
my $log_location = -s $node_subscriber->logfile;
72+
7073
$node_publisher->safe_psql(
7174
'postgres', "
7275
BEGIN;
7376
INSERT INTO tab_full VALUES (11);
7477
PREPARE TRANSACTION 'test_prepared_tab_full';");
7578

79+
# Confirm the ERROR is reported becasue max_prepared_transactions is zero
80+
$node_subscriber->wait_for_log(
81+
qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
82+
83+
# Set max_prepared_transactions to correct value to resume the replication
84+
$node_subscriber->append_conf('postgresql.conf',
85+
qq(max_prepared_transactions = 10));
86+
$node_subscriber->restart;
87+
7688
$node_publisher->wait_for_catchup($appname);
7789

7890
# check that transaction is in prepared state on subscriber

0 commit comments

Comments
 (0)