Skip to content

Commit 98dbdad

Browse files
committed
Finish the basic distributor tests
1 parent 3ba05bd commit 98dbdad

File tree

2 files changed

+142
-20
lines changed

2 files changed

+142
-20
lines changed

distributor/distributor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,10 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
284284
samples := util.FromWriteRequest(req)
285285
d.receivedSamples.Add(float64(len(samples)))
286286

287+
if len(samples) == 0 {
288+
return &cortex.WriteResponse{}, nil
289+
}
290+
287291
limiter := d.getOrCreateIngestLimiter(userID)
288292
if !limiter.AllowN(time.Now(), len(samples)) {
289293
return nil, errIngestionRateLimitExceeded

distributor/distributor_test.go

Lines changed: 138 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,169 @@
11
package distributor
22

33
import (
4+
"fmt"
45
"testing"
56
"time"
67

78
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/prometheus/prometheus/storage/remote"
10+
"github.com/stretchr/testify/assert"
11+
"golang.org/x/net/context"
12+
"google.golang.org/grpc"
813

14+
"github.com/weaveworks/common/user"
15+
"github.com/weaveworks/cortex"
916
"github.com/weaveworks/cortex/ring"
1017
)
1118

19+
// mockRing doesn't do any consistent hashing, just returns same ingesters for every query.
1220
type mockRing struct {
1321
prometheus.Counter
22+
ingesters []*ring.IngesterDesc
1423
}
1524

1625
func (r mockRing) Get(key uint32, n int, op ring.Operation) ([]*ring.IngesterDesc, error) {
17-
return nil, nil
26+
return r.ingesters[:n], nil
1827
}
1928

2029
func (r mockRing) BatchGet(keys []uint32, n int, op ring.Operation) ([][]*ring.IngesterDesc, error) {
21-
return nil, nil
30+
result := [][]*ring.IngesterDesc{}
31+
for i := 0; i < len(keys); i++ {
32+
result = append(result, r.ingesters[:n])
33+
}
34+
return result, nil
2235
}
2336

2437
func (r mockRing) GetAll() []*ring.IngesterDesc {
25-
return nil
38+
return r.ingesters
2639
}
2740

2841
type mockIngester struct {
42+
happy bool
2943
}
3044

31-
func TestDistributor(t *testing.T) {
32-
ring := mockRing{
33-
Counter: prometheus.NewCounter(prometheus.CounterOpts{
34-
Name: "foo",
35-
}),
45+
func (i mockIngester) Push(ctx context.Context, in *remote.WriteRequest, opts ...grpc.CallOption) (*cortex.WriteResponse, error) {
46+
if !i.happy {
47+
return nil, fmt.Errorf("Fail")
3648
}
49+
return &cortex.WriteResponse{}, nil
50+
}
3751

38-
_, err := New(Config{
39-
ReplicationFactor: 3,
40-
MinReadSuccesses: 2,
41-
HeartbeatTimeout: 1 * time.Minute,
42-
RemoteTimeout: 1 * time.Minute,
43-
ClientCleanupPeriod: 1 * time.Minute,
44-
IngestionRateLimit: 10000,
45-
IngestionBurstSize: 10000,
46-
}, ring)
47-
if err != nil {
48-
t.Fatal(err)
49-
}
52+
func (i mockIngester) Query(ctx context.Context, in *cortex.QueryRequest, opts ...grpc.CallOption) (*cortex.QueryResponse, error) {
53+
return nil, nil
54+
}
55+
56+
func (i mockIngester) LabelValues(ctx context.Context, in *cortex.LabelValuesRequest, opts ...grpc.CallOption) (*cortex.LabelValuesResponse, error) {
57+
return nil, nil
58+
}
59+
60+
func (i mockIngester) UserStats(ctx context.Context, in *cortex.UserStatsRequest, opts ...grpc.CallOption) (*cortex.UserStatsResponse, error) {
61+
return nil, nil
62+
}
5063

64+
func (i mockIngester) MetricsForLabelMatchers(ctx context.Context, in *cortex.MetricsForLabelMatchersRequest, opts ...grpc.CallOption) (*cortex.MetricsForLabelMatchersResponse, error) {
65+
return nil, nil
66+
}
67+
68+
func TestDistributor(t *testing.T) {
69+
ctx := user.WithID(context.Background(), "user")
70+
for i, tc := range []struct {
71+
ingesters []mockIngester
72+
samples int
73+
expectedResponse *cortex.WriteResponse
74+
expectedError error
75+
}{
76+
// A push of no samples shouldn't block or return error, even if ingesters are sad
77+
{
78+
ingesters: []mockIngester{{}, {}, {}},
79+
expectedResponse: &cortex.WriteResponse{},
80+
},
81+
82+
// A push to 3 happy ingesters should succeed
83+
{
84+
samples: 10,
85+
ingesters: []mockIngester{{true}, {true}, {true}},
86+
expectedResponse: &cortex.WriteResponse{},
87+
},
88+
89+
// A push to 2 happy ingesters should succeed
90+
{
91+
samples: 10,
92+
ingesters: []mockIngester{{}, {true}, {true}},
93+
expectedResponse: &cortex.WriteResponse{},
94+
},
95+
96+
// A push to 1 happy ingesters should fail
97+
{
98+
samples: 10,
99+
ingesters: []mockIngester{{}, {}, {true}},
100+
expectedError: fmt.Errorf("Fail"),
101+
},
102+
103+
// A push to 0 happy ingesters should fail
104+
{
105+
samples: 10,
106+
ingesters: []mockIngester{{}, {}, {}},
107+
expectedError: fmt.Errorf("Fail"),
108+
},
109+
} {
110+
t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
111+
ingesterDescs := []*ring.IngesterDesc{}
112+
ingesters := map[string]mockIngester{}
113+
for i, ingester := range tc.ingesters {
114+
addr := fmt.Sprintf("%d", i)
115+
ingesterDescs = append(ingesterDescs, &ring.IngesterDesc{
116+
Addr: addr,
117+
Timestamp: time.Now().Unix(),
118+
})
119+
ingesters[addr] = ingester
120+
}
121+
122+
ring := mockRing{
123+
Counter: prometheus.NewCounter(prometheus.CounterOpts{
124+
Name: "foo",
125+
}),
126+
ingesters: ingesterDescs,
127+
}
128+
129+
d, err := New(Config{
130+
ReplicationFactor: 3,
131+
MinReadSuccesses: 2,
132+
HeartbeatTimeout: 1 * time.Minute,
133+
RemoteTimeout: 1 * time.Minute,
134+
ClientCleanupPeriod: 1 * time.Minute,
135+
IngestionRateLimit: 10000,
136+
IngestionBurstSize: 10000,
137+
138+
ingesterClientFactory: func(addr string) cortex.IngesterClient {
139+
return ingesters[addr]
140+
},
141+
}, ring)
142+
if err != nil {
143+
t.Fatal(err)
144+
}
145+
defer d.Stop()
146+
147+
request := &remote.WriteRequest{}
148+
for i := 0; i < tc.samples; i++ {
149+
ts := &remote.TimeSeries{
150+
Labels: []*remote.LabelPair{
151+
{"__name__", "foo"},
152+
{"bar", "baz"},
153+
{"sample", fmt.Sprintf("%d", i)},
154+
},
155+
}
156+
ts.Samples = []*remote.Sample{
157+
{
158+
Value: float64(i),
159+
TimestampMs: int64(i),
160+
},
161+
}
162+
request.Timeseries = append(request.Timeseries, ts)
163+
}
164+
response, err := d.Push(ctx, request)
165+
assert.Equal(t, tc.expectedResponse, response, "Wrong response")
166+
assert.Equal(t, tc.expectedError, err, "Wrong error")
167+
})
168+
}
51169
}

0 commit comments

Comments
 (0)