Skip to content

Commit

Permalink
Adjust min,max times for dumps after job finishes (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
isacikgoz authored Jun 11, 2024
1 parent 0805a10 commit 35f711f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 11 deletions.
41 changes: 31 additions & 10 deletions server/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,28 @@ import (
"github.com/prometheus/prometheus/tsdb"
)

func (p *Plugin) createDump(ctx context.Context, id string, min, max time.Time, remoteStorageDir string) (string, error) {
type Dump struct {
Path string
MinT int64
MaxT int64
}

func (p *Plugin) createDump(ctx context.Context, id string, min, max time.Time, remoteStorageDir string) (*Dump, error) {
// get the blocks if there is any block in the remote filestore
blocks, err := p.fileBackend.ListDirectory(remoteStorageDir)
if err != nil {
return "", err
return nil, err
} else if len(blocks) == 0 {
return "", errors.New("no blocks in the remote storage")
return nil, errors.New("no blocks in the remote storage")
}

// we generate everything under a new directory to avoid conflicts
// between simultaneous downloads
dumpDir := filepath.Join("dump", id, "data")
tempZipFile := filepath.Join(filepath.Dir(dumpDir), zipFileName)

var actualMin, actualMax time.Time

for _, b := range blocks {
// read block meta from the remote filestore and decide if they are older than the
// retention period. If they are within the retention period, copy the data
Expand All @@ -37,13 +45,22 @@ func (p *Plugin) createDump(ctx context.Context, id string, min, max time.Time,
}

metaMax := time.UnixMilli(meta.MaxTime)
metaMin := time.UnixMilli(meta.MinTime)
if metaMax.Before(max) && metaMax.After(min) {
p.API.LogInfo("Fetching block from the filestore", "ulid", meta.ULID, "Max Time", max.String())

err = copyFromFileStore(dumpDir, b, p.fileBackend)
if err != nil {
p.API.LogError("Error during fetching the block", "ulid", meta.ULID, "err", err)
}

if metaMax.After(actualMax) {
actualMax = metaMax
}

if metaMin.Before(actualMin) || actualMin.IsZero() {
actualMin = metaMin
}
}
}

Expand All @@ -55,37 +72,41 @@ func (p *Plugin) createDump(ctx context.Context, id string, min, max time.Time,
AllowOverlappingCompaction: true,
}, nil)
if err != nil {
return "", err
return nil, err
}

// we should compact the tsdb to remove/merge overlapping blocks. Also the older blocks
// will be deleted but we didn't pull them in the first place anyway.
err = db.Compact(ctx)
if err != nil {
return "", err
return nil, err
}

err = db.Close()
if err != nil {
return "", err
return nil, err
}

err = compressDirectory(dumpDir, tempZipFile)
if err != nil {
return "", err
return nil, err
}
defer os.Remove(tempZipFile)

err = os.RemoveAll(dumpDir)
if err != nil {
return "", err
return nil, err
}

zipFileNameRemote := filepath.Join(pluginDataDir, PluginName, tempZipFile)
err = copyFile(tempZipFile, zipFileNameRemote, p.fileBackend.WriteFile)
if err != nil {
return "", err
return nil, err
}

return zipFileNameRemote, nil
return &Dump{
Path: zipFileNameRemote,
MinT: actualMin.UnixMilli(),
MaxT: actualMax.UnixMilli(),
}, nil
}
4 changes: 3 additions & 1 deletion server/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func (p *Plugin) JobCallback(_ string, job any) {
return
}

dumpJob.DumpLocation = dump
dumpJob.DumpLocation = dump.Path
dumpJob.MinT = dump.MinT
dumpJob.MaxT = dump.MaxT
dumpJob.Status = model.JobStatusSuccess
}

Expand Down

0 comments on commit 35f711f

Please sign in to comment.