This repository was archived by the owner on Aug 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 106
Add tool for reporting out of order and duplicate metrics #2014
Open
pub-djedruszczak
wants to merge
12
commits into
grafana:master
Choose a base branch
from
bloomberg:feature-report-out-of-order
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
69d61be
Add tool for reporting out of order/duplicate metrics grouped by name…
pub-djedruszczak e5f9b62
Update tool documentation
pub-djedruszczak 3243f89
Restructure files to match contributing guidelines
pub-djedruszczak 0180444
Add mt-kafka-mdm-report-out-of-order to .gitignore
pub-djedruszczak 355c28c
Fix .gitignore order
pub-djedruszczak bce2ae0
Fix go vet
pub-djedruszczak c16f94f
Rebase master
pub-djedruszczak 32322d7
Manually reorder docs
pub-djedruszczak 6a4545e
Fix segmentation fault
pub-djedruszczak 7a32b2a
Update cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go
pub-djedruszczak ea8a434
Gofmt
pub-djedruszczak 9b82dce
Update docs/tools.md
pub-djedruszczak File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package main | ||
|
||
import ( | ||
"strings" | ||
|
||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
// Aggregate holds the point counters of every tracked series that was folded
// into one group (one metric name, or one value of the group-by tag).
type Aggregate struct {
	// Count is the sum of the Count of all series in the group.
	Count int
	// OutOfOrderCount is the sum of the OutOfOrderCount of all series in the group.
	OutOfOrderCount int
	// DuplicateCount is the sum of the DuplicateCount of all series in the group.
	DuplicateCount int
}
|
||
func aggregateByName(tracker Tracker) map[string]Aggregate { | ||
aggregates := map[string]Aggregate{} | ||
|
||
for _, track := range tracker { | ||
aggregate, _ := aggregates[track.Name] | ||
aggregate.Count += track.Count | ||
aggregate.OutOfOrderCount += track.OutOfOrderCount | ||
aggregate.DuplicateCount += track.DuplicateCount | ||
aggregates[track.Name] = aggregate | ||
} | ||
|
||
return aggregates | ||
} | ||
|
||
func aggregateByTag(tracker Tracker, groupByTag string) map[string]Aggregate { | ||
aggregates := map[string]Aggregate{} | ||
|
||
for _, track := range tracker { | ||
for _, tag := range track.Tags { | ||
kv := strings.Split(tag, "=") | ||
if len(kv) != 2 { | ||
log.Errorf("unexpected tag encoding for metric with name=%q tag=%q", track.Name, tag) | ||
continue | ||
} | ||
if kv[0] == groupByTag { | ||
aggregate, _ := aggregates[kv[1]] | ||
aggregate.Count += track.Count | ||
aggregate.OutOfOrderCount += track.OutOfOrderCount | ||
aggregate.DuplicateCount += track.DuplicateCount | ||
aggregates[kv[1]] = aggregate | ||
} | ||
} | ||
} | ||
|
||
return aggregates | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
package main | ||
|
||
import ( | ||
"flag" | ||
"fmt" | ||
"os" | ||
"time" | ||
|
||
"github.com/grafana/globalconf" | ||
"github.com/grafana/metrictank/idx/cassandra" | ||
inKafkaMdm "github.com/grafana/metrictank/input/kafkamdm" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
func ParseFlags() Flags { | ||
flags := NewFlags() | ||
|
||
flag.Usage = flags.Usage | ||
|
||
flags.Parse(os.Args[1:]) | ||
|
||
return *flags | ||
} | ||
|
||
// Flags holds the command line options of the tool together with the flag set
// they are registered on.
type Flags struct {
	// flagSet is the flag set the exported fields below are bound to.
	flagSet *flag.FlagSet

	// RunDuration is the duration of time to run the program.
	RunDuration time.Duration
	// Config is the configuration file path.
	Config string
	// PartitionFrom is the partition to load the index from.
	PartitionFrom int
	// PartitionTo is the exclusive upper bound of the partitions to load the
	// index from; it defaults to -1, meaning only PartitionFrom is loaded.
	PartitionTo int
	// ReorderWindow is the size of the reorder buffer window.
	ReorderWindow uint
	// Prefix limits reporting to metrics whose name has this prefix.
	Prefix string
	// Substr limits reporting to metrics whose name has this substring.
	Substr string
	// GroupByName enables grouping of out-of-order metrics by name.
	GroupByName bool
	// GroupByTag, when non-empty, enables grouping by the specified tag.
	GroupByTag string
}
|
||
func NewFlags() *Flags { | ||
var flags Flags | ||
|
||
flags.flagSet = flag.NewFlagSet("application flags", flag.ExitOnError) | ||
flags.flagSet.DurationVar(&flags.RunDuration, "run-duration", 5*time.Minute, "the duration of time to run the program") | ||
flags.flagSet.StringVar(&flags.Config, "config", "/etc/metrictank/metrictank.ini", "configuration file path") | ||
flags.flagSet.IntVar(&flags.PartitionFrom, "partition-from", 0, "the partition to load the index from") | ||
flags.flagSet.IntVar(&flags.PartitionTo, "partition-to", -1, "load the index from all partitions up to this one (exclusive). If unset, only the partition defined with \"--partition-from\" is loaded from") | ||
flags.flagSet.UintVar(&flags.ReorderWindow, "reorder-window", 1, "the size of the reorder buffer window") | ||
flags.flagSet.StringVar(&flags.Prefix, "prefix", "", "only report metrics with a name that has this prefix") | ||
flags.flagSet.StringVar(&flags.Substr, "substr", "", "only report metrics with a name that has this substring") | ||
flags.flagSet.BoolVar(&flags.GroupByName, "group-by-name", false, "group out-of-order metrics by name") | ||
flags.flagSet.StringVar(&flags.GroupByTag, "group-by-tag", "", "group out-of-order metrics by the specified tag") | ||
|
||
flags.flagSet.Usage = flags.Usage | ||
return &flags | ||
} | ||
|
||
func (flags *Flags) Parse(args []string) { | ||
err := flags.flagSet.Parse(args) | ||
if err != nil { | ||
log.Fatalf("failed to parse application flags %v: %s", args, err.Error()) | ||
os.Exit(1) | ||
} | ||
|
||
path := "" | ||
if _, err := os.Stat(flags.Config); err == nil { | ||
path = flags.Config | ||
} | ||
config, err := globalconf.NewWithOptions(&globalconf.Options{ | ||
Filename: path, | ||
EnvPrefix: "MT_", | ||
}) | ||
if err != nil { | ||
log.Fatalf("error with configuration file: %s", err.Error()) | ||
os.Exit(1) | ||
} | ||
_ = cassandra.ConfigSetup() | ||
inKafkaMdm.ConfigSetup() | ||
config.Parse() | ||
|
||
if flags.GroupByName == false && flags.GroupByTag == "" { | ||
log.Fatalf("must specify at least one of -group-by-name or -group-by-tag") | ||
os.Exit(1) | ||
} | ||
|
||
if flags.ReorderWindow < 1 { | ||
log.Fatalf("-reorder-window must be greater than zero") | ||
os.Exit(1) | ||
} | ||
} | ||
|
||
func (flags *Flags) Usage() { | ||
fmt.Fprintln(os.Stderr, "mt-kafka-mdm-report-out-of-order") | ||
fmt.Fprintln(os.Stderr) | ||
fmt.Fprintln(os.Stderr, "Inspects what's flowing through kafka (in mdm format) and reports out of order data grouped by metric name or tag, taking into account the reorder buffer)") | ||
fmt.Fprintln(os.Stderr) | ||
fmt.Fprintln(os.Stderr, "# Mechanism") | ||
fmt.Fprintln(os.Stderr, "* it sniffs points being added on a per-series (metric Id) level") | ||
fmt.Fprintln(os.Stderr, "* for every series, tracks the last 'correct' point. E.g. a point that was able to be added to the series because its timestamp is higher than any previous timestamp") | ||
fmt.Fprintln(os.Stderr, "* if for any series, a point comes in with a timestamp equal or lower than the last point correct point - which metrictank would not add unless it falls within the reorder buffer - it triggers an event for this out-of-order point") | ||
fmt.Fprintln(os.Stderr, "* the reorder buffer is described by the window size") | ||
fmt.Fprintln(os.Stderr, "Usage:") | ||
fmt.Fprintln(os.Stderr, " mt-kafka-mdm-report-out-of-order [flags]") | ||
fmt.Fprintln(os.Stderr) | ||
fmt.Fprintln(os.Stderr, "Example output:") | ||
fmt.Fprintln(os.Stderr) | ||
fmt.Fprintln(os.Stderr, " total metric points count=2710806") | ||
fmt.Fprintln(os.Stderr, " total out-of-order metric points count=3878") | ||
fmt.Fprintln(os.Stderr, " out-of-order metric points grouped by name:") | ||
fmt.Fprintln(os.Stderr, " out-of-order metric points for name=\"fruit.weight\" count=4 percentGroup=4.301075 percentClass=0.096131 percentTotal=0.000129") | ||
fmt.Fprintln(os.Stderr, " out-of-order metric points for name=\"fruit.height\" count=1 percentGroup=4.545455 percentClass=0.024033 percentTotal=0.000032") | ||
fmt.Fprintln(os.Stderr, " ...") | ||
fmt.Fprintln(os.Stderr, " out-of-order metric points grouped by tag=\"fruit\":") | ||
fmt.Fprintln(os.Stderr, " out-of-order metric points for tag=\"fruit\" value=\"apple\" count=80 percentGroup=5.856515 percentClass=2.062919 percentTotal=0.002951") | ||
fmt.Fprintln(os.Stderr, " out-of-order metric points for tag=\"fruit\" value=\"orange\" count=2912 percentGroup=0.306267 percentClass=75.090253 percentTotal=0.107422") | ||
fmt.Fprintln(os.Stderr, " ...") | ||
fmt.Fprintln(os.Stderr, " total duplicate metric points count=12760") | ||
fmt.Fprintln(os.Stderr, " duplicate metric points grouped by name:") | ||
fmt.Fprintln(os.Stderr, " duplicate metric points for name=\"fruit.width\" count=105 percentGroup=19.266055 percentClass=0.760704 percentTotal=0.003397") | ||
fmt.Fprintln(os.Stderr, " duplicate metric points for name=\"fruit.length\" count=123 percentGroup=15.688776 percentClass=0.891111 percentTotal=0.003979") | ||
fmt.Fprintln(os.Stderr, " ...") | ||
fmt.Fprintln(os.Stderr, " duplicate metric points grouped by tag=\"fruit\":") | ||
fmt.Fprintln(os.Stderr, " duplicate metric points for tag=\"fruit\" value=\"banana\" count=4002 percentGroup=17.201066 percentClass=31.363636 percentTotal=0.147631") | ||
fmt.Fprintln(os.Stderr, " duplicate metric points for tag=\"fruit\" value=\"orange\" count=4796 percentGroup=0.504415 percentClass=37.586207 percentTotal=0.176922") | ||
fmt.Fprintln(os.Stderr, " ...") | ||
fmt.Fprintln(os.Stderr) | ||
fmt.Fprintln(os.Stderr, "Fields:") | ||
fmt.Fprintln(os.Stderr) | ||
fmt.Fprintln(os.Stderr, " name: the name of the metric (when grouped by name)") | ||
fmt.Fprintln(os.Stderr, " tag: the tag key (when grouped by tag)") | ||
fmt.Fprintln(os.Stderr, " value: the tag value (when grouped by tag)") | ||
fmt.Fprintln(os.Stderr, " count: the number of metric points") | ||
fmt.Fprintln(os.Stderr, " the example above shows that 4002 metric points that had tag \"fruit\"=\"banana\" were duplicates") | ||
fmt.Fprintln(os.Stderr, " percentGroup: the percentage of all of the metric points which had the same name/tag (depending on grouping) that were out of order/duplicates (depending on classification)") | ||
fmt.Fprintln(os.Stderr, " the example above shows that ~4.301% of all metric points with name \"fruit.weight\" were out of order") | ||
fmt.Fprintln(os.Stderr, " percentClass: the percentage of all of the metric points which were out of order/duplicates (depending on classification) that had this name/tag (depending on grouping)") | ||
fmt.Fprintln(os.Stderr, " the example above shows that ~2.063% of all metric points that were out of order had tag fruit=apple") | ||
fmt.Fprintln(os.Stderr, " percentTotal: the percentage of all metric points that had this name/tag (depending on grouping) and were out of order/duplicates (depending on classification)") | ||
fmt.Fprintln(os.Stderr, " the example above shows that ~0.177% of all metric points had tag \"fruit\"=\"orange\" and were duplicates") | ||
fmt.Fprintln(os.Stderr) | ||
fmt.Fprintln(os.Stderr, "flags:") | ||
flags.flagSet.PrintDefaults() | ||
fmt.Fprintln(os.Stderr) | ||
fmt.Fprintln(os.Stderr, "EXAMPLES:") | ||
fmt.Fprintln(os.Stderr, " mt-kafka-mdm-report-out-of-order -group-by-name -config metrictank.ini -partition-from 0") | ||
fmt.Fprintln(os.Stderr, " mt-kafka-mdm-report-out-of-order -group-by-name -group-by-tag namespace -config metrictank.ini -partition-from 0 -partition-to 3 -reorder-window 5 -run-duration 5m") | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
package main | ||
|
||
import ( | ||
"os" | ||
"sync" | ||
"time" | ||
|
||
"github.com/grafana/metrictank/idx/cassandra" | ||
"github.com/grafana/metrictank/mdata" | ||
"github.com/grafana/metrictank/mdata/errors" | ||
"github.com/grafana/metrictank/schema" | ||
"github.com/grafana/metrictank/schema/msg" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
// find out-of-order and duplicate metrics | ||
type inputOOOFinder struct { | ||
reorderWindow uint32 | ||
tracker Tracker | ||
|
||
lock sync.Mutex | ||
} | ||
|
||
func newInputOOOFinder(partitionFrom int, partitionTo int, reorderWindow uint32) *inputOOOFinder { | ||
cassandraIndex := cassandra.New(cassandra.CliConfig) | ||
err := cassandraIndex.InitBare() | ||
if err != nil { | ||
log.Fatalf("error initializing cassandra index: %s", err.Error()) | ||
os.Exit(1) | ||
} | ||
|
||
metricDefinitions := make([]schema.MetricDefinition, 0) | ||
for partition := partitionFrom; (partitionTo == -1 && partition == partitionFrom) || (partitionTo > 0 && partition < partitionTo); partition++ { | ||
metricDefinitions = cassandraIndex.LoadPartitions([]int32{int32(partition)}, metricDefinitions, time.Now()) | ||
} | ||
|
||
tracker := Tracker{} | ||
for _, metricDefinition := range metricDefinitions { | ||
tracker[metricDefinition.Id] = Track{ | ||
Name: metricDefinition.Name, | ||
Tags: metricDefinition.Tags, | ||
reorderBuffer: mdata.NewReorderBuffer(reorderWindow, uint32(metricDefinition.Interval), false), | ||
Count: 0, | ||
OutOfOrderCount: 0, | ||
DuplicateCount: 0, | ||
} | ||
} | ||
|
||
return &inputOOOFinder{ | ||
reorderWindow: reorderWindow, | ||
tracker: tracker, | ||
|
||
lock: sync.Mutex{}, | ||
} | ||
} | ||
|
||
func (ip *inputOOOFinder) ProcessMetricData(metric *schema.MetricData, partition int32) { | ||
metricKey, err := schema.MKeyFromString(metric.Id) | ||
// we loaded all definitions from cassandra at construction time, and any new MetricData in kafka should always be preceeded by the corresponding MetricDefinition (in the same kafka topic), so this should never happen | ||
if err != nil { | ||
log.Errorf("failed to get metric key from id=%v: %s", metric.Id, err.Error()) | ||
return | ||
} | ||
|
||
ip.lock.Lock() | ||
defer ip.lock.Unlock() | ||
|
||
track, exists := ip.tracker[metricKey] | ||
if !exists { | ||
track = Track{ | ||
Name: metric.Name, | ||
Tags: metric.Tags, | ||
reorderBuffer: mdata.NewReorderBuffer(ip.reorderWindow, uint32(metric.Interval), false), | ||
Count: 0, | ||
OutOfOrderCount: 0, | ||
DuplicateCount: 0, | ||
} | ||
} | ||
|
||
ip.incrementCounts(metricKey, metric.Time, track, partition) | ||
} | ||
|
||
func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, format msg.Format, partition int32) { | ||
ip.lock.Lock() | ||
defer ip.lock.Unlock() | ||
|
||
track, exists := ip.tracker[mp.MKey] | ||
if !exists { | ||
log.Errorf("track for metric with key=%v from partition=%d not found", mp.MKey, partition) | ||
return | ||
} | ||
|
||
ip.incrementCounts(mp.MKey, int64(mp.Time), track, partition) | ||
} | ||
|
||
func (ip *inputOOOFinder) ProcessIndexControlMsg(msg schema.ControlMsg, partition int32) { | ||
|
||
} | ||
|
||
func (ip *inputOOOFinder) incrementCounts(metricKey schema.MKey, metricTime int64, track Track, partition int32) { | ||
track.Count++ | ||
|
||
_, _, err := track.reorderBuffer.Add(uint32(metricTime), 0) // ignore value | ||
if err == errors.ErrMetricTooOld { | ||
track.OutOfOrderCount++ | ||
} else if err == errors.ErrMetricNewValueForTimestamp { | ||
track.DuplicateCount++ | ||
} else if err != nil { | ||
log.Errorf("failed to add metric with Name=%q and timestamp=%d from partition=%d to reorder buffer: %s", track.Name, metricTime, partition, err) | ||
return | ||
} | ||
|
||
ip.tracker[metricKey] = track | ||
} | ||
|
||
func (ip inputOOOFinder) Tracker() Tracker { | ||
return ip.tracker | ||
} | ||
|
||
type Tracker map[schema.MKey]Track | ||
|
||
type Track struct { | ||
Name string | ||
Tags []string | ||
|
||
reorderBuffer *mdata.ReorderBuffer | ||
|
||
Count int | ||
OutOfOrderCount int | ||
DuplicateCount int | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.