Skip to content

Commit a9adf95

Browse files
committed
Wrong size of backup in list command if upload or download was interrupted and resumed, fix Altinity#526
Signed-off-by: Slach <bloodjazman@gmail.com>
1 parent f4e9367 commit a9adf95

File tree

5 files changed

+104
-61
lines changed

5 files changed

+104
-61
lines changed

ChangeLog.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ IMPROVEMENTS
44
- switch S3 storage backend to https://github.com/aws/aws-sdk-go-v2/, fix [534](https://github.com/AlexAkulov/clickhouse-backup/issues/534)
55
- add `S3_OBJECT_LABELS` and `GCS_OBJECT_LABELS` to allow setup each backup object metadata during upload, fix [588](https://github.com/AlexAkulov/clickhouse-backup/issues/588)
66

7+
BUG FIXES
8+
- Wrong size of backup in list command if upload or download was interrupted and resumed, fix [526](https://github.com/AlexAkulov/clickhouse-backup/issues/526)
9+
710
# v2.1.3
811
IMPROVEMENTS
912
- during upload sort tables descending by `total_bytes` if this field present

pkg/backup/download.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -324,8 +324,9 @@ func (b *Backuper) downloadTableMetadata(ctx context.Context, backupName string,
324324
}
325325
var tableMetadata metadata.TableMetadata
326326
for remoteMetadataFile, localMetadataFile := range metadataFiles {
327-
if b.resume && b.resumableState.IsAlreadyProcessed(localMetadataFile) {
328-
if strings.HasSuffix(localMetadataFile, ".json") {
327+
if b.resume {
328+
isProcesses, processedSize := b.resumableState.IsAlreadyProcessed(localMetadataFile)
329+
if isProcesses && strings.HasSuffix(localMetadataFile, ".json") {
329330
tmBody, err := os.ReadFile(localMetadataFile)
330331
if err != nil {
331332
return nil, 0, err
@@ -335,7 +336,10 @@ func (b *Backuper) downloadTableMetadata(ctx context.Context, backupName string,
335336
}
336337
filterPartsByPartitionsFilter(tableMetadata, partitionsFilter)
337338
}
338-
continue
339+
if isProcesses {
340+
size += uint64(processedSize)
341+
continue
342+
}
339343
}
340344
var tmBody []byte
341345
retry := retrier.New(retrier.ConstantBackoff(b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration), nil)
@@ -361,13 +365,15 @@ func (b *Backuper) downloadTableMetadata(ctx context.Context, backupName string,
361365
if err = os.MkdirAll(path.Dir(localMetadataFile), 0755); err != nil {
362366
return nil, 0, err
363367
}
368+
var written int64
364369
if strings.HasSuffix(localMetadataFile, ".sql") {
365370
if err = os.WriteFile(localMetadataFile, tmBody, 0640); err != nil {
366371
return nil, 0, err
367372
}
368373
if err = filesystemhelper.Chown(localMetadataFile, b.ch, disks, false); err != nil {
369374
return nil, 0, err
370375
}
376+
written = int64(len(tmBody))
371377
size += uint64(len(tmBody))
372378
} else {
373379
if err = json.Unmarshal(tmBody, &tableMetadata); err != nil {
@@ -380,10 +386,11 @@ func (b *Backuper) downloadTableMetadata(ctx context.Context, backupName string,
380386
if err != nil {
381387
return nil, 0, err
382388
}
389+
written = int64(jsonSize)
383390
size += jsonSize
384391
}
385392
if b.resume {
386-
b.resumableState.AppendToState(localMetadataFile)
393+
b.resumableState.AppendToState(localMetadataFile, written)
387394
}
388395
}
389396
log.
@@ -405,8 +412,10 @@ func (b *Backuper) downloadBackupRelatedDir(ctx context.Context, remoteBackup st
405412
log := b.log.WithField("logger", "downloadBackupRelatedDir")
406413
archiveFile := fmt.Sprintf("%s.%s", prefix, b.cfg.GetArchiveExtension())
407414
remoteFile := path.Join(remoteBackup.BackupName, archiveFile)
408-
if b.resume && b.resumableState.IsAlreadyProcessed(remoteFile) {
409-
return 0, nil
415+
if b.resume {
416+
if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remoteFile); isProcessed {
417+
return uint64(processedSize), nil
418+
}
410419
}
411420
localDir := path.Join(b.DefaultDataPath, "backup", remoteBackup.BackupName, prefix)
412421
remoteFileInfo, err := b.dst.StatFile(ctx, remoteFile)
@@ -422,7 +431,7 @@ func (b *Backuper) downloadBackupRelatedDir(ctx context.Context, remoteBackup st
422431
return 0, err
423432
}
424433
if b.resume {
425-
b.resumableState.AppendToState(remoteFile)
434+
b.resumableState.AppendToState(remoteFile, remoteFileInfo.Size())
426435
}
427436
return uint64(remoteFileInfo.Size()), nil
428437
}
@@ -459,7 +468,7 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
459468
g.Go(func() error {
460469
defer s.Release(1)
461470
log.Debugf("start download %s", tableRemoteFile)
462-
if b.resume && b.resumableState.IsAlreadyProcessed(tableRemoteFile) {
471+
if b.resume && b.resumableState.IsAlreadyProcessedBool(tableRemoteFile) {
463472
return nil
464473
}
465474
retry := retrier.New(retrier.ConstantBackoff(b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration), nil)
@@ -470,7 +479,7 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
470479
return err
471480
}
472481
if b.resume {
473-
b.resumableState.AppendToState(tableRemoteFile)
482+
b.resumableState.AppendToState(tableRemoteFile, 0)
474483
}
475484
log.Debugf("finish download %s", tableRemoteFile)
476485
return nil
@@ -505,14 +514,14 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
505514
g.Go(func() error {
506515
defer s.Release(1)
507516
log.Debugf("start %s -> %s", partRemotePath, partLocalPath)
508-
if b.resume && b.resumableState.IsAlreadyProcessed(partRemotePath) {
517+
if b.resume && b.resumableState.IsAlreadyProcessedBool(partRemotePath) {
509518
return nil
510519
}
511520
if err := b.dst.DownloadPath(dataCtx, 0, partRemotePath, partLocalPath, b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration); err != nil {
512521
return err
513522
}
514523
if b.resume {
515-
b.resumableState.AppendToState(partRemotePath)
524+
b.resumableState.AppendToState(partRemotePath, 0)
516525
}
517526
log.Debugf("finish %s -> %s", partRemotePath, partLocalPath)
518527
return nil
@@ -600,12 +609,12 @@ breakByError:
600609
return fmt.Errorf("can't to add link to exists part %s -> %s error: %v", newPath, existsPath, err)
601610
}
602611
if b.resume {
603-
b.resumableState.AppendToState(existsPath)
612+
b.resumableState.AppendToState(existsPath, 0)
604613
}
605614
return nil
606615
})
607616
} else {
608-
if !b.resume || (b.resume && !b.resumableState.IsAlreadyProcessed(existsPath)) {
617+
if !b.resume || (b.resume && !b.resumableState.IsAlreadyProcessedBool(existsPath)) {
609618
if err = b.makePartHardlinks(existsPath, newPath); err != nil {
610619
return fmt.Errorf("can't to add exists part: %v", err)
611620
}
@@ -859,7 +868,7 @@ func (b *Backuper) makePartHardlinks(exists, new string) error {
859868
}
860869

861870
func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile string, localFile string, disks []clickhouse.Disk) error {
862-
if b.resume && b.resumableState.IsAlreadyProcessed(remoteFile) {
871+
if b.resume && b.resumableState.IsAlreadyProcessedBool(remoteFile) {
863872
return nil
864873
}
865874
log := b.log.WithField("logger", "downloadSingleBackupFile")
@@ -901,7 +910,7 @@ func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile stri
901910
return err
902911
}
903912
if b.resume {
904-
b.resumableState.AppendToState(remoteFile)
913+
b.resumableState.AppendToState(remoteFile, 0)
905914
}
906915
return nil
907916
}

pkg/backup/upload.go

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func (b *Backuper) Upload(backupName, diffFrom, diffFromRemote, tablePattern str
201201
return err
202202
}
203203
remoteBackupMetaFile := path.Join(backupName, "metadata.json")
204-
if !b.resume || (b.resume && !b.resumableState.IsAlreadyProcessed(remoteBackupMetaFile)) {
204+
if !b.resume || (b.resume && !b.resumableState.IsAlreadyProcessedBool(remoteBackupMetaFile)) {
205205
retry := retrier.New(retrier.ConstantBackoff(b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration), nil)
206206
err = retry.RunCtx(ctx, func(ctx context.Context) error {
207207
return b.dst.PutFile(ctx, remoteBackupMetaFile, io.NopCloser(bytes.NewReader(newBackupMetadataBody)))
@@ -210,7 +210,7 @@ func (b *Backuper) Upload(backupName, diffFrom, diffFromRemote, tablePattern str
210210
return fmt.Errorf("can't upload %s: %v", remoteBackupMetaFile, err)
211211
}
212212
if b.resume {
213-
b.resumableState.AppendToState(remoteBackupMetaFile)
213+
b.resumableState.AppendToState(remoteBackupMetaFile, int64(len(newBackupMetadataBody)))
214214
}
215215
}
216216
if b.isEmbedded {
@@ -236,7 +236,7 @@ func (b *Backuper) Upload(backupName, diffFrom, diffFromRemote, tablePattern str
236236
}
237237

238238
func (b *Backuper) uploadSingleBackupFile(ctx context.Context, localFile, remoteFile string) error {
239-
if b.resume && b.resumableState.IsAlreadyProcessed(remoteFile) {
239+
if b.resume && b.resumableState.IsAlreadyProcessedBool(remoteFile) {
240240
return nil
241241
}
242242
log := b.log.WithField("logger", "uploadSingleBackupFile")
@@ -257,7 +257,11 @@ func (b *Backuper) uploadSingleBackupFile(ctx context.Context, localFile, remote
257257
return fmt.Errorf("can't upload %s: %v", remoteFile, err)
258258
}
259259
if b.resume {
260-
b.resumableState.AppendToState(remoteFile)
260+
info, err := os.Stat(localFile)
261+
if err != nil {
262+
return fmt.Errorf("can't stat %s", localFile)
263+
}
264+
b.resumableState.AppendToState(remoteFile, info.Size())
261265
}
262266
return nil
263267
}
@@ -385,8 +389,10 @@ func (b *Backuper) uploadAndArchiveBackupRelatedDir(ctx context.Context, localBa
385389
if _, err := os.Stat(localBackupRelatedDir); os.IsNotExist(err) {
386390
return 0, nil
387391
}
388-
if b.resume && b.resumableState.IsAlreadyProcessed(remoteFile) {
389-
return 0, nil
392+
if b.resume {
393+
if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remoteFile); isProcessed {
394+
return uint64(processedSize), nil
395+
}
390396
}
391397
var localFiles []string
392398
var err error
@@ -410,7 +416,7 @@ func (b *Backuper) uploadAndArchiveBackupRelatedDir(ctx context.Context, localBa
410416
return 0, fmt.Errorf("can't check uploaded %s file: %v", remoteFile, err)
411417
}
412418
if b.resume {
413-
b.resumableState.AppendToState(remoteFile)
419+
b.resumableState.AppendToState(remoteFile, remoteUploaded.Size())
414420
}
415421
return uint64(remoteUploaded.Size()), nil
416422
}
@@ -462,16 +468,21 @@ breakByError:
462468
remotePathFull := path.Join(remotePath, partSuffix)
463469
g.Go(func() error {
464470
defer s.Release(1)
465-
if b.resume && b.resumableState.IsAlreadyProcessed(remotePathFull) {
466-
return nil
471+
if b.resume {
472+
if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remotePathFull); isProcessed {
473+
atomic.AddInt64(&uploadedBytes, processedSize)
474+
return nil
475+
}
467476
}
468477
log.Debugf("start upload %d files to %s", len(partFiles), remotePath)
469-
if err := b.dst.UploadPath(ctx, 0, backupPath, partFiles, remotePath, b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration); err != nil {
478+
if uploadPathBytes, err := b.dst.UploadPath(ctx, 0, backupPath, partFiles, remotePath, b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration); err != nil {
470479
log.Errorf("UploadPath return error: %v", err)
471480
return fmt.Errorf("can't upload: %v", err)
472-
}
473-
if b.resume {
474-
b.resumableState.AppendToState(remotePathFull)
481+
} else {
482+
atomic.AddInt64(&uploadedBytes, uploadPathBytes)
483+
if b.resume {
484+
b.resumableState.AppendToState(remotePathFull, uploadPathBytes)
485+
}
475486
}
476487
log.Debugf("finish upload %d files to %s", len(partFiles), remotePath)
477488
return nil
@@ -483,8 +494,11 @@ breakByError:
483494
localFiles := partFiles
484495
g.Go(func() error {
485496
defer s.Release(1)
486-
if b.resume && b.resumableState.IsAlreadyProcessed(remoteDataFile) {
487-
return nil
497+
if b.resume {
498+
if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remoteDataFile); isProcessed {
499+
atomic.AddInt64(&uploadedBytes, processedSize)
500+
return nil
501+
}
488502
}
489503
log.Debugf("start upload %d files to %s", len(localFiles), remoteDataFile)
490504
retry := retrier.New(retrier.ConstantBackoff(b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration), nil)
@@ -501,7 +515,7 @@ breakByError:
501515
}
502516
atomic.AddInt64(&uploadedBytes, remoteFile.Size())
503517
if b.resume {
504-
b.resumableState.AppendToState(remoteDataFile)
518+
b.resumableState.AppendToState(remoteDataFile, remoteFile.Size())
505519
}
506520
log.Debugf("finish upload to %s", remoteDataFile)
507521
return nil
@@ -534,8 +548,10 @@ func (b *Backuper) uploadTableMetadataRegular(ctx context.Context, backupName st
534548
return 0, fmt.Errorf("can't marshal json: %v", err)
535549
}
536550
remoteTableMetaFile := path.Join(backupName, "metadata", common.TablePathEncode(tableMetadata.Database), fmt.Sprintf("%s.json", common.TablePathEncode(tableMetadata.Table)))
537-
if b.resume && b.resumableState.IsAlreadyProcessed(remoteTableMetaFile) {
538-
return int64(len(content)), nil
551+
if b.resume {
552+
if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remoteTableMetaFile); isProcessed {
553+
return processedSize, nil
554+
}
539555
}
540556
retry := retrier.New(retrier.ConstantBackoff(b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration), nil)
541557
err = retry.RunCtx(ctx, func(ctx context.Context) error {
@@ -545,15 +561,17 @@ func (b *Backuper) uploadTableMetadataRegular(ctx context.Context, backupName st
545561
return 0, fmt.Errorf("can't upload: %v", err)
546562
}
547563
if b.resume {
548-
b.resumableState.AppendToState(remoteTableMetaFile)
564+
b.resumableState.AppendToState(remoteTableMetaFile, int64(len(content)))
549565
}
550566
return int64(len(content)), nil
551567
}
552568

553569
func (b *Backuper) uploadTableMetadataEmbedded(ctx context.Context, backupName string, tableMetadata metadata.TableMetadata) (int64, error) {
554570
remoteTableMetaFile := path.Join(backupName, "metadata", common.TablePathEncode(tableMetadata.Database), fmt.Sprintf("%s.sql", common.TablePathEncode(tableMetadata.Table)))
555-
if b.resume && b.resumableState.IsAlreadyProcessed(remoteTableMetaFile) {
556-
return 0, nil
571+
if b.resume {
572+
if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remoteTableMetaFile); isProcessed {
573+
return processedSize, nil
574+
}
557575
}
558576
log := b.log.WithField("logger", "uploadTableMetadataEmbedded")
559577
localTableMetaFile := path.Join(b.EmbeddedBackupDataPath, backupName, "metadata", common.TablePathEncode(tableMetadata.Database), fmt.Sprintf("%s.sql", common.TablePathEncode(tableMetadata.Table)))
@@ -573,12 +591,12 @@ func (b *Backuper) uploadTableMetadataEmbedded(ctx context.Context, backupName s
573591
if err != nil {
574592
return 0, fmt.Errorf("can't embeeded upload metadata: %v", err)
575593
}
576-
if b.resume {
577-
b.resumableState.AppendToState(remoteTableMetaFile)
578-
}
579594
if info, err := os.Stat(localTableMetaFile); err != nil {
580595
return 0, fmt.Errorf("stat %s error: %v", localTableMetaFile, err)
581596
} else {
597+
if b.resume {
598+
b.resumableState.AppendToState(remoteTableMetaFile, info.Size())
599+
}
582600
return info.Size(), nil
583601
}
584602
}

pkg/resumable/state.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
apexLog "github.com/apex/log"
66
"os"
77
"path"
8+
"strconv"
89
"strings"
910
"sync"
1011
)
@@ -49,7 +50,8 @@ func (s *State) LoadState() {
4950
s.mx.Unlock()
5051
}
5152

52-
func (s *State) AppendToState(path string) {
53+
func (s *State) AppendToState(path string, size int64) {
54+
path = fmt.Sprintf("%s:%d", path, size)
5355
s.mx.Lock()
5456
if s.fp != nil {
5557
_, err := s.fp.WriteString(path + "\n")
@@ -65,14 +67,25 @@ func (s *State) AppendToState(path string) {
6567
s.mx.Unlock()
6668
}
6769

68-
func (s *State) IsAlreadyProcessed(path string) bool {
70+
func (s *State) IsAlreadyProcessedBool(path string) bool {
71+
isProcesses, _ := s.IsAlreadyProcessed(path)
72+
return isProcesses
73+
}
74+
func (s *State) IsAlreadyProcessed(path string) (bool, int64) {
75+
var size int64
76+
var err error
6977
s.mx.RLock()
70-
res := strings.Contains(s.currentState, path+"\n")
71-
s.mx.RUnlock()
72-
if res {
78+
res := strings.Index(s.currentState, path+":")
79+
if res >= 0 {
7380
s.log.Infof("%s already processed", path)
81+
sSize := s.currentState[res : res+strings.Index(s.currentState[res:], "\n")]
82+
size, err = strconv.ParseInt(sSize, 10, 64)
83+
if err != nil {
84+
s.log.Warnf("invalid size %s in upload state")
85+
}
7486
}
75-
return res
87+
s.mx.RUnlock()
88+
return res >= 0, size
7689
}
7790

7891
func (s *State) Close() {

0 commit comments

Comments
 (0)