Skip to content

Commit

Permalink
Rename fudge_duplicate_timestamp to be increment_duplicate_timestamp (#…
Browse files Browse the repository at this point in the history
…6120)

* Rename fudge_duplicate_timestamp to be increment_duplicate_timestamp

* run `gofmt -d -w pkg/validation/limits.go`

Co-authored-by: Christian Simon <simon@swine.de>
  • Loading branch information
KMiller-Grafana and simonswine authored May 19, 2022
1 parent 3c33400 commit b90c460
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 62 deletions.
19 changes: 10 additions & 9 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2163,15 +2163,16 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -distributor.max-line-size-truncate
[max_line_size_truncate: <boolean> | default = false ]
# Fudge the log line timestamp during ingestion when it's the same as the previous entry for the same stream
# When enabled, if a log line in a push request has the same timestamp as the previous line
# for the same stream, one nanosecond is added to the log line. This will preserve the received
# order of log lines with the exact same timestamp when they are queried by slightly altering
# their stored timestamp. NOTE: this is imperfect because Loki accepts out of order writes
# and another push request for the same stream could contain duplicate timestamps to existing
# entries and they will not be fudged.
# CLI flag: -validation.fudge-duplicate-timestamps
[fudge_duplicate_timestamp: <boolean> | default = false ]
# Alter the log line timestamp during ingestion when the timestamp is the same as the
# previous entry for the same stream. When enabled, if a log line in a push request has
# the same timestamp as the previous line for the same stream, one nanosecond is added
# to the log line. This will preserve the received order of log lines with the exact
# same timestamp when they are queried, by slightly altering their stored timestamp.
# NOTE: This is imperfect, because Loki accepts out of order writes, and another push
# request for the same stream could contain duplicate timestamps to existing
# entries and they will not be incremented.
# CLI flag: -validation.increment-duplicate-timestamps
[increment_duplicate_timestamp: <boolean> | default = false ]
# Maximum number of log entries that will be returned for a query.
# CLI flag: -validation.max-entries-limit
Expand Down
6 changes: 3 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,13 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

stream.Entries[n] = entry

// If configured for this tenant, fudge duplicate timestamps. Note, this is imperfect
// If configured for this tenant, increment duplicate timestamps. Note, this is imperfect
// since Loki will accept out of order writes it doesn't account for separate
// pushes with overlapping time ranges having entries with duplicate timestamps
if validationContext.fudgeDuplicateTimestamps && n != 0 && stream.Entries[n-1].Timestamp.Equal(entry.Timestamp) {
if validationContext.incrementDuplicateTimestamps && n != 0 && stream.Entries[n-1].Timestamp.Equal(entry.Timestamp) {
// Traditional logic for Loki is that 2 lines with the same timestamp and
// exact same content will be de-duplicated, (i.e. only one will be stored, others dropped)
// To maintain this behavior, only fudge the timestamp if the log content is different
// To maintain this behavior, only increment the timestamp if the log content is different
if stream.Entries[n-1].Line != entry.Line {
stream.Entries[n].Timestamp = entry.Timestamp.Add(1 * time.Nanosecond)
}
Expand Down
44 changes: 22 additions & 22 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,23 @@ func TestDistributor(t *testing.T) {
}
}

func Test_FudgeTimestamp(t *testing.T) {
fudgingDisabled := &validation.Limits{}
flagext.DefaultValues(fudgingDisabled)
fudgingDisabled.RejectOldSamples = false
func Test_IncrementTimestamp(t *testing.T) {
incrementingDisabled := &validation.Limits{}
flagext.DefaultValues(incrementingDisabled)
incrementingDisabled.RejectOldSamples = false

fudgingEnabled := &validation.Limits{}
flagext.DefaultValues(fudgingEnabled)
fudgingEnabled.RejectOldSamples = false
fudgingEnabled.FudgeDuplicateTimestamp = true
incrementingEnabled := &validation.Limits{}
flagext.DefaultValues(incrementingEnabled)
incrementingEnabled.RejectOldSamples = false
incrementingEnabled.IncrementDuplicateTimestamp = true

tests := map[string]struct {
limits *validation.Limits
push *logproto.PushRequest
expectedPush *logproto.PushRequest
}{
"fudging disabled, no dupes": {
limits: fudgingDisabled,
"incrementing disabled, no dupes": {
limits: incrementingDisabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -140,8 +140,8 @@ func Test_FudgeTimestamp(t *testing.T) {
},
},
},
"fudging disabled, with dupe timestamp different entry": {
limits: fudgingDisabled,
"incrementing disabled, with dupe timestamp different entry": {
limits: incrementingDisabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -165,8 +165,8 @@ func Test_FudgeTimestamp(t *testing.T) {
},
},
},
"fudging disabled, with dupe timestamp same entry": {
limits: fudgingDisabled,
"incrementing disabled, with dupe timestamp same entry": {
limits: incrementingDisabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -190,8 +190,8 @@ func Test_FudgeTimestamp(t *testing.T) {
},
},
},
"fudging enabled, no dupes": {
limits: fudgingEnabled,
"incrementing enabled, no dupes": {
limits: incrementingEnabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -215,8 +215,8 @@ func Test_FudgeTimestamp(t *testing.T) {
},
},
},
"fudging enabled, with dupe timestamp different entry": {
limits: fudgingEnabled,
"incrementing enabled, with dupe timestamp different entry": {
limits: incrementingEnabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -240,8 +240,8 @@ func Test_FudgeTimestamp(t *testing.T) {
},
},
},
"fudging enabled, with dupe timestamp same entry": {
limits: fudgingEnabled,
"incrementing enabled, with dupe timestamp same entry": {
limits: incrementingEnabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -265,8 +265,8 @@ func Test_FudgeTimestamp(t *testing.T) {
},
},
},
"fudging enabled, multiple subsequent fudges": {
limits: fudgingEnabled,
"incrementing enabled, multiple subsequent increments": {
limits: incrementingEnabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ type Limits interface {
RejectOldSamples(userID string) bool
RejectOldSamplesMaxAge(userID string) time.Duration

FudgeDuplicateTimestamps(userID string) bool
IncrementDuplicateTimestamps(userID string) bool
}
22 changes: 11 additions & 11 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,23 @@ type validationContext struct {
maxLabelNameLength int
maxLabelValueLength int

fudgeDuplicateTimestamps bool
incrementDuplicateTimestamps bool

userID string
}

func (v Validator) getValidationContextForTime(now time.Time, userID string) validationContext {
return validationContext{
userID: userID,
rejectOldSample: v.RejectOldSamples(userID),
rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(),
creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(),
maxLineSize: v.MaxLineSize(userID),
maxLineSizeTruncate: v.MaxLineSizeTruncate(userID),
maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID),
maxLabelNameLength: v.MaxLabelNameLength(userID),
maxLabelValueLength: v.MaxLabelValueLength(userID),
fudgeDuplicateTimestamps: v.FudgeDuplicateTimestamps(userID),
userID: userID,
rejectOldSample: v.RejectOldSamples(userID),
rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(),
creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(),
maxLineSize: v.MaxLineSize(userID),
maxLineSizeTruncate: v.MaxLineSizeTruncate(userID),
maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID),
maxLabelNameLength: v.MaxLabelNameLength(userID),
maxLabelValueLength: v.MaxLabelValueLength(userID),
incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID),
}
}

Expand Down
32 changes: 16 additions & 16 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ const (
// to support user-friendly duration format (e.g: "1h30m45s") in JSON value.
type Limits struct {
// Distributor enforced limits.
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
IngestionRateMB float64 `yaml:"ingestion_rate_mb" json:"ingestion_rate_mb"`
IngestionBurstSizeMB float64 `yaml:"ingestion_burst_size_mb" json:"ingestion_burst_size_mb"`
MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"`
MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"`
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"`
RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"`
EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"`
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
FudgeDuplicateTimestamp bool `yaml:"fudge_duplicate_timestamp" json:"fudge_duplicate_timestamp"`
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
IngestionRateMB float64 `yaml:"ingestion_rate_mb" json:"ingestion_rate_mb"`
IngestionBurstSizeMB float64 `yaml:"ingestion_burst_size_mb" json:"ingestion_burst_size_mb"`
MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"`
MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"`
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"`
RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"`
EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"`
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`

// Ingester enforced limits.
MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"`
Expand Down Expand Up @@ -136,7 +136,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name")
f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.")
f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", true, "Reject old samples.")
f.BoolVar(&l.FudgeDuplicateTimestamp, "validation.fudge-duplicate-timestamps", false, "Fudge the timestamp of a log line by one nanosecond in the future from a previous entry for the same stream with the same timestamp, guarantees sort order at query time.")
f.BoolVar(&l.IncrementDuplicateTimestamp, "validation.increment-duplicate-timestamps", false, "Increment the timestamp of a log line by one nanosecond in the future from a previous entry for the same stream with the same timestamp; guarantees sort order at query time.")

_ = l.RejectOldSamplesMaxAge.Set("7d")
f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Maximum accepted sample age before rejecting.")
Expand Down Expand Up @@ -539,8 +539,8 @@ func (o *Overrides) PerStreamRateLimit(userID string) RateLimit {
}
}

func (o *Overrides) FudgeDuplicateTimestamps(userID string) bool {
return o.getOverridesForUser(userID).FudgeDuplicateTimestamp
func (o *Overrides) IncrementDuplicateTimestamps(userID string) bool {
return o.getOverridesForUser(userID).IncrementDuplicateTimestamp
}

func (o *Overrides) getOverridesForUser(userID string) *Limits {
Expand Down

0 comments on commit b90c460

Please sign in to comment.