Skip to content

Commit 8d5b476

Browse files
author
Konstantin Knizhnik
committed
Persiste pgstat file in PS to include it in basebackup and so do not loose this information on compute restart
1 parent cf30276 commit 8d5b476

File tree

4 files changed

+59
-34
lines changed

4 files changed

+59
-34
lines changed

src/backend/access/heap/rewriteheap.c

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -785,36 +785,6 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
785785
* ------------------------------------------------------------------------
786786
*/
787787

788-
/*
789-
* NEON: we need to persist mapping file in WAL
790-
*/
791-
static void
792-
wallog_mapping_file(char const* path, int fd)
793-
{
794-
char prefix[MAXPGPATH];
795-
snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
796-
if (fd < 0)
797-
{
798-
elog(DEBUG1, "neon: deleting contents of rewrite file %s", path);
799-
/* unlink file */
800-
LogLogicalMessage(prefix, NULL, 0, false);
801-
}
802-
else
803-
{
804-
off_t size = lseek(fd, 0, SEEK_END);
805-
char* buf;
806-
elog(DEBUG1, "neon: writing contents of rewrite file %s, size %ld", path, (long)size);
807-
if (size < 0)
808-
elog(ERROR, "Failed to get size of mapping file: %m");
809-
buf = palloc((size_t)size);
810-
lseek(fd, 0, SEEK_SET);
811-
if (read(fd, buf, (size_t)size) != size)
812-
elog(ERROR, "Failed to read mapping file: %m");
813-
LogLogicalMessage(prefix, buf, (size_t)size, false);
814-
pfree(buf);
815-
}
816-
}
817-
818788
/*
819789
* Do preparations for logging logical mappings during a rewrite if
820790
* necessary. If we detect that we don't need to log anything we'll prevent
@@ -950,7 +920,7 @@ logical_heap_rewrite_flush_mappings(RewriteState state)
950920
errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
951921
written, len)));
952922
src->off += len;
953-
wallog_mapping_file(src->path, FileGetRawDesc(src->vfd));
923+
wallog_file_descriptor(src->path, FileGetRawDesc(src->vfd));
954924

955925
XLogBeginInsert();
956926
XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
@@ -1201,7 +1171,7 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
12011171
errmsg("could not fsync file \"%s\": %m", path)));
12021172
pgstat_report_wait_end();
12031173

1204-
wallog_mapping_file(path, fd);
1174+
wallog_file_descriptor(path, fd);
12051175

12061176
if (CloseTransientFile(fd) != 0)
12071177
ereport(ERROR,
@@ -1280,7 +1250,7 @@ CheckPointLogicalRewriteHeap(void)
12801250
ereport(ERROR,
12811251
(errcode_for_file_access(),
12821252
errmsg("could not remove file \"%s\": %m", path)));
1283-
wallog_mapping_file(path, -1);
1253+
wallog_file_descriptor(path, -1);
12841254
}
12851255
else
12861256
{
@@ -1309,7 +1279,7 @@ CheckPointLogicalRewriteHeap(void)
13091279
errmsg("could not fsync file \"%s\": %m", path)));
13101280
pgstat_report_wait_end();
13111281

1312-
wallog_mapping_file(path, fd);
1282+
wallog_file_descriptor(path, fd);
13131283

13141284
if (CloseTransientFile(fd) != 0)
13151285
ereport(ERROR,

src/backend/replication/logical/message.c

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131

3232
#include "postgres.h"
3333

34+
#include <unistd.h>
35+
3436
#include "access/xact.h"
3537
#include "access/xloginsert.h"
3638
#include "miscadmin.h"
@@ -87,3 +89,51 @@ logicalmsg_redo(XLogReaderState *record)
8789

8890
/* This is only interesting for logical decoding, see decode.c. */
8991
}
92+
93+
/*
94+
* NEON: persist file in WAL to save it in persistent storage.
95+
* If fd < 0, then remote entry from page server.
96+
*/
97+
void
98+
wallog_file_descriptor(char const* path, int fd)
99+
{
100+
char prefix[MAXPGPATH];
101+
snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
102+
if (fd < 0)
103+
{
104+
elog(DEBUG1, "neon: deleting contents of rewrite file %s", path);
105+
/* unlink file */
106+
LogLogicalMessage(prefix, NULL, 0, false);
107+
}
108+
else
109+
{
110+
off_t size = lseek(fd, 0, SEEK_END);
111+
char* buf;
112+
elog(DEBUG1, "neon: writing contents of rewrite file %s, size %ld", path, (long)size);
113+
if (size < 0)
114+
elog(ERROR, "Failed to get size of mapping file: %m");
115+
buf = palloc((size_t)size);
116+
lseek(fd, 0, SEEK_SET);
117+
if (read(fd, buf, (size_t)size) != size)
118+
elog(ERROR, "Failed to read mapping file: %m");
119+
LogLogicalMessage(prefix, buf, (size_t)size, false);
120+
pfree(buf);
121+
}
122+
}
123+
124+
void
125+
wallog_file(char const* path)
126+
{
127+
int fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
128+
if (fd < 0)
129+
{
130+
ereport(LOG,
131+
(errcode_for_file_access(),
132+
errmsg("could not create file \"%s\": %m", path)));
133+
}
134+
else
135+
{
136+
wallog_file_descriptor(path, fd);
137+
CloseTransientFile(fd);
138+
}
139+
}

src/backend/utils/activity/pgstat.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
#include "lib/dshash.h"
9999
#include "pgstat.h"
100100
#include "port/atomics.h"
101+
#include "replication/message.h"
101102
#include "storage/fd.h"
102103
#include "storage/ipc.h"
103104
#include "storage/lwlock.h"
@@ -1462,6 +1463,7 @@ pgstat_write_statsfile(void)
14621463
tmpfile, statfile)));
14631464
unlink(tmpfile);
14641465
}
1466+
wallog_file(statfile);
14651467
}
14661468

14671469
/* helpers for pgstat_read_statsfile() */

src/include/replication/message.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ typedef struct xl_logical_message
3232
extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
3333
size_t size, bool transactional);
3434

35+
extern void wallog_file(char const* path);
36+
extern void wallog_file_descriptor(char const* path, int fd);
37+
3538
/* RMGR API */
3639
#define XLOG_LOGICAL_MESSAGE 0x00
3740
extern void logicalmsg_redo(XLogReaderState *record);

0 commit comments

Comments
 (0)