Skip to content

Commit 3ba05bd

Browse files
committed
Review feedback and start of a test
1 parent a58ef5f commit 3ba05bd

File tree

2 files changed

+78
-22
lines changed

2 files changed

+78
-22
lines changed

distributor/distributor.go

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ type Config struct {
8787
ClientCleanupPeriod time.Duration
8888
IngestionRateLimit float64
8989
IngestionBurstSize int
90+
91+
// for testing
92+
ingesterClientFactory func(string) cortex.IngesterClient
9093
}
9194

9295
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -218,22 +221,27 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (cortex.Ingester
218221
return client, nil
219222
}
220223

221-
conn, err := grpc.Dial(
222-
ingester.Addr,
223-
grpc.WithTimeout(d.cfg.RemoteTimeout),
224-
grpc.WithInsecure(),
225-
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
226-
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
227-
middleware.ClientUserHeaderInterceptor,
228-
)),
229-
)
230-
if err != nil {
231-
return nil, err
232-
}
233-
234-
client = ingesterClient{
235-
IngesterClient: cortex.NewIngesterClient(conn),
236-
conn: conn,
224+
if d.cfg.ingesterClientFactory != nil {
225+
client = ingesterClient{
226+
IngesterClient: d.cfg.ingesterClientFactory(ingester.Addr),
227+
}
228+
} else {
229+
conn, err := grpc.Dial(
230+
ingester.Addr,
231+
grpc.WithTimeout(d.cfg.RemoteTimeout),
232+
grpc.WithInsecure(),
233+
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
234+
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
235+
middleware.ClientUserHeaderInterceptor,
236+
)),
237+
)
238+
if err != nil {
239+
return nil, err
240+
}
241+
client = ingesterClient{
242+
IngesterClient: cortex.NewIngesterClient(conn),
243+
conn: conn,
244+
}
237245
}
238246
d.clients[ingester.Addr] = client
239247
return client, nil
@@ -301,8 +309,6 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
301309
sample: samples[i],
302310
minSuccess: minSuccess,
303311
maxFailures: len(ingesters[i]) - minSuccess,
304-
succeeded: 0,
305-
failed: 0,
306312
}
307313

308314
// Skip those that have not heartbeated in a while. NB these are still
@@ -330,7 +336,6 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
330336

331337
pushTracker := pushTracker{
332338
samplesPending: int32(len(samples)),
333-
samplesFailed: 0,
334339
done: make(chan struct{}),
335340
err: make(chan error),
336341
}
@@ -363,8 +368,8 @@ func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter {
363368
func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDesc, sampleTrackers []*sampleTracker, pushTracker *pushTracker) {
364369
err := d.sendSamplesErr(ctx, ingester, sampleTrackers)
365370

366-
// If we suceed, decrement each sample's pending count by one. If we reach
367-
// the requred number of successful puts on this sample, then decrement the
371+
// If we succeed, decrement each sample's pending count by one. If we reach
372+
// the required number of successful puts on this sample, then decrement the
368373
// number of pending samples by one. If we successfully push all samples to
369374
// min success ingesters, wake up the waiting rpc so it can return early.
370375
// Similarly, track the number of errors, and if it exceeds maxFailures
@@ -374,7 +379,7 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe
374379
// goroutine will write to either channel.
375380
for i := range sampleTrackers {
376381
if err != nil {
377-
if atomic.AddInt32(&sampleTrackers[i].failed, 1) > int32(sampleTrackers[i].maxFailures) {
382+
if atomic.AddInt32(&sampleTrackers[i].failed, 1) <= int32(sampleTrackers[i].maxFailures) {
378383
continue
379384
}
380385
if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 {

distributor/distributor_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package distributor
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
9+
"github.com/weaveworks/cortex/ring"
10+
)
11+
12+
type mockRing struct {
13+
prometheus.Counter
14+
}
15+
16+
func (r mockRing) Get(key uint32, n int, op ring.Operation) ([]*ring.IngesterDesc, error) {
17+
return nil, nil
18+
}
19+
20+
func (r mockRing) BatchGet(keys []uint32, n int, op ring.Operation) ([][]*ring.IngesterDesc, error) {
21+
return nil, nil
22+
}
23+
24+
func (r mockRing) GetAll() []*ring.IngesterDesc {
25+
return nil
26+
}
27+
28+
type mockIngester struct {
29+
}
30+
31+
func TestDistributor(t *testing.T) {
32+
ring := mockRing{
33+
Counter: prometheus.NewCounter(prometheus.CounterOpts{
34+
Name: "foo",
35+
}),
36+
}
37+
38+
_, err := New(Config{
39+
ReplicationFactor: 3,
40+
MinReadSuccesses: 2,
41+
HeartbeatTimeout: 1 * time.Minute,
42+
RemoteTimeout: 1 * time.Minute,
43+
ClientCleanupPeriod: 1 * time.Minute,
44+
IngestionRateLimit: 10000,
45+
IngestionBurstSize: 10000,
46+
}, ring)
47+
if err != nil {
48+
t.Fatal(err)
49+
}
50+
51+
}

0 commit comments

Comments
 (0)