Skip to content

Commit

Permalink
receive: add glob support
Browse files Browse the repository at this point in the history
Add support for matching tenants with globs.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Feb 13, 2024
1 parent ec88d1b commit 835c35a
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (e *Endpoint) UnmarshalJSON(data []byte) error {
type HashringConfig struct {
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Glob bool `json:"glob,omitempty"`
Endpoints []Endpoint `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels map[string]string `json:"external_labels,omitempty"`
Expand Down
28 changes: 22 additions & 6 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package receive
import (
"fmt"
"math"
"path/filepath"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -249,7 +250,7 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
type multiHashring struct {
cache map[string]Hashring
hashrings []Hashring
tenantSets []map[string]struct{}
tenantSets []map[string]bool

// We need a mutex to guard concurrent access
// to the cache map, as this is both written to
Expand Down Expand Up @@ -280,8 +281,23 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
// considered a default hashring and matches everything.
if t == nil {
found = true
} else if _, ok := t[tenant]; ok {
found = true
} else {
// Fast path for the common case of direct match.
if _, ok := t[tenant]; ok {
found = true
} else {
for tenantPattern, glob := range t {
if !glob {
continue
}
matches, err := filepath.Match(tenantPattern, tenant)
if err != nil {
return "", fmt.Errorf("error matching tenant pattern %s (tenant %s): %w", tenantPattern, tenant, err)
}
found = matches
}
}

}
if found {
m.mu.Lock()
Expand Down Expand Up @@ -320,12 +336,12 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
}
m.nodes = append(m.nodes, hashring.Nodes()...)
m.hashrings = append(m.hashrings, hashring)
var t map[string]struct{}
var t map[string]bool
if len(h.Tenants) != 0 {
t = make(map[string]struct{})
t = make(map[string]bool)
}
for _, tenant := range h.Tenants {
t[tenant] = struct{}{}
t[tenant] = h.Glob
}
m.tenantSets = append(m.tenantSets, t)
}
Expand Down
44 changes: 44 additions & 0 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,3 +1014,47 @@ func TestReceiveExtractsTenant(t *testing.T) {

testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "tenant-1")), e2emon.WaitMissingMetrics()))
}

func TestReceiveGlob(t *testing.T) {
e, err := e2e.NewDockerEnvironment("receive-glob")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

h := receive.HashringConfig{
Glob: true,
Tenants: []string{
"tenant-*",
},
Endpoints: []receive.Endpoint{
{Address: i.InternalEndpoint("grpc")},
},
}

r := e2ethanos.NewReceiveBuilder(e, "router").WithRouting(1, h).Init()
testutil.Ok(t, e2e.StartAndWaitReady(r))

q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

require.NoError(t, runutil.RetryWithLog(logkit.NewLogfmtLogger(os.Stdout), 1*time.Second, make(<-chan struct{}), func() error {
return storeWriteRequest(context.Background(), "http://"+r.Endpoint("remote-write")+"/api/v1/receive", &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "thanos_tenant_id", Value: "tenant-1"},
{Name: "aa", Value: "bb"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: time.Now().UnixMilli()},
},
},
},
})
}))

testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "tenant-1")), e2emon.WaitMissingMetrics()))

}

0 comments on commit 835c35a

Please sign in to comment.