Skip to content
Draft
Prev Previous commit
Next Next commit
[Issue #449] fix stream backups, PAGE backup, WAL parsing, delete and…
… validate
  • Loading branch information
gsmolk committed Oct 31, 2021
commit d4f9aa36c1b1e39faa4a9c54cf0e62cccf2b0f50
13 changes: 7 additions & 6 deletions src/backup.c
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
{
XLogSegNo targetSegNo;
char wal_segment_path[MAXPGPATH],
wal_segment_subdir[MAXPGPATH],
wal_segment_subdir[MAXPGPATH], // used only to check file existence, not actual parsing
wal_segment[MAXFNAMELEN];
bool file_exists = false;
uint32 try_count = 0,
Expand All @@ -1257,10 +1257,10 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
instance_config.xlog_seg_size);

// obtain WAL archive subdir for ARCHIVE backup
if (!in_stream_dir)
get_archive_subdir(wal_segment_subdir, wal_segment_dir, wal_segment, SEGMENT);
else
if (in_stream_dir)
strcpy(wal_segment_subdir, wal_segment_dir);
else
get_archive_subdir(wal_segment_subdir, wal_segment_dir, wal_segment, SEGMENT);

join_path_components(wal_segment_path, wal_segment_subdir, wal_segment);
/*
Expand Down Expand Up @@ -1319,7 +1319,7 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
*/
if (!XRecOffIsNull(target_lsn) &&
wal_contains_lsn(wal_segment_dir, target_lsn, tli,
instance_config.xlog_seg_size))
instance_config.xlog_seg_size, !in_stream_dir))
/* Target LSN was found */
{
elog(LOG, "Found LSN: %X/%X", (uint32) (target_lsn >> 32), (uint32) target_lsn);
Expand Down Expand Up @@ -1915,7 +1915,8 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb
if (!read_recovery_info(xlog_path, backup->tli,
instance_config.xlog_seg_size,
backup->start_lsn, backup->stop_lsn,
&backup->recovery_time))
&backup->recovery_time,
!backup->stream))
{
elog(LOG, "Failed to find Recovery Time in WAL, forced to trust current_timestamp");
backup->recovery_time = stop_backup_result.invocation_time;
Expand Down
8 changes: 6 additions & 2 deletions src/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -1628,7 +1628,9 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance)
else if (strcmp(suffix, "gz") != 0)
{
elog(WARNING, "unexpected WAL file name \"%s\"", file->name);
// TODO: free file
pgFileFree(file);
parray_remove(xlog_files_list, i);
i--;
continue;
}
}
Expand Down Expand Up @@ -1735,7 +1737,9 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance)
else
{
elog(WARNING, "unexpected WAL file name \"%s\"", file->name);
// TODO: free file
pgFileFree(file);
parray_remove(xlog_files_list, i);
i--;
}
}

Expand Down
15 changes: 10 additions & 5 deletions src/delete.c
Original file line number Diff line number Diff line change
Expand Up @@ -970,9 +970,12 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
}

wal_deleted = true;
}

//TODO: cleanup
/* cleanup */
pgXlogFileFree(wal_file);
parray_remove(tlinfo->xlog_filelist, i);
i--;
}
}

/* Remove empty subdirectories */
Expand All @@ -987,10 +990,12 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
join_path_components(fullpath, instanceState->instance_wal_subdir_path, file->name);

if (dir_is_empty(fullpath, FIO_LOCAL_HOST))
{
pgFileDelete(file->mode, fullpath);

pgFileFree(file);
// TODO: maintain instanceState->wal_archive_subdirs
pgFileFree(file);
parray_remove(instanceState->wal_archive_subdirs, i);
i--;
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/dir.c
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,19 @@ pgFileFree(void *file)
pfree(file);
}

void
pgXlogFileFree(void *xlogfile)
{
xlogFile *xlogfile_ptr;

if (xlogfile == NULL)
return;

xlogfile_ptr = (xlogFile *) xlogfile;

pg_free(xlogfile_ptr);
}

/* Compare two pgFile with their path in ascending order of ASCII code. */
int
pgFileMapComparePath(const void *f1, const void *f2)
Expand Down
134 changes: 107 additions & 27 deletions src/parsexlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ typedef struct XLogReaderData
gzFile gz_xlogfile;
char gz_xlogpath[MAXPGPATH];
#endif
bool is_stream;
bool honor_subdirs;
} XLogReaderData;

/* Function to process a WAL record */
Expand Down Expand Up @@ -174,7 +174,7 @@ static bool RunXLogThreads(const char *archivedir,
xlog_record_function process_record,
XLogRecTarget *last_rec,
bool inclusive_endpoint,
bool is_stream);
bool honor_subdirs);
//static XLogReaderState *InitXLogThreadRead(xlog_thread_arg *arg);
static bool SwitchThreadToNextWal(XLogReaderState *xlogreader,
xlog_thread_arg *arg);
Expand Down Expand Up @@ -256,7 +256,7 @@ extractPageMap(const char *archivedir, uint32 wal_seg_size,
extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId,
InvalidXLogRecPtr, end_tli, wal_seg_size,
startpoint, endpoint, false, extractPageInfo,
NULL, true, false);
NULL, true, true);
else
{
/* We have to process WAL located on several different xlog intervals,
Expand Down Expand Up @@ -350,7 +350,7 @@ extractPageMap(const char *archivedir, uint32 wal_seg_size,
extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId,
InvalidXLogRecPtr, tmp_interval->tli, wal_seg_size,
tmp_interval->begin_lsn, tmp_interval->end_lsn,
false, extractPageInfo, NULL, inclusive_endpoint, false);
false, extractPageInfo, NULL, inclusive_endpoint, true);
if (!extract_isok)
break;

Expand Down Expand Up @@ -379,7 +379,7 @@ validate_backup_wal_from_start_to_stop(pgBackup *backup,
got_endpoint = RunXLogThreads(archivedir, 0, InvalidTransactionId,
InvalidXLogRecPtr, tli, xlog_seg_size,
backup->start_lsn, backup->stop_lsn,
false, NULL, NULL, true, backup->stream);
false, NULL, NULL, true, !backup->stream);

if (!got_endpoint)
{
Expand Down Expand Up @@ -452,6 +452,7 @@ validate_wal(pgBackup *backup, const char *archivedir,
elog(WARNING, "Backup %s WAL segments are corrupted", backup_id);
return;
}

/*
* If recovery target is provided check that we can restore backup to a
* recovery target time or xid.
Expand Down Expand Up @@ -493,7 +494,7 @@ validate_wal(pgBackup *backup, const char *archivedir,
RunXLogThreads(archivedir, target_time, target_xid, target_lsn,
tli, wal_seg_size, backup->stop_lsn,
InvalidXLogRecPtr, true, validateXLogRecord, &last_rec, true,
backup->stream);
true);
if (last_rec.rec_time > 0)
time2iso(last_timestamp, lengthof(last_timestamp),
timestamptz_to_time_t(last_rec.rec_time), false);
Expand Down Expand Up @@ -535,7 +536,7 @@ validate_wal(pgBackup *backup, const char *archivedir,
bool
read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size,
XLogRecPtr start_lsn, XLogRecPtr stop_lsn,
time_t *recovery_time)
time_t *recovery_time, bool honor_subdirs)
{
XLogRecPtr startpoint = stop_lsn;
XLogReaderState *xlogreader;
Expand All @@ -552,6 +553,7 @@ read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size,

xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size,
false, true, true);
reader_data.honor_subdirs = honor_subdirs;

/* Read records from stop_lsn down to start_lsn */
do
Expand Down Expand Up @@ -611,7 +613,7 @@ read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size,
*/
bool
wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
TimeLineID target_tli, uint32 wal_seg_size)
TimeLineID target_tli, uint32 wal_seg_size, bool honor_subdirs)
{
XLogReaderState *xlogreader;
XLogReaderData reader_data;
Expand All @@ -629,6 +631,7 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
elog(ERROR, "Out of memory");

xlogreader->system_identifier = instance_config.system_identifier;
reader_data.honor_subdirs = honor_subdirs;

#if PG_VERSION_NUM >= 130000
if (XLogRecPtrIsInvalid(target_lsn))
Expand Down Expand Up @@ -1015,38 +1018,114 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
/* Try to switch to the next WAL segment */
if (!reader_data->xlogexists)
{
char xlogfname[MAXFNAMELEN];
char partial_file[MAXPGPATH];
bool compressed = false;
char xlogfname[MAXFNAMELEN];
// char partial_file[MAXPGPATH];
char fullpath[MAXPGPATH];
char fullpath_gz[MAXPGPATH];
char fullpath_partial_gz[MAXPGPATH];

GetXLogFileName(xlogfname, reader_data->tli, reader_data->xlogsegno, wal_seg_size);

if (reader_data->is_stream)
join_path_components(reader_data->xlogpath, wal_archivedir, xlogfname);
/* obtain WAL archive subdir for ARCHIVE backup */
else
if (reader_data->honor_subdirs)
{
char archive_subdir[MAXPGPATH];
get_archive_subdir(archive_subdir, wal_archivedir, xlogfname, SEGMENT);
join_path_components(reader_data->xlogpath, archive_subdir, xlogfname);

/* check existence of wal_dir/xlogid/segment.gz file ... */
snprintf(fullpath_gz, MAXPGPATH, "%s/%s.gz", archive_subdir, xlogfname);

//TODO: rewrite it to something less ugly
#ifdef HAVE_LIBZ
if (fileExists(fullpath_gz, FIO_LOCAL_HOST))
{
snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s", archive_subdir, xlogfname);
snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s", fullpath_gz);
compressed = true;
goto file_found;
}

/* ... failing that check existence of wal_dir/xlogid/segment.partial.gz ... */
snprintf(fullpath_partial_gz, MAXPGPATH, "%s/%s.partial.gz", archive_subdir, xlogfname);
if (fileExists(fullpath_partial_gz, FIO_LOCAL_HOST))
{
snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s.partial", archive_subdir, xlogfname);
snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s", fullpath_partial_gz);
compressed = true;
goto file_found;
}
#endif
/* ... failing that check existence of wal_dir/xlogid/segment ... */
snprintf(fullpath, MAXPGPATH, "%s/%s", archive_subdir, xlogfname);
if (fileExists(fullpath, FIO_LOCAL_HOST))
{
snprintf(reader_data->xlogpath, MAXPGPATH, "%s", fullpath);
goto file_found;
}

goto archive_dir;
}
/* use directory as-is */
else
{
archive_dir:
#ifdef HAVE_LIBZ
/* ... failing that check existence of wal_dir/segment.gz ... */
snprintf(fullpath_gz, MAXPGPATH, "%s/%s.gz", wal_archivedir, xlogfname);
if (fileExists(fullpath_gz, FIO_LOCAL_HOST))
{
snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s", fullpath_gz);
snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s", wal_archivedir, xlogfname);
compressed = true;

snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s.gz", reader_data->xlogpath);
goto file_found;
}

/* ... failing that check existence of wal_dir/segment.partial.gz ... */
snprintf(fullpath_partial_gz, MAXPGPATH, "%s/%s.partial.gz", wal_archivedir, xlogfname);
if (fileExists(wal_archivedir, FIO_LOCAL_HOST))
{
snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s.partial", wal_archivedir, xlogfname);
snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s", fullpath_partial_gz);
compressed = true;
goto file_found;
}
#endif
/* ... failing that check existence of wal_dir/segment ... */
snprintf(fullpath, MAXPGPATH, "%s/%s", wal_archivedir, xlogfname);
if (fileExists(fullpath, FIO_LOCAL_HOST))
{
snprintf(reader_data->xlogpath, MAXPGPATH, "%s", fullpath);
goto file_found;
}
}

file_found:
canonicalize_path(reader_data->xlogpath);

#ifdef HAVE_LIBZ
if (compressed)
canonicalize_path(reader_data->gz_xlogpath);
#endif

// snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s.gz", reader_data->xlogpath);

/* We fall back to using .partial segment in case if we are running
* multi-timeline incremental backup right after standby promotion.
* TODO: it should be explicitly enabled.
*/
snprintf(partial_file, MAXPGPATH, "%s.partial", reader_data->xlogpath);
// snprintf(partial_file, MAXPGPATH, "%s.partial", reader_data->xlogpath);

/* If segment do not exists, but the same
* segment with '.partial' suffix does, use it instead */
if (!fileExists(reader_data->xlogpath, FIO_LOCAL_HOST) &&
fileExists(partial_file, FIO_LOCAL_HOST))
{
snprintf(reader_data->xlogpath, MAXPGPATH, "%s", partial_file);
}
// if (!fileExists(reader_data->xlogpath, FIO_LOCAL_HOST) &&
// fileExists(partial_file, FIO_LOCAL_HOST))
// {
// snprintf(reader_data->xlogpath, MAXPGPATH, "%s", partial_file);
// }

if (fileExists(reader_data->xlogpath, FIO_LOCAL_HOST))
if (!compressed)
{
elog(LOG, "Thread [%d]: Opening WAL segment \"%s\"",
reader_data->thread_num, reader_data->xlogpath);
Expand All @@ -1065,7 +1144,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
}
#ifdef HAVE_LIBZ
/* Try to open compressed WAL segment */
else if (fileExists(reader_data->gz_xlogpath, FIO_LOCAL_HOST))
else
{
elog(LOG, "Thread [%d]: Opening compressed WAL segment \"%s\"",
reader_data->thread_num, reader_data->gz_xlogpath);
Expand Down Expand Up @@ -1203,7 +1282,7 @@ RunXLogThreads(const char *archivedir, time_t target_time,
TransactionId target_xid, XLogRecPtr target_lsn, TimeLineID tli,
uint32 segment_size, XLogRecPtr startpoint, XLogRecPtr endpoint,
bool consistent_read, xlog_record_function process_record,
XLogRecTarget *last_rec, bool inclusive_endpoint, bool is_stream)
XLogRecTarget *last_rec, bool inclusive_endpoint, bool honor_subdirs)
{
pthread_t *threads;
xlog_thread_arg *thread_args;
Expand Down Expand Up @@ -1267,7 +1346,7 @@ RunXLogThreads(const char *archivedir, time_t target_time,
consistent_read, false);
arg->reader_data.xlogsegno = segno_next;
arg->reader_data.thread_num = i + 1;
arg->reader_data.is_stream = is_stream;
arg->reader_data.honor_subdirs = honor_subdirs;
arg->process_record = process_record;
arg->startpoint = startpoint;
arg->endpoint = endpoint;
Expand Down Expand Up @@ -1495,7 +1574,7 @@ XLogThreadWorker(void *arg)
reader_data->thread_num,
(uint32) (errptr >> 32), (uint32) (errptr));

/* In we failed to read record located at endpoint position,
/* If we failed to read record located at endpoint position,
* and endpoint is not inclusive, do not consider this as an error.
*/
if (!thread_arg->inclusive_endpoint &&
Expand All @@ -1522,6 +1601,7 @@ XLogThreadWorker(void *arg)

if (thread_arg->process_record)
thread_arg->process_record(xlogreader, reader_data, &stop_reading);

if (stop_reading)
{
thread_arg->got_target = true;
Expand Down Expand Up @@ -1928,7 +2008,7 @@ bool validate_wal_segment(TimeLineID tli, XLogSegNo segno, const char *prefetch_

rc = RunXLogThreads(prefetch_dir, 0, InvalidTransactionId,
InvalidXLogRecPtr, tli, wal_seg_size,
startpoint, endpoint, false, NULL, NULL, true, true);
startpoint, endpoint, false, NULL, NULL, true, false);

num_threads = tmp_num_threads;

Expand Down
Loading