always retry syncing index files from indexshipper when the index list cache is stale #6327

Merged
30 changes: 26 additions & 4 deletions pkg/storage/stores/indexshipper/downloads/index_set.go
@@ -2,6 +2,7 @@ package downloads

import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
@@ -25,7 +26,8 @@ import (
)

const (
gzipExtension = ".gz"
gzipExtension = ".gz"
maxSyncRetries = 1
)

var errIndexListCacheTooStale = fmt.Errorf("index list cache too stale")
@@ -144,7 +146,7 @@ func (t *indexSet) Init(forQuerying bool) (err error) {
level.Debug(logger).Log("msg", fmt.Sprintf("opened %d local files, now starting sync operation", len(t.index)))

// sync the table to get new files and remove the deleted ones from storage.
err = t.sync(ctx, false, forQuerying)
err = t.syncWithRetry(ctx, false, forQuerying)
if err != nil {
return
}
@@ -243,11 +245,31 @@ func (t *indexSet) cleanupDB(fileName string) error {
}

func (t *indexSet) Sync(ctx context.Context) (err error) {
return t.sync(ctx, true, false)
return t.syncWithRetry(ctx, true, false)
}

// syncWithRetry runs a sync with up to maxSyncRetries on failure
func (t *indexSet) syncWithRetry(ctx context.Context, lock, bypassListCache bool) error {
var err error
for i := 0; i <= maxSyncRetries; i++ {
Contributor (inline review comment): double checking: the PR title says that it will retry syncing files when the index list cache is stale, but on this implementation you are retrying regardless of the error (although you're only refreshing when the cache is stale). Is that the intended behavior?

(A standalone sketch of this retry behavior follows this file's diff.)

err = t.sync(ctx, lock, bypassListCache)
if err == nil {
return nil
}

if errors.Is(err, errIndexListCacheTooStale) && i < maxSyncRetries {
level.Info(t.logger).Log("msg", "we have hit stale list cache, refreshing it before retrying")
t.baseIndexSet.RefreshIndexListCache(ctx)
}

level.Error(t.logger).Log("msg", "sync failed, retrying it", "err", err)
}

return err
}

// sync downloads updated and new files from the storage relevant for the table and removes the deleted ones
func (t *indexSet) sync(ctx context.Context, lock bool, bypassListCache bool) (err error) {
func (t *indexSet) sync(ctx context.Context, lock, bypassListCache bool) (err error) {
level.Debug(t.logger).Log("msg", fmt.Sprintf("syncing files for table %s", t.tableName))

toDownload, toDelete, err := t.checkStorageForUpdates(ctx, lock, bypassListCache)
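As the inline review comment above points out, the syncWithRetry loop retries on any sync failure, not only the stale-cache case; only the cache refresh is gated on errIndexListCacheTooStale. Below is a minimal, self-contained sketch of that behavior as a standalone Go program. retrySync and its sync/refreshCache callbacks are hypothetical stand-ins for indexSet.syncWithRetry, indexSet.sync, and baseIndexSet.RefreshIndexListCache, not the actual Loki types.

// A minimal sketch, assuming simplified callbacks in place of the real indexSet methods.
package main

import (
	"context"
	"errors"
	"fmt"
)

// Sentinel error mirroring errIndexListCacheTooStale from the diff above.
var errIndexListCacheTooStale = errors.New("index list cache too stale")

const maxSyncRetries = 1

// retrySync retries on any failure up to maxSyncRetries, but only refreshes
// the index list cache when the failure is the stale-cache sentinel.
func retrySync(ctx context.Context, sync func(context.Context) error, refreshCache func(context.Context)) error {
	var err error
	for i := 0; i <= maxSyncRetries; i++ {
		err = sync(ctx)
		if err == nil {
			return nil
		}
		if errors.Is(err, errIndexListCacheTooStale) && i < maxSyncRetries {
			refreshCache(ctx)
		}
	}
	return err
}

func main() {
	attempts := 0
	// First attempt fails with the stale-cache error; the second succeeds
	// after the cache refresh, so retrySync returns nil overall.
	sync := func(ctx context.Context) error {
		attempts++
		if attempts == 1 {
			return fmt.Errorf("listing index files: %w", errIndexListCacheTooStale)
		}
		return nil
	}
	refreshCache := func(ctx context.Context) {
		fmt.Println("refreshing index list cache")
	}

	err := retrySync(context.Background(), sync, refreshCache)
	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}

Running this prints one cache refresh and reports two attempts with a nil error, which is the same path the new TestIndexSet_Sync exercises after simulating a compaction in storage.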
65 changes: 65 additions & 0 deletions pkg/storage/stores/indexshipper/downloads/index_set_test.go
@@ -3,12 +3,14 @@ package downloads
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
@@ -89,3 +91,66 @@ func TestIndexSet_doConcurrentDownload(t *testing.T) {
})
}
}

func TestIndexSet_Sync(t *testing.T) {
tempDir := t.TempDir()
objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
tablePathInStorage := filepath.Join(objectStoragePath, tableName)

var indexesSetup []string

indexSet, stopFunc := buildTestIndexSet(t, "", tempDir)
defer stopFunc()

checkIndexSet := func() {
require.Len(t, indexSet.index, len(indexesSetup))
verifyIndexForEach(t, indexesSetup, func(callbackFunc func(index.Index) error) error {
return indexSet.ForEach(context.Background(), callbackFunc)
})
}

// setup some indexes in object storage
setupIndexesAtPath(t, "", tablePathInStorage, 0, 10)
indexesSetup = buildListOfExpectedIndexes("", 0, 10)

// sync and verify the indexSet
indexSet.baseIndexSet.RefreshIndexListCache(context.Background())
require.NoError(t, indexSet.Sync(context.Background()))

// check index set twice; first run to have new files to download, second run to test with no changes in storage.
for i := 0; i < 2; i++ {
checkIndexSet()
}

// delete a file from storage which should get removed from local as well
require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, indexesSetup[0])))
indexesSetup = indexesSetup[1:]

// sync and verify the indexSet
indexSet.baseIndexSet.RefreshIndexListCache(context.Background())
require.NoError(t, indexSet.Sync(context.Background()))
checkIndexSet()

// let us simulate a compaction to test stale index list cache handling

// first, let us add a new file and refresh the index list cache
oneMoreDB := "one-more-db"
require.NoError(t, ioutil.WriteFile(filepath.Join(tablePathInStorage, oneMoreDB), []byte(oneMoreDB), 0755))
indexSet.baseIndexSet.RefreshIndexListCache(context.Background())

// now, without syncing the indexset, let us compact the index in storage
compactedDBName := "compacted-db"
require.NoError(t, os.RemoveAll(tablePathInStorage))
require.NoError(t, util.EnsureDirectory(tablePathInStorage))
require.NoError(t, ioutil.WriteFile(filepath.Join(tablePathInStorage, compactedDBName), []byte(compactedDBName), 0755))
indexesSetup = []string{compactedDBName}

// verify that we are getting errIndexListCacheTooStale without refreshing the list cache
require.ErrorIs(t, errIndexListCacheTooStale, indexSet.sync(context.Background(), true, false))

// let us run a sync which should detect the stale index list cache and sync the table after refreshing the cache
require.NoError(t, indexSet.Sync(context.Background()))

// verify that table has got only compacted db
checkIndexSet()
}
9 changes: 0 additions & 9 deletions pkg/storage/stores/indexshipper/downloads/table.go
@@ -235,15 +235,6 @@ func (t *table) Sync(ctx context.Context) error {

for userID, indexSet := range t.indexSets {
if err := indexSet.Sync(ctx); err != nil {
if errors.Is(err, errIndexListCacheTooStale) {
level.Info(t.logger).Log("msg", "we have hit stale list cache, refreshing it and running sync again")
t.storageClient.RefreshIndexListCache(ctx)

err = indexSet.Sync(ctx)
if err == nil {
continue
}
}
return errors.Wrap(err, fmt.Sprintf("failed to sync index set %s for table %s", userID, t.name))
}
}