Skip to content

Commit

Permalink
tools: Support bucket rewrite relabel (#3707)
Browse files Browse the repository at this point in the history
* support bucket rewrite relabel

Signed-off-by: yeya24 <yb532204897@gmail.com>

* update tests comment

Signed-off-by: yeya24 <yb532204897@gmail.com>

* add changelog

Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 authored May 27, 2021
1 parent 20e8105 commit d8b21e7
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#4125](https://github.com/thanos-io/thanos/pull/4125) Rule: Add `--alert.relabel-config` / `--alert.relabel-config-file` allowing to specify alert relabel configurations like [Prometheus](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config)
- [#4211](https://github.com/thanos-io/thanos/pull/4211) Add TLS and basic authentication to Thanos APIs
- [#4249](https://github.com/thanos-io/thanos/pull/4249) UI: add dark theme
- [#3707](https://github.com/thanos-io/thanos/pull/3707) Tools: Added `--rewrite.to-relabel-config` to bucket rewrite tool to support series relabel from given blocks.

### Fixed
-
Expand Down
37 changes: 31 additions & 6 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"

v1 "github.com/thanos-io/thanos/pkg/api/blocks"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -819,7 +821,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true)
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", false)
toRelabel := extflag.RegisterPathOrContent(cmd, "rewrite.to-relabel-config", "YAML file that contains relabel configs that will be applied to blocks", false)
provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool()
promBlocks := cmd.Flag("prom-blocks", "If specified, we assume the blocks to be uploaded are only used with Prometheus so we don't check external labels in this case.").Default("false").Bool()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
Expand All @@ -833,15 +836,36 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return err
}

deletionsYaml, err := toDelete.Content()
var modifiers []compactv2.Modifier

relabelYaml, err := toRelabel.Content()
if err != nil {
return err
}
var relabels []*relabel.Config
if len(relabelYaml) > 0 {
relabels, err = block.ParseRelabelConfig(relabelYaml, nil)
if err != nil {
return err
}
modifiers = append(modifiers, compactv2.WithRelabelModifier(relabels...))
}

var deletions []metadata.DeletionRequest
if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
deletionsYaml, err := toDelete.Content()
if err != nil {
return err
}
var deletions []metadata.DeletionRequest
if len(deletionsYaml) > 0 {
if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
return err
}
modifiers = append(modifiers, compactv2.WithDeletionModifier(deletions...))
}

if len(modifiers) == 0 {
return errors.New("rewrite configuration should be provided")
}

var ids []ulid.ULID
for _, id := range *blockIDs {
Expand Down Expand Up @@ -885,6 +909,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
meta.Thanos.Rewrites = append(meta.Thanos.Rewrites, metadata.Rewrite{
Sources: meta.Compaction.Sources,
DeletionsApplied: deletions,
RelabelsApplied: relabels,
})
meta.Compaction.Sources = []ulid.ULID{newID}
meta.Thanos.Source = metadata.BucketRewriteSource
Expand Down Expand Up @@ -916,8 +941,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
comp = compactv2.New(*tmpDir, logger, changeLog, chunkPool)
}

level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml))
if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, compactv2.WithDeletionModifier(deletions...)); err != nil {
level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml), "toRelabel", string(relabelYaml))
if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, modifiers...); err != nil {
return errors.Wrapf(err, "writing series from %v to %v", id, newID)
}

Expand Down
8 changes: 8 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,14 @@ Flags:
Path to YAML file that contains
[]metadata.DeletionRequest that will be applied
to blocks
--rewrite.to-relabel-config=<content>
Alternative to 'rewrite.to-relabel-config-file'
flag (mutually exclusive). Content of YAML file
that contains relabel configs that will be
applied to blocks
--rewrite.to-relabel-config-file=<file-path>
Path to YAML file that contains relabel configs
that will be applied to blocks
--tmp.dir="/tmp/thanos-rewrite"
Working directory for temporary files
--tracing.config=<content>
Expand Down
9 changes: 6 additions & 3 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,15 +879,18 @@ var (
)

// ParseRelabelConfig parses relabel configuration.
// If supportedActions not specified, all relabel actions are valid.
func ParseRelabelConfig(contentYaml []byte, supportedActions map[relabel.Action]struct{}) ([]*relabel.Config, error) {
var relabelConfig []*relabel.Config
if err := yaml.Unmarshal(contentYaml, &relabelConfig); err != nil {
return nil, errors.Wrap(err, "parsing relabel configuration")
}

for _, cfg := range relabelConfig {
if _, ok := supportedActions[cfg.Action]; !ok {
return nil, errors.Errorf("unsupported relabel action: %v", cfg.Action)
if supportedActions != nil {
for _, cfg := range relabelConfig {
if _, ok := supportedActions[cfg.Action]; !ok {
return nil, errors.Errorf("unsupported relabel action: %v", cfg.Action)
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/thanos-io/thanos/pkg/runutil"
"gopkg.in/yaml.v3"

"github.com/thanos-io/thanos/pkg/runutil"
)

type SourceType string
Expand Down Expand Up @@ -95,6 +97,8 @@ type Rewrite struct {
Sources []ulid.ULID `json:"sources,omitempty"`
// Deletions if applied (in order).
DeletionsApplied []DeletionRequest `json:"deletions_applied,omitempty"`
// Relabels if applied.
RelabelsApplied []*relabel.Config `json:"relabels_applied,omitempty"`
}

type Matchers []*labels.Matcher
Expand Down
1 change: 1 addition & 0 deletions pkg/compactv2/chunk_series_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"

"github.com/thanos-io/thanos/pkg/block"
)

Expand Down
189 changes: 189 additions & 0 deletions pkg/compactv2/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import (
"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/testutil"
Expand Down Expand Up @@ -397,6 +400,192 @@ func TestCompactor_WriteSeries_e2e(t *testing.T) {
NumChunks: 12,
},
},
{
name: "1 block + relabel modifier, two chunks from the same series are merged into one larger chunk",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}}}},
},
},
// Not used in this test case.
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.Drop,
Regex: relabel.MustNewRegexp("no-match"),
SourceLabels: model.LabelNames{"a"},
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}},
},
expectedStats: tsdb.BlockStats{
NumSamples: 6,
NumSeries: 1,
NumChunks: 1,
},
},
{
name: "1 block + relabel modifier, delete first series",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 13}, {11, 11}, {20, 20}}}},
},
},
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.Drop,
Regex: relabel.MustNewRegexp("1"),
SourceLabels: model.LabelNames{"a"},
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 13}, {11, 11}, {20, 20}}}},
},
expectedChanges: "Deleted {a=\"1\"} [{0 20}]\n",
expectedStats: tsdb.BlockStats{
NumSamples: 13,
NumSeries: 2,
NumChunks: 2,
},
},
{
name: "1 block + relabel modifier, series reordered",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, -1}, {2, -2}, {10, -10}, {11, -11}, {20, -20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
},
// {a="1"} will be relabeled to {a="3"} while {a="2"} will be relabeled to {a="0"}.
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("1"),
SourceLabels: model.LabelNames{"a"},
TargetLabel: "a",
Replacement: "3",
},
&relabel.Config{
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("2"),
SourceLabels: model.LabelNames{"a"},
TargetLabel: "a",
Replacement: "0",
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "0"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, -1}, {2, -2}, {10, -10}, {11, -11}, {20, -20}}}},
},
expectedChanges: "Relabelled {a=\"1\"} {a=\"3\"}\nRelabelled {a=\"2\"} {a=\"0\"}\n",
expectedStats: tsdb.BlockStats{
NumSamples: 13,
NumSeries: 2,
NumChunks: 2,
},
},
{
name: "1 block + relabel modifier, series deleted because of no labels left after relabel",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
{
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
},
// Drop all label name "a".
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.LabelDrop,
Regex: relabel.MustNewRegexp("a"),
},
)},
expected: nil,
expectedChanges: "Deleted {a=\"1\"} [{0 25}]\nDeleted {a=\"2\"} [{0 25}]\n",
expectedStats: tsdb.BlockStats{
NumSamples: 0,
NumSeries: 0,
NumChunks: 0,
},
},
{
name: "1 block + relabel modifier, series 1 is deleted because of no labels left after relabel",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
{
{lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
},
// Drop all label name "a".
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.LabelDrop,
Regex: relabel.MustNewRegexp("a"),
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "b", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
expectedChanges: "Deleted {a=\"1\"} [{0 25}]\nRelabelled {a=\"2\", b=\"1\"} {b=\"1\"}\n",
expectedStats: tsdb.BlockStats{
NumSamples: 7,
NumSeries: 1,
NumChunks: 1,
},
},
{
name: "1 block + relabel modifier, series merged after relabeling",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{1, 1}, {2, 2}, {10, 10}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {2, 2}, {3, 3}}, {{4, 4}, {11, 11}, {20, 20}, {25, 25}}}},
},
},
// Replace values of label name "a" with "0".
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("1|2"),
SourceLabels: model.LabelNames{"a"},
TargetLabel: "a",
Replacement: "0",
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "0"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
expectedChanges: "Relabelled {a=\"1\"} {a=\"0\"}\nRelabelled {a=\"2\"} {a=\"0\"}\n",
expectedStats: tsdb.BlockStats{
NumSamples: 9,
NumSeries: 1,
NumChunks: 1,
},
},
} {
t.Run(tcase.name, func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "test-series-writer")
Expand Down
Loading

0 comments on commit d8b21e7

Please sign in to comment.