Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replicate can specify blocks to copy #3388

Merged
merged 6 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re

- [#3469](https://github.com/thanos-io/thanos/pull/3469) StoreAPI: Added `hints` field to `LabelNamesRequest` and `LabelValuesRequest`. Hints is an opaque data structure that can be used to carry additional information from the store, and its content is implementation specific.
- [#3421](https://github.com/thanos-io/thanos/pull/3421) Tools: Added `thanos tools bucket rewrite` command allowing to delete series from given block.
- [#3388](https://github.com/thanos-io/thanos/pull/3388) Tools: Bucket replicator now can specify block IDs to copy.

### Fixed

Expand Down
11 changes: 11 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
Default("0000-01-01T00:00:00Z"))
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))
ids := cmd.Flag("id", "Block to be replicated to the destination bucket. IDs will be used to match blocks and other matchers will be ignored. When specified, this command will be run only once after successful replication. Repeated field").Strings()

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
matchers, err := replicate.ParseFlagMatchers(*matcherStrs)
Expand All @@ -459,6 +460,15 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
resolutionLevels = append(resolutionLevels, compact.ResolutionLevel(lvl.Milliseconds()))
}

blockIDs := make([]ulid.ULID, 0, len(*ids))
for _, id := range *ids {
bid, err := ulid.Parse(id)
if err != nil {
return errors.Wrap(err, "invalid ULID found in --id flag")
}
blockIDs = append(blockIDs, bid)
}

return replicate.RunReplicate(
g,
logger,
Expand All @@ -474,6 +484,7 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
*singleRun,
minTime,
maxTime,
blockIDs,
)
})
}
Expand Down
5 changes: 5 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,11 @@ Flags:
duration relative to current time, such as -1d
or 2h45m. Valid duration units are ms, s, m, h,
d, w, y.
--id=ID ... Block to be replicated to the destination
bucket. IDs will be used to match blocks and
other matchers will be ignored. When specified,
this command will be run only once after
successful replication. Repeated field

```

Expand Down
8 changes: 5 additions & 3 deletions pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func RunReplicate(
toObjStoreConfig *extflag.PathOrContent,
singleRun bool,
minTime, maxTime *thanosmodel.TimeOrDurationValue,
blockIDs []ulid.ULID,
) error {
logger = log.With(logger, "component", "replicate")

Expand Down Expand Up @@ -182,6 +183,7 @@ func RunReplicate(
labelSelector,
resolutions,
compactions,
blockIDs,
).Filter
metrics := newReplicationMetrics(reg)
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -190,12 +192,12 @@ func RunReplicate(
timestamp := time.Now()
entropy := ulid.Monotonic(rand.New(rand.NewSource(timestamp.UnixNano())), 0)

ulid, err := ulid.New(ulid.Timestamp(timestamp), entropy)
runID, err := ulid.New(ulid.Timestamp(timestamp), entropy)
if err != nil {
return errors.Wrap(err, "generate replication run-id")
}

logger := log.With(logger, "replication-run-id", ulid.String())
logger := log.With(logger, "replication-run-id", runID.String())
level.Info(logger).Log("msg", "running replication attempt")

if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil {
Expand All @@ -209,7 +211,7 @@ func RunReplicate(
defer runutil.CloseWithLogOnErr(logger, fromBkt, "from bucket client")
defer runutil.CloseWithLogOnErr(logger, toBkt, "to bucket client")

if singleRun {
if singleRun || len(blockIDs) > 0 {
return replicateFn()
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type BlockFilter struct {
labelSelector labels.Selector
resolutionLevels map[compact.ResolutionLevel]struct{}
compactionLevels map[int]struct{}
blockIDs []ulid.ULID
}

// NewBlockFilter returns block filter.
Expand All @@ -40,6 +41,7 @@ func NewBlockFilter(
labelSelector labels.Selector,
resolutionLevels []compact.ResolutionLevel,
compactionLevels []int,
blockIDs []ulid.ULID,
) *BlockFilter {
allowedResolutions := make(map[compact.ResolutionLevel]struct{})
for _, resolutionLevel := range resolutionLevels {
Expand All @@ -49,11 +51,13 @@ func NewBlockFilter(
for _, compactionLevel := range compactionLevels {
allowedCompactions[compactionLevel] = struct{}{}
}

return &BlockFilter{
labelSelector: labelSelector,
logger: logger,
resolutionLevels: allowedResolutions,
compactionLevels: allowedCompactions,
blockIDs: blockIDs,
}
}

Expand All @@ -64,6 +68,16 @@ func (bf *BlockFilter) Filter(b *metadata.Meta) bool {
return false
}

// If required block IDs are set, we only match required blocks and ignore others.
if len(bf.blockIDs) > 0 {
for _, id := range bf.blockIDs {
if b.ULID == id {
return true
}
}
return false
}

blockLabels := labels.FromMap(b.Thanos.Labels)

labelMatch := bf.labelSelector.Matches(blockLabels)
Expand Down
33 changes: 32 additions & 1 deletion pkg/replicate/scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ func testMeta(ulid ulid.ULID) *metadata.Meta {
}

func TestReplicationSchemeAll(t *testing.T) {
testBlockID := testULID(0)
var cases = []struct {
name string
selector labels.Selector
blockIDs []ulid.ULID
prepare func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket)
assert func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket)
}{
Expand Down Expand Up @@ -291,6 +293,35 @@ func TestReplicationSchemeAll(t *testing.T) {
testutil.Equals(t, expected, got)
},
},
{
name: "BlockIDs",
blockIDs: []ulid.ULID{testBlockID},
prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
meta := testMeta(testBlockID)

b, err := json.Marshal(meta)
testutil.Ok(t, err)
_ = originBucket.Upload(ctx, path.Join(testBlockID.String(), "meta.json"), bytes.NewReader(b))
_ = originBucket.Upload(ctx, path.Join(testBlockID.String(), "chunks", "000001"), bytes.NewReader(nil))
_ = originBucket.Upload(ctx, path.Join(testBlockID.String(), "index"), bytes.NewReader(nil))

ulid := testULID(1)
meta = testMeta(ulid)

b, err = json.Marshal(meta)
testutil.Ok(t, err)
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b))
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
},
assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
expected := 3
got := len(targetBucket.Objects())
if got != expected {
t.Fatalf("TargetBucket should have one block made up of three objects replicated. Got %d but expected %d objects.", got, expected)
}
},
},
}

for _, c := range cases {
Expand All @@ -311,7 +342,7 @@ func TestReplicationSchemeAll(t *testing.T) {
selector = c.selector
}

filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}).Filter
filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}, c.blockIDs).Filter
fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil)
testutil.Ok(t, err)

Expand Down