From 1b03bc82dd121728c8361d3fda5b77ad460eb170 Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Tue, 19 Mar 2019 11:25:33 +0000 Subject: [PATCH] Handle intermediate restarts of sidecar gracefully. Signed-off-by: Bartek Plotka --- pkg/block/block.go | 1 + pkg/shipper/shipper.go | 24 ++++++++++++------------ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pkg/block/block.go b/pkg/block/block.go index cdc52a3e081..008e1798d4b 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -118,6 +118,7 @@ func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error { } // DownloadMeta downloads only meta file from bucket by block ID. +// TODO(bwplotka): Differentiate between network error & partial upload. func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) { rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename)) if err != nil { diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index a5208d84f13..c6f526f5933 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -223,7 +223,6 @@ func newLazyOverlapChecker(logger log.Logger, bucket objstore.Bucket, labels fun } func (c *lazyOverlapChecker) sync(ctx context.Context) error { - level.Info(c.logger).Log("msg", "gathering all existing blocks from the remote bucket") if err := c.bucket.Iter(ctx, "", func(path string) error { id, ok := block.IsBlockDir(path) if !ok { @@ -253,6 +252,7 @@ func (c *lazyOverlapChecker) sync(ctx context.Context) error { func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.BlockMeta) error { if !c.synced { + level.Info(c.logger).Log("msg", "gathering all existing blocks from the remote bucket for check", "id", newMeta.ULID.String()) if err := c.sync(ctx); err != nil { return err } @@ -280,8 +280,8 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { meta, err := ReadMetaFile(s.dir) if err != nil { // If we encounter any error, 
proceed with an empty meta file and overwrite it later. - // The meta file is only used to deduplicate uploads, which are properly handled - // by the system if their occur anyway. + // The meta file is only used to avoid unnecessary bucket.Exists call, + // which are properly handled by the system if they occur anyway. if !os.IsNotExist(err) { level.Warn(s.logger).Log("msg", "reading meta file failed, will override it", "err", err) } @@ -316,6 +316,15 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { return nil } + // Check against bucket if the meta file for this block exists. + ok, err := s.bucket.Exists(ctx, path.Join(m.ULID.String(), block.MetaFilename)) + if err != nil { + return errors.Wrap(err, "check exists") + } + if ok { + return nil + } + // We only ship of the first compacted block level as normal flow. if m.Compaction.Level > 1 { if !s.uploadCompacted { @@ -329,15 +338,6 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { } } - // Check against bucket if the meta file for this block exists. - ok, err := s.bucket.Exists(ctx, path.Join(m.ULID.String(), block.MetaFilename)) - if err != nil { - return errors.Wrap(err, "check exists") - } - if ok { - return nil - } - if err := s.upload(ctx, m); err != nil { level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err) // No error returned, just log line. This is because we want other blocks to be uploaded even