Skip to content

Commit 42405da

Browse files
committed
Fix GetUsersCloseToLimit too
Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
1 parent cf67760 commit 42405da

File tree

2 files changed

+69
-45
lines changed

2 files changed

+69
-45
lines changed

pkg/usagetracker/tracker.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -665,15 +665,12 @@ func (t *UsageTracker) stop(_ error) error {
665665

666666
// TrackSeries implements usagetrackerpb.UsageTrackerServer.
667667
func (t *UsageTracker) TrackSeries(_ context.Context, req *usagetrackerpb.TrackSeriesRequest) (*usagetrackerpb.TrackSeriesResponse, error) {
668-
t.partitionsMtx.RLock()
669-
p, ok := t.partitions[req.Partition]
670-
t.partitionsMtx.RUnlock()
671-
if !ok {
672-
return nil, fmt.Errorf("partition handler %d not found", req.Partition)
673-
}
674-
if p.State() != services.Running {
675-
return nil, fmt.Errorf("partition handler %d is not running (state: %s)", req.Partition, p.State())
668+
partition := req.Partition
669+
p, err := t.runningPartition(partition)
670+
if err != nil {
671+
return nil, err
676672
}
673+
677674
rejected, err := p.store.trackSeries(context.Background(), req.UserID, req.SeriesHashes, time.Now())
678675
if err != nil {
679676
return nil, err
@@ -684,12 +681,9 @@ func (t *UsageTracker) TrackSeries(_ context.Context, req *usagetrackerpb.TrackS
684681
// GetUsersCloseToLimit implements usagetrackerpb.UsageTrackerServer.
685682
func (t *UsageTracker) GetUsersCloseToLimit(_ context.Context, req *usagetrackerpb.GetUsersCloseToLimitRequest) (*usagetrackerpb.GetUsersCloseToLimitResponse, error) {
686683
partition := req.Partition
687-
688-
t.partitionsMtx.RLock()
689-
p, ok := t.partitions[partition]
690-
t.partitionsMtx.RUnlock()
691-
if !ok {
692-
return nil, fmt.Errorf("partition handler %d not found", partition)
684+
p, err := t.runningPartition(partition)
685+
if err != nil {
686+
return nil, err
693687
}
694688

695689
userIDs := p.store.getSortedUsersCloseToLimit()
@@ -699,6 +693,19 @@ func (t *UsageTracker) GetUsersCloseToLimit(_ context.Context, req *usagetracker
699693
}, nil
700694
}
701695

696+
func (t *UsageTracker) runningPartition(partition int32) (*partitionHandler, error) {
697+
t.partitionsMtx.RLock()
698+
p, ok := t.partitions[partition]
699+
t.partitionsMtx.RUnlock()
700+
if !ok {
701+
return nil, fmt.Errorf("partition handler %d not found", partition)
702+
}
703+
if p.State() != services.Running {
704+
return nil, fmt.Errorf("partition handler %d is not running (state: %s)", partition, p.State())
705+
}
706+
return p, nil
707+
}
708+
702709
// CheckReady performs a readiness check.
703710
// An instance is ready when it has instantiated all the partitions that should belong to it according to the ring.
704711
func (t *UsageTracker) CheckReady(_ context.Context) error {

pkg/usagetracker/tracker_test.go

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -552,44 +552,61 @@ func TestUsageTracker_PartitionAssignment(t *testing.T) {
552552
}
553553

554554
func TestUsageTracker_GetUsersCloseToLimit(t *testing.T) {
555-
makeSeries := func(n int) []uint64 {
556-
series := make([]uint64, n)
557-
for i := range series {
558-
series[i] = uint64(i)
555+
t.Run("happy case", func(t *testing.T) {
556+
makeSeries := func(n int) []uint64 {
557+
series := make([]uint64, n)
558+
for i := range series {
559+
series[i] = uint64(i)
560+
}
561+
return series
559562
}
560-
return series
561-
}
562-
563-
tracker := newReadyTestUsageTracker(t, map[string]*validation.Limits{
564-
"a": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
565-
"b": {MaxActiveSeriesPerUser: 2000 * testPartitionsCount},
566-
"c": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
567-
"d": {MaxActiveSeriesPerUser: 2000 * testPartitionsCount},
568-
"e": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
569-
})
570563

571-
for _, tenant := range []string{"a", "b", "c", "d", "e"} {
572-
resp, err := tracker.TrackSeries(t.Context(), &usagetrackerpb.TrackSeriesRequest{
573-
UserID: tenant,
574-
Partition: 0,
575-
SeriesHashes: makeSeries(900),
564+
tracker := newReadyTestUsageTracker(t, map[string]*validation.Limits{
565+
"a": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
566+
"b": {MaxActiveSeriesPerUser: 2000 * testPartitionsCount},
567+
"c": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
568+
"d": {MaxActiveSeriesPerUser: 2000 * testPartitionsCount},
569+
"e": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
576570
})
577-
require.NoError(t, err)
578-
require.Empty(t, resp.RejectedSeriesHashes)
579-
}
580571

581-
// Call updateLimits (on all partitions, although we only need partition 0.
582-
withRLock(&tracker.partitionsMtx, func() {
583-
for _, p := range tracker.partitions {
584-
done := make(chan struct{})
585-
p.forceUpdateLimitsForTests <- done
586-
<-done
572+
for _, tenant := range []string{"a", "b", "c", "d", "e"} {
573+
resp, err := tracker.TrackSeries(t.Context(), &usagetrackerpb.TrackSeriesRequest{
574+
UserID: tenant,
575+
Partition: 0,
576+
SeriesHashes: makeSeries(900),
577+
})
578+
require.NoError(t, err)
579+
require.Empty(t, resp.RejectedSeriesHashes)
587580
}
581+
582+
// Call updateLimits (on all partitions, although we only need partition 0.
583+
withRLock(&tracker.partitionsMtx, func() {
584+
for _, p := range tracker.partitions {
585+
done := make(chan struct{})
586+
p.forceUpdateLimitsForTests <- done
587+
<-done
588+
}
589+
})
590+
591+
resp, err := tracker.GetUsersCloseToLimit(t.Context(), &usagetrackerpb.GetUsersCloseToLimitRequest{Partition: 0})
592+
require.NoError(t, err)
593+
require.Equal(t, []string{"a", "c", "e"}, resp.SortedUserIds, "List of users close to the limit should be sorted lexicographically")
588594
})
589595

590-
resp, err := tracker.GetUsersCloseToLimit(t.Context(), &usagetrackerpb.GetUsersCloseToLimitRequest{Partition: 0})
591-
require.NoError(t, err)
592-
require.Equal(t, []string{"a", "c", "e"}, resp.SortedUserIds, "List of users close to the limit should be sorted lexicographically")
596+
t.Run("partition handler is not running", func(t *testing.T) {
597+
tracker := newReadyTestUsageTracker(t, map[string]*validation.Limits{
598+
"a": {MaxActiveSeriesPerUser: 1000 * testPartitionsCount},
599+
})
600+
601+
// Call updateLimits (on all partitions, although we only need partition 0.
602+
withRLock(&tracker.partitionsMtx, func() {
603+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), tracker.partitions[0]))
604+
})
605+
606+
_, err := tracker.GetUsersCloseToLimit(t.Context(), &usagetrackerpb.GetUsersCloseToLimitRequest{Partition: 0})
607+
require.Error(t, err)
608+
require.ErrorContains(t, err, "partition handler 0 is not running (state: Terminated)")
609+
})
593610
}
594611

595612
func callPrepareDownscaleEndpoint(t *testing.T, ut *UsageTracker, method string) {

0 commit comments

Comments
 (0)