Tools bucket replicate: concurrent replication
Converted blocks into groups (using the compactor's default grouper) and replicated the blocks within each group concurrently using goroutines.

Signed-off-by: Kartik-Garg <kartik.garg@infracloud.io>
Kartik-Garg committed Mar 1, 2023
1 parent 62423a1 commit b76c58b
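
The pattern this commit introduces, sketched below in isolation: each group's block IDs are pushed onto a buffered channel, a fixed number of worker goroutines drain it, and errors are collected on a second buffered channel and surfaced after the workers finish. The names (`replicateBlock`, `replicateGroup`, the sample IDs) are illustrative placeholders, not the actual Thanos code.

```go
// Illustrative sketch of group-based concurrent replication with a bounded
// worker pool. Not the actual Thanos code.
package main

import (
	"fmt"
	"sync"
)

// replicateBlock stands in for the per-block replication work.
func replicateBlock(id string) error {
	fmt.Println("replicating block", id)
	return nil
}

// replicateGroup replicates all blocks of one group with at most
// `concurrency` goroutines in flight.
func replicateGroup(blockIDs []string, concurrency int) error {
	blockCh := make(chan string, len(blockIDs))
	errCh := make(chan error, len(blockIDs))

	for _, id := range blockIDs {
		blockCh <- id
	}
	close(blockCh)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			for id := range blockCh {
				if err := replicateBlock(id); err != nil {
					errCh <- err
				}
			}
		}()
	}

	wg.Wait()
	close(errCh)

	// Return the first error, if any; nil means the whole group succeeded.
	for err := range errCh {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	_ = replicateGroup([]string{"01ABC", "01DEF", "01GHI"}, 2)
}
```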
Showing 6 changed files with 473 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -35,6 +35,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5548](https://github.com/thanos-io/thanos/pull/5548) Query: Added experimental support for load balancing across multiple Store endpoints.
- [#6148](https://github.com/thanos-io/thanos/pull/6148) Query-frontend: add traceID to slow query detected log line
- [#6153](https://github.com/thanos-io/thanos/pull/6153) Query-frontend: add remote_user (from http basic auth) and remote_addr to slow query detected log line
- [#6174](https://github.com/thanos-io/thanos/pull/6174) Tools bucket replicate: Blocks are replicated concurrently.

### Fixed

12 changes: 8 additions & 4 deletions cmd/thanos/tools_bucket.go
@@ -121,10 +121,11 @@ type bucketWebConfig struct {
}

type bucketReplicateConfig struct {
resolutions []time.Duration
compactions []int
matcherStrs string
singleRun bool
resolutions []time.Duration
compactions []int
matcherStrs string
singleRun bool
concurrencyLevel int
}

type bucketDownsampleConfig struct {
@@ -211,6 +212,8 @@ func (tbc *bucketReplicateConfig) registerBucketReplicateFlag(cmd extkingpin.Fla

cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").BoolVar(&tbc.singleRun)

cmd.Flag("concurrency-level", "Number of go-routines to use for replication.").Default("1").IntVar(&tbc.concurrencyLevel)

return tbc
}

@@ -735,6 +738,7 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
objStoreConfig,
toObjStoreConfig,
tbc.singleRun,
tbc.concurrencyLevel,
minTime,
maxTime,
blockIDs,
1 change: 1 addition & 0 deletions docs/components/tools.md
@@ -509,6 +509,7 @@ with Thanos blocks (meta.json has to have Thanos metadata).
Flags:
--compaction=1... ... Only blocks with these compaction levels will be
replicated. Repeated flag.
--concurrency-level=1 Number of goroutines to use for replication.
-h, --help Show context-sensitive help (also try --help-long
and --help-man).
--http-address="0.0.0.0:10902"
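With the new flag in place, a one-off run that replicates up to four blocks at a time could be invoked as `thanos tools bucket replicate --single-run --concurrency-level=4 ...` (source and target object-store configuration flags elided here). The default of 1 keeps the previous, effectively sequential behaviour.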
3 changes: 2 additions & 1 deletion pkg/replicate/replicator.go
@@ -72,6 +72,7 @@ func RunReplicate(
fromObjStoreConfig *extflag.PathOrContent,
toObjStoreConfig *extflag.PathOrContent,
singleRun bool,
concurrencyLevel int,
minTime, maxTime *thanosmodel.TimeOrDurationValue,
blockIDs []ulid.ULID,
ignoreMarkedForDeletion bool,
@@ -183,7 +184,7 @@ func RunReplicate(
logger := log.With(logger, "replication-run-id", runID.String())
level.Info(logger).Log("msg", "running replication attempt")

if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil {
if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx, concurrencyLevel, reg); err != nil {
return errors.Wrap(err, "replication execute")
}

151 changes: 134 additions & 17 deletions pkg/replicate/scheme.go
@@ -9,7 +9,8 @@ import (
"fmt"
"io"
"path"
"sort"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -169,34 +170,84 @@ func newReplicationScheme(
}
}

func (rs *replicationScheme) execute(ctx context.Context) error {
availableBlocks := []*metadata.Meta{}

func (rs *replicationScheme) execute(ctx context.Context, concurrencyLevel int, reg *prometheus.Registry) error {
metas, partials, err := rs.fetcher.Fetch(ctx)
if err != nil {
return err
}

for id := range partials {
level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String())
}

filteredMetas := map[ulid.ULID]*metadata.Meta{}
for id, meta := range metas {
if rs.blockFilter(meta) {
level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String())
availableBlocks = append(availableBlocks, meta)
filteredMetas[id] = meta
}
}

// In order to prevent races in compactions by the target environment, we
// need to replicate oldest start timestamp first.
sort.Slice(availableBlocks, func(i, j int) bool {
return availableBlocks[i].BlockMeta.MinTime < availableBlocks[j].BlockMeta.MinTime
})
for id := range partials {
level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String())
}

const (
deleteDelay = time.Duration(48 * time.Hour)
)
var (
compactMetrics = newCompactMetrics(reg, deleteDelay)
acceptMalformedIndex = false
enableVerticalCompaction = false
blockFilesConcurrency = 1
compactBlocksFetchConcurrency = 1
)

grouper := compact.NewDefaultGrouper(
rs.logger,
rs.toBkt,
acceptMalformedIndex,
enableVerticalCompaction,
reg,
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
compactMetrics.garbageCollectedBlocks,
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
metadata.HashFunc(""),
blockFilesConcurrency,
compactBlocksFetchConcurrency,
)

groups, err := grouper.Groups(filteredMetas)
if err != nil {
return errors.Wrapf(err, "could not group metadata for Thanos blocks")
}

for _, group := range groups {

var wg sync.WaitGroup
errs := make(chan error, len(group.IDs()))

blockChan := make(chan ulid.ULID, len(group.IDs()))
for _, blockId := range group.IDs() {
blockChan <- blockId
}
close(blockChan)

wg.Add(concurrencyLevel)

for i := 0; i < concurrencyLevel; i++ {
go func(errs chan<- error) {
defer wg.Done()
for block := range blockChan {
if err := rs.ensureBlockIsReplicated(ctx, block); err != nil {
errs <- err
}
}
}(errs)
}

for _, b := range availableBlocks {
if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
wg.Wait()
close(errs)

	for err := range errs {
		if err != nil {
			return err
		}
	}
}
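
For comparison, the bounded fan-out above could also be expressed with golang.org/x/sync/errgroup, whose SetLimit caps in-flight goroutines and whose Wait returns the first worker error. This is only a sketch of an alternative, not what the commit implements; the helper names and sample IDs are illustrative.

```go
// Alternative sketch (not part of this commit) of the same bounded,
// per-group fan-out using errgroup. Identifiers are illustrative.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func replicateBlock(ctx context.Context, id string) error {
	fmt.Println("replicating block", id)
	return nil
}

func replicateGroup(ctx context.Context, blockIDs []string, concurrency int) error {
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(concurrency) // at most `concurrency` goroutines in flight
	for _, id := range blockIDs {
		id := id // capture loop variable for the closure (pre-Go 1.22)
		g.Go(func() error {
			return replicateBlock(gctx, id)
		})
	}
	// Wait returns the first non-nil error from any worker, if one occurred.
	return g.Wait()
}

func main() {
	_ = replicateGroup(context.Background(), []string{"01ABC", "01DEF"}, 2)
}
```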

@@ -313,3 +364,69 @@ func (rs *replicationScheme) ensureObjectReplicated(ctx context.Context, objectN

return nil
}

type compactMetrics struct {
halted prometheus.Gauge
retried prometheus.Counter
iterations prometheus.Counter
cleanups prometheus.Counter
partialUploadDeleteAttempts prometheus.Counter
blocksCleaned prometheus.Counter
blockCleanupFailures prometheus.Counter
blocksMarked *prometheus.CounterVec
garbageCollectedBlocks prometheus.Counter
}

func newCompactMetrics(reg *prometheus.Registry, deleteDelay time.Duration) *compactMetrics {
_ = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_delete_delay_seconds",
Help: "Configured delete delay in seconds.",
}, func() float64 {
return deleteDelay.Seconds()
})

m := &compactMetrics{}

m.halted = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compact_halted",
Help: "Set to 1 if the compactor halted due to an unexpected error.",
})
m.halted.Set(0)
m.retried = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_retries_total",
Help: "Total number of retries after retriable compactor error.",
})
m.iterations = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_iterations_total",
Help: "Total number of iterations that were executed successfully.",
})
m.cleanups = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_block_cleanup_loops_total",
Help: "Total number of concurrent cleanup loops of partially uploaded blocks and marked blocks that were executed successfully.",
})
m.partialUploadDeleteAttempts = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_aborted_partial_uploads_deletion_attempts_total",
Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
})
m.blocksCleaned = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_blocks_cleaned_total",
Help: "Total number of blocks deleted in compactor.",
})
m.blockCleanupFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_block_cleanup_failures_total",
Help: "Failures encountered while deleting blocks in compactor.",
})
m.blocksMarked = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_blocks_marked_total",
Help: "Total number of blocks marked in compactor.",
}, []string{"marker", "reason"})
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason)
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason)
m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")

m.garbageCollectedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_garbage_collected_blocks_total",
Help: "Total number of blocks marked for deletion by compactor.",
})
return m
}
