Skip to content

Commit

Permalink
sidecar: Handle intermediate restarts of sidecar gracefully.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Mar 19, 2019
1 parent 166d54e commit e85ca0e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
1 change: 1 addition & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error {
}

// DownloadMeta downloads only meta file from bucket by block ID.
// TODO(bwplotka): Differentiate between network error & partial upload.
func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) {
rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename))
if err != nil {
Expand Down
24 changes: 12 additions & 12 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func newLazyOverlapChecker(logger log.Logger, bucket objstore.Bucket, labels fun
}

func (c *lazyOverlapChecker) sync(ctx context.Context) error {
level.Info(c.logger).Log("msg", "gathering all existing blocks from the remote bucket")
if err := c.bucket.Iter(ctx, "", func(path string) error {
id, ok := block.IsBlockDir(path)
if !ok {
Expand Down Expand Up @@ -253,6 +252,7 @@ func (c *lazyOverlapChecker) sync(ctx context.Context) error {

func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.BlockMeta) error {
if !c.synced {
level.Info(c.logger).Log("msg", "gathering all existing blocks from the remote bucket for check", "id", newMeta.ULID.String())
if err := c.sync(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -280,8 +280,8 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
meta, err := ReadMetaFile(s.dir)
if err != nil {
// If we encounter any error, proceed with an empty meta file and overwrite it later.
// The meta file is only used to deduplicate uploads, which are properly handled
// by the system if their occur anyway.
// The meta file is only used to avoid unnecessary bucket.Exists call,
// which are properly handled by the system if their occur anyway.
if !os.IsNotExist(err) {
level.Warn(s.logger).Log("msg", "reading meta file failed, will override it", "err", err)
}
Expand Down Expand Up @@ -316,6 +316,15 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
return nil
}

// Check against bucket if the meta file for this block exists.
ok, err := s.bucket.Exists(ctx, path.Join(m.ULID.String(), block.MetaFilename))
if err != nil {
return errors.Wrap(err, "check exists")
}
if ok {
return nil
}

// We only ship of the first compacted block level as normal flow.
if m.Compaction.Level > 1 {
if !s.uploadCompacted {
Expand All @@ -329,15 +338,6 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
}
}

// Check against bucket if the meta file for this block exists.
ok, err := s.bucket.Exists(ctx, path.Join(m.ULID.String(), block.MetaFilename))
if err != nil {
return errors.Wrap(err, "check exists")
}
if ok {
return nil
}

if err := s.upload(ctx, m); err != nil {
level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err)
// No error returned, just log line. This is because we want other blocks to be uploaded even
Expand Down

0 comments on commit e85ca0e

Please sign in to comment.