Skip to content

Commit dbed2e3

Browse files
Fix memory counter update in ReorderBuffer.
Commit 5bec1d6 changed the memory usage updates of the ReorderBufferTXN to zero all at once by subtracting txn->size, rather than updating it for each change. However, if TOAST reconstruction data remained in the transaction when freeing it, there were cases where it further subtracted the memory counter from zero, resulting in an assertion failure. This change calculates the memory size for each change and updates the memory usage to precisely the amount that has been freed. Backpatch to v17, where this was introducd. Reviewed-by: Amit Kapila, Shlok Kyal Discussion: https://postgr.es/m/CAD21AoAqkNUvicgKPT_dXzNoOwpPkVTg0QPPxEcWmzT0moCJ1g%40mail.gmail.com Backpatch-through: 17
1 parent 6749d4a commit dbed2e3

File tree

3 files changed

+62
-4
lines changed

3 files changed

+62
-4
lines changed

contrib/test_decoding/expected/stream.out

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
109109
committing streamed transaction
110110
(17 rows)
111111

112+
/*
113+
* Test concurrent abort with toast data. When streaming the second insertion, we
114+
* detect that the subtransaction was aborted, and reset the transaction while having
115+
* the TOAST changes in memory, resulting in deallocating both decoded changes and
116+
* TOAST reconstruction data. Memory usage counters must be updated correctly.
117+
*/
118+
BEGIN;
119+
INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
120+
ALTER TABLE stream_test ADD COLUMN i INT;
121+
SAVEPOINT s1;
122+
INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
123+
ROLLBACK TO s1;
124+
COMMIT;
125+
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
126+
count
127+
-------
128+
5
129+
(1 row)
130+
112131
DROP TABLE stream_test;
113132
SELECT pg_drop_replication_slot('regression_slot');
114133
pg_drop_replication_slot

contrib/test_decoding/sql/stream.sql

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,20 @@ toasted-123456789012345678901234567890123456789012345678901234567890123456789012
4444

4545
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
4646

47+
/*
48+
* Test concurrent abort with toast data. When streaming the second insertion, we
49+
* detect that the subtransaction was aborted, and reset the transaction while having
50+
* the TOAST changes in memory, resulting in deallocating both decoded changes and
51+
* TOAST reconstruction data. Memory usage counters must be updated correctly.
52+
*/
53+
BEGIN;
54+
INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
55+
ALTER TABLE stream_test ADD COLUMN i INT;
56+
SAVEPOINT s1;
57+
INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
58+
ROLLBACK TO s1;
59+
COMMIT;
60+
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
61+
4762
DROP TABLE stream_test;
4863
SELECT pg_drop_replication_slot('regression_slot');

src/backend/replication/logical/reorderbuffer.c

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
467467
/* Reset the toast hash */
468468
ReorderBufferToastReset(rb, txn);
469469

470+
/* All changes must be deallocated */
471+
Assert(txn->size == 0);
472+
470473
pfree(txn);
471474
}
472475

@@ -1506,6 +1509,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
15061509
{
15071510
bool found;
15081511
dlist_mutable_iter iter;
1512+
Size mem_freed = 0;
15091513

15101514
/* cleanup subtransactions & their changes */
15111515
dlist_foreach_modify(iter, &txn->subtxns)
@@ -1535,9 +1539,20 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
15351539
/* Check we're not mixing changes from different transactions. */
15361540
Assert(change->txn == txn);
15371541

1542+
/*
1543+
* Instead of updating the memory counter for individual changes,
1544+
* we sum up the size of memory to free so we can update the memory
1545+
* counter all together below. This saves costs of maintaining
1546+
* the max-heap.
1547+
*/
1548+
mem_freed += ReorderBufferChangeSize(change);
1549+
15381550
ReorderBufferReturnChange(rb, change, false);
15391551
}
15401552

1553+
/* Update the memory counter */
1554+
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1555+
15411556
/*
15421557
* Cleanup the tuplecids we stored for decoding catalog snapshot access.
15431558
* They are always stored in the toplevel transaction.
@@ -1594,9 +1609,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
15941609
if (rbtxn_is_serialized(txn))
15951610
ReorderBufferRestoreCleanup(rb, txn);
15961611

1597-
/* Update the memory counter */
1598-
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
1599-
16001612
/* deallocate */
16011613
ReorderBufferReturnTXN(rb, txn);
16021614
}
@@ -1616,6 +1628,7 @@ static void
16161628
ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
16171629
{
16181630
dlist_mutable_iter iter;
1631+
Size mem_freed = 0;
16191632

16201633
/* cleanup subtransactions & their changes */
16211634
dlist_foreach_modify(iter, &txn->subtxns)
@@ -1648,11 +1661,19 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
16481661
/* remove the change from it's containing list */
16491662
dlist_delete(&change->node);
16501663

1664+
/*
1665+
* Instead of updating the memory counter for individual changes,
1666+
* we sum up the size of memory to free so we can update the memory
1667+
* counter all together below. This saves costs of maintaining
1668+
* the max-heap.
1669+
*/
1670+
mem_freed += ReorderBufferChangeSize(change);
1671+
16511672
ReorderBufferReturnChange(rb, change, false);
16521673
}
16531674

16541675
/* Update the memory counter */
1655-
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
1676+
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
16561677

16571678
/*
16581679
* Mark the transaction as streamed.
@@ -2062,6 +2083,9 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
20622083
rb->stream_stop(rb, txn, last_lsn);
20632084
ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
20642085
}
2086+
2087+
/* All changes must be deallocated */
2088+
Assert(txn->size == 0);
20652089
}
20662090

20672091
/*

0 commit comments

Comments
 (0)