Skip to content

Commit dd66b34

Browse files
authored
metricreader: fix data race in BackendReader (#619)
1 parent 0a52ff1 commit dd66b34

File tree

3 files changed

+206
-231
lines changed

3 files changed

+206
-231
lines changed

pkg/balance/metricsreader/backend_reader.go

Lines changed: 38 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"fmt"
1010
"math"
1111
"net"
12-
"reflect"
1312
"slices"
1413
"strconv"
1514
"strings"
@@ -293,13 +292,12 @@ func (br *BackendReader) readFromBackends(ctx context.Context, excludeZones []st
293292
return
294293
}
295294
br.metric2History(mf, addr)
296-
result := br.history2Value(addr)
297-
br.mergeQueryResult(result, addr)
298295
}, nil, br.lg)
299296
}(addr)
300297
}
301298
br.wgp.Wait()
302299

300+
br.history2QueryResult()
303301
if err := br.marshalHistory(addrs); err != nil {
304302
br.lg.Error("marshal backend history failed", zap.Any("addrs", addrs), zap.Error(err))
305303
}
@@ -369,87 +367,58 @@ func (br *BackendReader) metric2History(mfs map[string]*dto.MetricFamily, backen
369367
}
370368
}
371369

372-
// history2Value converts the history to results for all rules of one backend.
373-
// E.g. return the metrics for 1 minute as a matrix
374-
func (br *BackendReader) history2Value(backend string) map[string]model.Value {
375-
results := make(map[string]model.Value, len(br.queryRules))
376-
labels := map[model.LabelName]model.LabelValue{LabelNameInstance: model.LabelValue(backend)}
370+
// history2QueryResult generates new query results from the history.
371+
func (br *BackendReader) history2QueryResult() {
372+
now := monotime.Now()
377373
br.Lock()
378374
defer br.Unlock()
379375

376+
queryResults := make(map[string]QueryResult, len(br.queryRules))
380377
for ruleKey, rule := range br.queryRules {
381378
ruleHistory := br.history[ruleKey]
382379
if len(ruleHistory) == 0 {
383380
continue
384381
}
385-
beHistory := ruleHistory[backend]
386-
if len(beHistory.Step2History) == 0 {
387-
continue
388-
}
389382

383+
var value model.Value
390384
switch rule.ResultType {
391385
case model.ValVector:
392-
// vector indicates returning the latest pair
393-
lastPair := beHistory.Step2History[len(beHistory.Step2History)-1]
394-
results[ruleKey] = model.Vector{{Value: lastPair.Value, Timestamp: lastPair.Timestamp, Metric: labels}}
395-
case model.ValMatrix:
396-
// matrix indicates returning the history
397-
// copy a slice to avoid data race
398-
pairs := make([]model.SamplePair, len(beHistory.Step2History))
399-
copy(pairs, beHistory.Step2History)
400-
results[ruleKey] = model.Matrix{{Values: pairs, Metric: labels}}
401-
default:
402-
br.lg.Error("unsupported value type", zap.String("value type", rule.ResultType.String()))
403-
}
404-
}
405-
return results
406-
}
407-
408-
// mergeQueryResult merges the result of one backend into the final result.
409-
func (br *BackendReader) mergeQueryResult(backendValues map[string]model.Value, backend string) {
410-
now := monotime.Now()
411-
br.Lock()
412-
defer br.Unlock()
413-
for ruleKey, value := range backendValues {
414-
result := br.queryResults[ruleKey]
415-
result.UpdateTime = now
416-
if result.Value == nil || reflect.ValueOf(result.Value).IsNil() {
417-
result.Value = value
418-
br.queryResults[ruleKey] = result
419-
continue
420-
}
421-
switch result.Value.Type() {
422-
case model.ValVector:
423-
idx := -1
424-
for i, v := range result.Value.(model.Vector) {
425-
if v.Metric[LabelNameInstance] == model.LabelValue(backend) {
426-
idx = i
427-
break
386+
results := make([]*model.Sample, 0, len(ruleHistory))
387+
for backend, beHistory := range ruleHistory {
388+
if len(beHistory.Step2History) == 0 {
389+
continue
428390
}
391+
labels := map[model.LabelName]model.LabelValue{LabelNameInstance: model.LabelValue(backend)}
392+
// vector indicates returning the latest pair
393+
lastPair := beHistory.Step2History[len(beHistory.Step2History)-1]
394+
results = append(results, &model.Sample{Value: lastPair.Value, Timestamp: lastPair.Timestamp, Metric: labels})
429395
}
430-
if idx >= 0 {
431-
result.Value.(model.Vector)[idx] = value.(model.Vector)[0]
432-
} else {
433-
result.Value = append(result.Value.(model.Vector), value.(model.Vector)[0])
434-
}
396+
value = model.Vector(results)
435397
case model.ValMatrix:
436-
idx := -1
437-
for i, v := range result.Value.(model.Matrix) {
438-
if v.Metric[LabelNameInstance] == model.LabelValue(backend) {
439-
idx = i
440-
break
398+
results := make([]*model.SampleStream, 0, len(ruleHistory))
399+
for backend, beHistory := range ruleHistory {
400+
if len(beHistory.Step2History) == 0 {
401+
continue
441402
}
403+
labels := map[model.LabelName]model.LabelValue{LabelNameInstance: model.LabelValue(backend)}
404+
// matrix indicates returning the history
405+
// copy a slice to avoid data race
406+
pairs := make([]model.SamplePair, len(beHistory.Step2History))
407+
copy(pairs, beHistory.Step2History)
408+
results = append(results, &model.SampleStream{Values: pairs, Metric: labels})
442409
}
443-
if idx >= 0 {
444-
result.Value.(model.Matrix)[idx] = value.(model.Matrix)[0]
445-
} else {
446-
result.Value = append(result.Value.(model.Matrix), value.(model.Matrix)[0])
447-
}
410+
value = model.Matrix(results)
448411
default:
449-
br.lg.Error("unsupported value type", zap.Stringer("value type", result.Value.Type()))
412+
br.lg.Error("unsupported value type", zap.String("value type", rule.ResultType.String()))
413+
}
414+
415+
queryResults[ruleKey] = QueryResult{
416+
Value: value,
417+
UpdateTime: now,
450418
}
451-
br.queryResults[ruleKey] = result
452419
}
420+
421+
br.queryResults = queryResults
453422
}
454423

455424
// purgeHistory purges the expired or useless history values, otherwise the memory grows infinitely.
@@ -499,21 +468,12 @@ func (br *BackendReader) readFromOwner(ctx context.Context, ownerAddr string) er
499468
if err := json.Unmarshal(resp, &newHistory); err != nil {
500469
return err
501470
}
502-
backends := make(map[string]struct{})
503-
for _, ruleHistory := range newHistory {
504-
for backend := range ruleHistory {
505-
backends[backend] = struct{}{}
506-
}
507-
}
508471

509472
// If this instance becomes the owner in the next round, it can reuse the history.
510473
br.mergeHistory(newHistory)
511474

512-
// Update query result for the updated backends.
513-
for backend := range backends {
514-
result := br.history2Value(backend)
515-
br.mergeQueryResult(result, backend)
516-
}
475+
// Generate query result for all backends.
476+
br.history2QueryResult()
517477
return nil
518478
}
519479

@@ -544,6 +504,8 @@ func (br *BackendReader) mergeHistory(newHistory map[string]map[string]backendHi
544504
ruleHistory[backend] = backendHistory
545505
}
546506
}
507+
// avoid that the stale history is returned to other members when it just becomes the owner
508+
br.marshalledHistory = nil
547509
}
548510

549511
// marshalHistory marshals the backends that are read by this owner. The marshaled data will be returned to other members.

0 commit comments

Comments
 (0)