Skip to content

Commit

Permalink
wip: per tenant sharding
Browse files Browse the repository at this point in the history
  • Loading branch information
mhoffm-aiven committed Apr 30, 2023
1 parent 539cb12 commit 59bc761
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 236 deletions.
12 changes: 7 additions & 5 deletions pkg/exemplars/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (

// MultiTSDB implements exemplarspb.ExemplarsServer that allows to fetch exemplars a MultiTSDB instance.
type MultiTSDB struct {
tsdbExemplarsServers func() map[string]*TSDB
tsdbExemplarsServers func() map[string][]*TSDB
}

// NewMultiTSDB creates new exemplars.MultiTSDB.
func NewMultiTSDB(tsdbExemplarsServers func() map[string]*TSDB) *MultiTSDB {
func NewMultiTSDB(tsdbExemplarsServers func() map[string][]*TSDB) *MultiTSDB {
return &MultiTSDB{
tsdbExemplarsServers: tsdbExemplarsServers,
}
Expand All @@ -32,9 +32,11 @@ func (m *MultiTSDB) Exemplars(r *exemplarspb.ExemplarsRequest, s exemplarspb.Exe
}
matchers := parser.ExtractSelectors(expr)

for tenant, es := range m.tsdbExemplarsServers() {
if err := es.Exemplars(matchers, r.Start, r.End, s); err != nil {
return status.Error(codes.Aborted, errors.Wrapf(err, "get exemplars for tenant %s", tenant).Error())
for tenant, ess := range m.tsdbExemplarsServers() {
for shard, es := range ess {
if err := es.Exemplars(matchers, r.Start, r.End, s); err != nil {
return status.Error(codes.Aborted, errors.Wrapf(err, "get exemplars for tenant %s, shard %d", tenant, shard).Error())
}
}
}
return nil
Expand Down
44 changes: 14 additions & 30 deletions pkg/receive/handler_test.go.tmp → pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func newFakeTenantAppendable(f *fakeAppendable) *fakeTenantAppendable {
return &fakeTenantAppendable{f: f}
}

func (t *fakeTenantAppendable) TenantAppendable(_ string) (Appendable, error) {
return t.f, nil
func (t *fakeTenantAppendable) TenantAppendables(_ string) ([]Appendable, error) {
return []Appendable{t.f}, nil
}

type fakeAppendable struct {
Expand Down Expand Up @@ -850,9 +850,16 @@ type tsOverrideTenantStorage struct {
interval int64
}

func (s *tsOverrideTenantStorage) TenantAppendable(tenant string) (Appendable, error) {
a, err := s.TenantStorage.TenantAppendable(tenant)
return &tsOverrideAppendable{Appendable: a, interval: s.interval}, err
func (s *tsOverrideTenantStorage) TenantAppendables(tenant string) ([]Appendable, error) {
a, err := s.TenantStorage.TenantAppendables(tenant)
if err != nil {
return nil, err
}
res := make([]Appendable, len(a))
for i := range a {
res[i] = &tsOverrideAppendable{Appendable: a[i], interval: s.interval}
}
return res, nil
}

type tsOverrideAppendable struct {
Expand Down Expand Up @@ -1025,19 +1032,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
handler.writer.multiTSDB = &tsOverrideTenantStorage{TenantStorage: m, interval: 1}

// It takes time to create new tenant, wait for it.
{
app, err := m.TenantAppendable(handler.options.DefaultTenantID)
testutil.Ok(b, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error {
_, err = app.Appender(ctx)
return err
}))
}

testutil.Ok(b, initTenantTSDBs(m, handler.options.DefaultTenantID))
b.Run("OK", func(b testutil.TB) {
n := b.N()
b.ResetTimer()
Expand All @@ -1052,18 +1047,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
handler.writer.multiTSDB = &tsOverrideTenantStorage{TenantStorage: m, interval: -1} // Timestamp can't go down, which will cause conflict error.

// It takes time to create new tenant, wait for it.
{
app, err := m.TenantAppendable(handler.options.DefaultTenantID)
testutil.Ok(b, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error {
_, err = app.Appender(ctx)
return err
}))
}
testutil.Ok(b, initTenantTSDBs(m, handler.options.DefaultTenantID))

// First request should be fine, since we don't change timestamp, rest is wrong.
r := httptest.NewRecorder()
Expand Down
53 changes: 27 additions & 26 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ func (l *localClient) SupportsWithoutReplicaLabels() bool {
return true
}

type tenant []*tenantShard
type tenant struct {
shards []*tenantShard
}

type tenantShard struct {
readyS *ReadyStorage
Expand All @@ -156,7 +158,7 @@ func newTenant(shards int) tenant {
}

}
return tenant(tenantShards)
return tenant{shards: tenantShards}
}

func (t *tenantShard) readyStorage() *ReadyStorage {
Expand Down Expand Up @@ -240,7 +242,7 @@ func (t *MultiTSDB) Flush() error {
merr := errutil.MultiError{}
wg := &sync.WaitGroup{}
for id, tenant := range t.tenants {
for sid, tenantShard := range tenant {
for sid, tenantShard := range tenant.shards {
db := tenantShard.readyStorage().Get()
if db == nil {
level.Error(t.logger).Log("msg", "flushing TSDB failed; not ready", "tenant", id, "shard", sid)
Expand Down Expand Up @@ -270,7 +272,7 @@ func (t *MultiTSDB) Close() error {

merr := errutil.MultiError{}
for id, tenant := range t.tenants {
for sid, tenantShard := range tenant {
for sid, tenantShard := range tenant.shards {
db := tenantShard.readyStorage().Get()
if db == nil {
level.Error(t.logger).Log("msg", "closing TSDB failed; not ready", "tenant", id, "shard", sid)
Expand Down Expand Up @@ -433,8 +435,8 @@ func (t *MultiTSDB) Sync(ctx context.Context) (int, error) {
)

for tenantID, tenant := range t.tenants {
for shardId, tenantShard := range tenant {
level.Debug(t.logger).Log("msg", "uploading block for tenant", "tenant", tenantID, "shard", shardId)
for shardID, tenantShard := range tenant.shards {
level.Debug(t.logger).Log("msg", "uploading block for tenant", "tenant", tenantID, "shard", shardID)
s := tenantShard.shipper()
if s == nil {
continue
Expand Down Expand Up @@ -488,7 +490,7 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client {

res := make([]store.Client, 0, len(t.tenants))
for _, tenant := range t.tenants {
for _, tenantShard := range tenant {
for _, tenantShard := range tenant.shards {
client := tenantShard.client(t.logger)
if client != nil {
res = append(res, client)
Expand All @@ -499,22 +501,21 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client {
return res
}

func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
func (t *MultiTSDB) TSDBExemplars() map[string][]*exemplars.TSDB {
t.mtx.RLock()
defer t.mtx.RUnlock()

// TODO: what to do here
return nil
/*
res := make(map[string]*exemplars.TSDB, len(t.tenants))
for k, tenant := range t.tenants {
e := tenant.exemplars()
if e != nil {
res[k] = e
res := make(map[string][]*exemplars.TSDB, 0)
for tenantID, tenant := range t.tenants {
perTenantExemplars := make([]*exemplars.TSDB, 0)
for _, shard := range tenant.shards {
if e := shard.exemplars(); e != nil {
perTenantExemplars = append(perTenantExemplars, e)
}
}
return res
*/
res[tenantID] = perTenantExemplars
}
return res
}

func (t *MultiTSDB) TenantStats(statsByLabelName string, tenantIDs ...string) []status.TenantStats {
Expand Down Expand Up @@ -569,14 +570,14 @@ func (t *MultiTSDB) TenantStats(statsByLabelName string, tenantIDs ...string) []
}

func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant tenant) error {
for shardId, tenantShard := range tenant {
reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg)
reg = NewUnRegisterer(reg)
for shardID, tenantShard := range tenant.shards {
shardIDStr := strconv.FormatInt(int64(shardID), 10)

shardIdStr := strconv.FormatInt(int64(shardId), 10)
reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID, "shard": shardIDStr}, t.reg)
reg = NewUnRegisterer(reg)

lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID, "shard_id", shardIdStr))
dataDir := filepath.Join(t.defaultTenantDataDir(tenantID), shardIdStr)
lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID, "shard_id", shardIDStr))
dataDir := filepath.Join(t.defaultTenantDataDir(tenantID), shardIDStr)

level.Info(logger).Log("msg", "opening TSDB")
opts := *t.tsdbOpts
Expand Down Expand Up @@ -657,8 +658,8 @@ func (t *MultiTSDB) TenantAppendables(tenantID string) ([]Appendable, error) {
if err != nil {
return nil, err
}
res := make([]Appendable, len(tenant))
for i, tenantShard := range tenant {
res := make([]Appendable, len(tenant.shards))
for i, tenantShard := range tenant.shards {
res[i] = tenantShard.readyStorage()
}
return res, nil
Expand Down
Loading

0 comments on commit 59bc761

Please sign in to comment.