Skip to content

Commit cadbbcc

Browse files
knizhnikKonstantin Knizhnikarssher
authored andcommitted
Neon logical replication support for PG14 (#309)
* Neon logical replication support for PG14 * Log heap rewrite file after creation. --------- Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech> Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
1 parent 4bbdda2 commit cadbbcc

File tree

5 files changed

+117
-6
lines changed

5 files changed

+117
-6
lines changed

src/backend/access/heap/rewriteheap.c

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
#include "miscadmin.h"
118118
#include "pgstat.h"
119119
#include "replication/logical.h"
120+
#include "replication/message.h"
120121
#include "replication/slot.h"
121122
#include "storage/bufmgr.h"
122123
#include "storage/fd.h"
@@ -785,6 +786,36 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
785786
* ------------------------------------------------------------------------
786787
*/
787788

789+
/*
790+
* NEON: we need to persist mapping file in WAL
791+
*/
792+
static void
793+
wallog_mapping_file(char const* path, int fd)
794+
{
795+
char prefix[MAXPGPATH];
796+
snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
797+
if (fd < 0)
798+
{
799+
elog(DEBUG1, "neon: deleting contents of rewrite file %s", path);
800+
/* unlink file */
801+
LogLogicalMessage(prefix, NULL, 0, false);
802+
}
803+
else
804+
{
805+
off_t size = lseek(fd, 0, SEEK_END);
806+
char* buf;
807+
elog(DEBUG1, "neon: writing contents of rewrite file %s, size %ld", path, size);
808+
if (size < 0)
809+
elog(ERROR, "Failed to get size of mapping file: %m");
810+
buf = palloc((size_t)size);
811+
lseek(fd, 0, SEEK_SET);
812+
if (read(fd, buf, (size_t)size) != size)
813+
elog(ERROR, "Failed to read mapping file: %m");
814+
LogLogicalMessage(prefix, buf, (size_t)size, false);
815+
pfree(buf);
816+
}
817+
}
818+
788819
/*
789820
* Do preparations for logging logical mappings during a rewrite if
790821
* necessary. If we detect that we don't need to log anything we'll prevent
@@ -920,6 +951,7 @@ logical_heap_rewrite_flush_mappings(RewriteState state)
920951
errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
921952
written, len)));
922953
src->off += len;
954+
wallog_mapping_file(src->path, FileGetRawDesc(src->vfd));
923955

924956
XLogBeginInsert();
925957
XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
@@ -1006,7 +1038,7 @@ logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
10061038
src->off = 0;
10071039
memcpy(src->path, path, sizeof(path));
10081040
src->vfd = PathNameOpenFile(path,
1009-
O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1041+
O_CREAT | O_EXCL | O_RDWR | PG_BINARY);
10101042
if (src->vfd < 0)
10111043
ereport(ERROR,
10121044
(errcode_for_file_access(),
@@ -1172,6 +1204,8 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
11721204
errmsg("could not fsync file \"%s\": %m", path)));
11731205
pgstat_report_wait_end();
11741206

1207+
wallog_mapping_file(path, fd);
1208+
11751209
if (CloseTransientFile(fd) != 0)
11761210
ereport(ERROR,
11771211
(errcode_for_file_access(),
@@ -1247,6 +1281,7 @@ CheckPointLogicalRewriteHeap(void)
12471281
ereport(ERROR,
12481282
(errcode_for_file_access(),
12491283
errmsg("could not remove file \"%s\": %m", path)));
1284+
wallog_mapping_file(path, -1);
12501285
}
12511286
else
12521287
{
@@ -1275,6 +1310,8 @@ CheckPointLogicalRewriteHeap(void)
12751310
errmsg("could not fsync file \"%s\": %m", path)));
12761311
pgstat_report_wait_end();
12771312

1313+
wallog_mapping_file(path, fd);
1314+
12781315
if (CloseTransientFile(fd) != 0)
12791316
ereport(ERROR,
12801317
(errcode_for_file_access(),

src/backend/access/transam/xlog.c

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -968,6 +968,7 @@ static void VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec,
968968
static void LocalSetXLogInsertAllowed(void);
969969
static void CreateEndOfRecoveryRecord(void);
970970
static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn);
971+
static void PreCheckPointGuts(int flags);
971972
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
972973
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
973974
static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
@@ -9573,6 +9574,11 @@ CreateCheckPoint(int flags)
95739574
*/
95749575
SyncPreCheckpoint();
95759576

9577+
/*
9578+
* NEON: perform checkpoint actions that require writing to the WAL before we determine the REDO pointer.
9579+
*/
9580+
PreCheckPointGuts(flags);
9581+
95769582
/*
95779583
* Use a critical section to force system panic if we have trouble.
95789584
*/
@@ -10042,6 +10048,28 @@ CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn)
1004210048
return recptr;
1004310049
}
1004410050

10051+
static void
10052+
CheckPointReplicationState(void)
10053+
{
10054+
CheckPointRelationMap();
10055+
CheckPointReplicationSlots();
10056+
CheckPointSnapBuild();
10057+
CheckPointLogicalRewriteHeap();
10058+
CheckPointReplicationOrigin();
10059+
}
10060+
10061+
/*
10062+
* NEON: we use logical records to persist information of about slots, origins, relation map...
10063+
* If it is done inside shutdown checkpoint, then Postgres panics: "concurrent write-ahead log activity while database system is shutting down"
10064+
* So it before checkpoint REDO position is determined.
10065+
*/
10066+
static void
10067+
PreCheckPointGuts(int flags)
10068+
{
10069+
if (flags & CHECKPOINT_IS_SHUTDOWN)
10070+
CheckPointReplicationState();
10071+
}
10072+
1004510073
/*
1004610074
* Flush all data in shared memory to disk, and fsync
1004710075
*
@@ -10051,11 +10079,8 @@ CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn)
1005110079
static void
1005210080
CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
1005310081
{
10054-
CheckPointRelationMap();
10055-
CheckPointReplicationSlots();
10056-
CheckPointSnapBuild();
10057-
CheckPointLogicalRewriteHeap();
10058-
CheckPointReplicationOrigin();
10082+
if (!(flags & CHECKPOINT_IS_SHUTDOWN))
10083+
CheckPointReplicationState();
1005910084

1006010085
/* Write out all dirty data in SLRUs and the main buffer pool */
1006110086
TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);

src/backend/replication/logical/origin.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
#include "nodes/execnodes.h"
8282
#include "pgstat.h"
8383
#include "replication/logical.h"
84+
#include "replication/message.h"
8485
#include "replication/origin.h"
8586
#include "storage/condition_variable.h"
8687
#include "storage/copydir.h"
@@ -562,10 +563,14 @@ CheckPointReplicationOrigin(void)
562563
int i;
563564
uint32 magic = REPLICATION_STATE_MAGIC;
564565
pg_crc32c crc;
566+
char *buf;
567+
size_t chkp_size;
565568

566569
if (max_replication_slots == 0)
567570
return;
568571

572+
buf = palloc(sizeof(magic) + max_replication_slots*sizeof(ReplicationStateOnDisk) + sizeof(crc));
573+
569574
INIT_CRC32C(crc);
570575

571576
/* make sure no old temp file is remaining */
@@ -599,6 +604,9 @@ CheckPointReplicationOrigin(void)
599604
errmsg("could not write to file \"%s\": %m",
600605
tmppath)));
601606
}
607+
memcpy(buf, &magic, sizeof magic);
608+
chkp_size = sizeof(magic);
609+
602610
COMP_CRC32C(crc, &magic, sizeof(magic));
603611

604612
/* prevent concurrent creations/drops */
@@ -641,6 +649,8 @@ CheckPointReplicationOrigin(void)
641649
errmsg("could not write to file \"%s\": %m",
642650
tmppath)));
643651
}
652+
memcpy(buf + chkp_size, &disk_state, sizeof(disk_state));
653+
chkp_size += sizeof(disk_state);
644654

645655
COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
646656
}
@@ -660,6 +670,15 @@ CheckPointReplicationOrigin(void)
660670
errmsg("could not write to file \"%s\": %m",
661671
tmppath)));
662672
}
673+
if (chkp_size != sizeof(magic)) /* has some valid origins */
674+
{
675+
memcpy(buf + chkp_size, &crc, sizeof crc);
676+
chkp_size += sizeof(crc);
677+
678+
/* NEON specific: persist snapshot in storage using logical message */
679+
LogLogicalMessage("neon-file:pg_logical/replorigin_checkpoint", buf, chkp_size, false);
680+
}
681+
pfree(buf);
663682

664683
if (CloseTransientFile(tmpfd) != 0)
665684
ereport(PANIC,

src/backend/replication/logical/snapbuild.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
#include "miscadmin.h"
127127
#include "pgstat.h"
128128
#include "replication/logical.h"
129+
#include "replication/message.h"
129130
#include "replication/reorderbuffer.h"
130131
#include "replication/snapbuild.h"
131132
#include "storage/block.h" /* debugging output */
@@ -1604,6 +1605,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
16041605
int fd;
16051606
char tmppath[MAXPGPATH];
16061607
char path[MAXPGPATH];
1608+
char prefix[MAXPGPATH];
16071609
int ret;
16081610
struct stat stat_buf;
16091611
Size sz;
@@ -1726,6 +1728,10 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
17261728
(errcode_for_file_access(),
17271729
errmsg("could not open file \"%s\": %m", tmppath)));
17281730

1731+
/* NEON specific: persist snapshot in storage using logical message */
1732+
snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
1733+
LogLogicalMessage(prefix, (char*)ondisk, needed_length, false);
1734+
17291735
errno = 0;
17301736
pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
17311737
if ((write(fd, ondisk, needed_length)) != needed_length)
@@ -2032,6 +2038,7 @@ CheckPointSnapBuild(void)
20322038
DIR *snap_dir;
20332039
struct dirent *snap_de;
20342040
char path[MAXPGPATH + 21];
2041+
char prefix[MAXPGPATH + 31];
20352042

20362043
/*
20372044
* We start off with a minimum of the last redo pointer. No new
@@ -2090,6 +2097,10 @@ CheckPointSnapBuild(void)
20902097
{
20912098
elog(DEBUG1, "removing snapbuild snapshot %s", path);
20922099

2100+
/* NEON specific: delete file from storage using logical message */
2101+
snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
2102+
LogLogicalMessage(prefix, NULL, 0, false);
2103+
20932104
/*
20942105
* It's not particularly harmful, though strange, if we can't
20952106
* remove the file here. Don't prevent the checkpoint from

src/backend/replication/slot.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "miscadmin.h"
4646
#include "pgstat.h"
4747
#include "replication/slot.h"
48+
#include "replication/message.h"
4849
#include "storage/fd.h"
4950
#include "storage/proc.h"
5051
#include "storage/procarray.h"
@@ -605,6 +606,15 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
605606
sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
606607
sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
607608

609+
if (SlotIsLogical(slot))
610+
{
611+
/* NEON specific: delete slot from storage using logical message */
612+
char prefix[MAXPGPATH];
613+
snprintf(prefix, sizeof(prefix), "neon-file:%s/state", path);
614+
elog(LOG, "Drop replication slot %s", path);
615+
LogLogicalMessage(prefix, NULL, 0, false);
616+
}
617+
608618
/*
609619
* Rename the slot directory on disk, so that we'll no longer recognize
610620
* this as a valid slot. Note that if this fails, we've got to mark the
@@ -1569,6 +1579,15 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
15691579
SnapBuildOnDiskChecksummedSize);
15701580
FIN_CRC32C(cp.checksum);
15711581

1582+
if (SlotIsLogical(slot) && cp.slotdata.restart_lsn != InvalidXLogRecPtr)
1583+
{
1584+
/* NEON specific: persist slot in storage using logical message */
1585+
char prefix[MAXPGPATH];
1586+
snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
1587+
elog(LOG, "Save replication slot at %s restart_lsn=%X/%X", path, LSN_FORMAT_ARGS(cp.slotdata.restart_lsn));
1588+
LogLogicalMessage(prefix, (char*)&cp, sizeof cp, false);
1589+
}
1590+
15721591
errno = 0;
15731592
pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
15741593
if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))

0 commit comments

Comments
 (0)