Skip to content

Commit 672c7ea

Browse files
authored
Limit LTX compaction during backups (#402)
1 parent 4b77ba1 commit 672c7ea

File tree

1 file changed

+26
-14
lines changed

1 file changed

+26
-14
lines changed

store.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ const (
4545
)
4646

4747
const (
48+
// MaxBackupLTXFileN is the number of LTX files that can be compacted
49+
// together at a time when sending data to the backup service.
50+
MaxBackupLTXFileN = 256
51+
4852
MetricsMonitorInterval = 1 * time.Second
4953
)
5054

@@ -1120,9 +1124,9 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
11201124

11211125
// Check local replication position.
11221126
// If we haven't written anything yet then try to send data.
1123-
pos := db.Pos()
1124-
if pos.IsZero() {
1125-
return pos, nil
1127+
localPos := db.Pos()
1128+
if localPos.IsZero() {
1129+
return localPos, nil
11261130
}
11271131

11281132
// If the database doesn't exist remotely, perform a full snapshot.
@@ -1132,11 +1136,11 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
11321136

11331137
// If the position from the backup server is ahead of the primary then we
11341138
// need to perform a recovery so that we snapshot from the backup server.
1135-
if remotePos.TXID > pos.TXID {
1139+
if remotePos.TXID > localPos.TXID {
11361140
slog.Warn("restoring from backup",
11371141
slog.String("name", name),
11381142
slog.Group("pos",
1139-
slog.String("local", pos.String()),
1143+
slog.String("local", localPos.String()),
11401144
slog.String("remote", remotePos.String()),
11411145
),
11421146
slog.String("reason", "remote-ahead"),
@@ -1148,12 +1152,12 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
11481152
// If the TXID matches the backup server, we need to ensure the checksum
11491153
// does as well. If it doesn't, we need to grab a snapshot from the backup
11501154
// server. If it does, then we can exit as we're already in sync.
1151-
if remotePos.TXID == pos.TXID {
1152-
if remotePos.PostApplyChecksum != pos.PostApplyChecksum {
1155+
if remotePos.TXID == localPos.TXID {
1156+
if remotePos.PostApplyChecksum != localPos.PostApplyChecksum {
11531157
slog.Warn("restoring from backup",
11541158
slog.String("name", name),
11551159
slog.Group("pos",
1156-
slog.String("local", pos.String()),
1160+
slog.String("local", localPos.String()),
11571161
slog.String("remote", remotePos.String()),
11581162
),
11591163
slog.String("reason", "chksum-mismatch"),
@@ -1162,10 +1166,10 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
11621166
}
11631167

11641168
slog.Debug("database in sync with backup, skipping", slog.String("name", name))
1165-
return pos, nil // already in sync
1169+
return localPos, nil // already in sync
11661170
}
11671171

1168-
assert(remotePos.TXID < pos.TXID, "remote/local position must be ordered")
1172+
assert(remotePos.TXID < localPos.TXID, "remote/local position must be ordered")
11691173

11701174
// OPTIMIZE: Check that remote postApplyChecksum equals next TXID's preApplyChecksum
11711175

@@ -1176,13 +1180,14 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
11761180
_ = r.(io.Closer).Close()
11771181
}
11781182
}()
1179-
for txID := remotePos.TXID + 1; txID <= pos.TXID; txID++ {
1183+
1184+
for txID, n := remotePos.TXID+1, 0; txID <= localPos.TXID && n < MaxBackupLTXFileN; txID, n = txID+1, n+1 {
11801185
f, err := db.OpenLTXFile(txID)
11811186
if os.IsNotExist(err) {
11821187
slog.Warn("restoring from backup",
11831188
slog.String("name", name),
11841189
slog.Group("pos",
1185-
slog.String("local", pos.String()),
1190+
slog.String("local", localPos.String()),
11861191
slog.String("remote", remotePos.String()),
11871192
),
11881193
slog.String("txid", txID.String()),
@@ -1197,10 +1202,17 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
11971202

11981203
// Compact LTX files through a pipe so we can pass it to the backup client.
11991204
pr, pw := io.Pipe()
1205+
var pos ltx.Pos
12001206
go func() {
12011207
compactor := ltx.NewCompactor(pw, rdrs)
12021208
compactor.HeaderFlags = s.ltxHeaderFlags()
1203-
_ = pw.CloseWithError(compactor.Compact(ctx))
1209+
if err := compactor.Compact(ctx); err != nil {
1210+
_ = pw.CloseWithError(err)
1211+
return
1212+
}
1213+
1214+
pos = ltx.NewPos(compactor.Header().MaxTXID, compactor.Trailer().PostApplyChecksum)
1215+
_ = pw.Close()
12041216
}()
12051217

12061218
var pmErr *ltx.PosMismatchError
@@ -1209,7 +1221,7 @@ func (s *Store) streamBackupDB(ctx context.Context, name string, remotePos ltx.P
12091221
slog.Warn("restoring from backup",
12101222
slog.String("name", name),
12111223
slog.Group("pos",
1212-
slog.String("local", pos.String()),
1224+
slog.String("local", localPos.String()),
12131225
slog.String("remote", pmErr.Pos.String()),
12141226
),
12151227
slog.String("reason", "out-of-sync"),

0 commit comments

Comments
 (0)