Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename fudge_duplicate_timestamp to be increment_duplicate_timestamp #6120

Merged
merged 2 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2122,15 +2122,16 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -distributor.max-line-size-truncate
[max_line_size_truncate: <boolean> | default = false ]

# Fudge the log line timestamp during ingestion when it's the same as the previous entry for the same stream
# When enabled, if a log line in a push request has the same timestamp as the previous line
# for the same stream, one nanosecond is added to the log line. This will preserve the received
# order of log lines with the exact same timestamp when they are queried by slightly altering
# their stored timestamp. NOTE: this is imperfect because Loki accepts out of order writes
# and another push request for the same stream could contain duplicate timestamps to existing
# entries and they will not be fudged.
# CLI flag: -validation.fudge-duplicate-timestamps
[fudge_duplicate_timestamp: <boolean> | default = false ]
# Alter the log line timestamp during ingestion when the timestamp is the same as the
# previous entry for the same stream. When enabled, if a log line in a push request has
# the same timestamp as the previous line for the same stream, one nanosecond is added
# to the log line. This will preserve the received order of log lines with the exact
# same timestamp when they are queried, by slightly altering their stored timestamp.
# NOTE: This is imperfect, because Loki accepts out of order writes, and another push
# request for the same stream could contain duplicate timestamps to existing
# entries and they will not be incremented.
# CLI flag: -validation.increment-duplicate-timestamps
[increment_duplicate_timestamp: <boolean> | default = false ]

# Maximum number of log entries that will be returned for a query.
# CLI flag: -validation.max-entries-limit
Expand Down
6 changes: 3 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,13 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

stream.Entries[n] = entry

// If configured for this tenant, fudge duplicate timestamps. Note, this is imperfect
// If configured for this tenant, increment duplicate timestamps. Note, this is imperfect
// since Loki will accept out of order writes it doesn't account for separate
// pushes with overlapping time ranges having entries with duplicate timestamps
if validationContext.fudgeDuplicateTimestamps && n != 0 && stream.Entries[n-1].Timestamp.Equal(entry.Timestamp) {
if validationContext.incrementDuplicateTimestamps && n != 0 && stream.Entries[n-1].Timestamp.Equal(entry.Timestamp) {
// Traditional logic for Loki is that 2 lines with the same timestamp and
// exact same content will be de-duplicated, (i.e. only one will be stored, others dropped)
// To maintain this behavior, only fudge the timestamp if the log content is different
// To maintain this behavior, only increment the timestamp if the log content is different
if stream.Entries[n-1].Line != entry.Line {
stream.Entries[n].Timestamp = entry.Timestamp.Add(1 * time.Nanosecond)
}
Expand Down
44 changes: 22 additions & 22 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,23 @@ func TestDistributor(t *testing.T) {
}
}

func Test_FudgeTimestamp(t *testing.T) {
fudgingDisabled := &validation.Limits{}
flagext.DefaultValues(fudgingDisabled)
fudgingDisabled.RejectOldSamples = false
func Test_IncrementTimestamp(t *testing.T) {
incrementingDisabled := &validation.Limits{}
flagext.DefaultValues(incrementingDisabled)
incrementingDisabled.RejectOldSamples = false

fudgingEnabled := &validation.Limits{}
flagext.DefaultValues(fudgingEnabled)
fudgingEnabled.RejectOldSamples = false
fudgingEnabled.FudgeDuplicateTimestamp = true
incrementingEnabled := &validation.Limits{}
flagext.DefaultValues(incrementingEnabled)
incrementingEnabled.RejectOldSamples = false
incrementingEnabled.IncrementDuplicateTimestamp = true

tests := map[string]struct {
limits *validation.Limits
push *logproto.PushRequest
expectedPush *logproto.PushRequest
}{
"fudging disabled, no dupes": {
limits: fudgingDisabled,
"incrementing disabled, no dupes": {
limits: incrementingDisabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -140,8 +140,8 @@ func Test_FudgeTimestamp(t *testing.T) {
},
},
},
"fudging disabled, with dupe timestamp different entry": {
limits: fudgingDisabled,
"incrementing disabled, with dupe timestamp different entry": {
limits: incrementingDisabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -165,8 +165,8 @@ func Test_FudgeTimestamp(t *testing.T) {
},
},
},
"fudging disabled, with dupe timestamp same entry": {
limits: fudgingDisabled,
"incrementing disabled, with dupe timestamp same entry": {
limits: incrementingDisabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -190,8 +190,8 @@ func Test_FudgeTimestamp(t *testing.T) {
},
},
},
"fudging enabled, no dupes": {
limits: fudgingEnabled,
"incrementing enabled, no dupes": {
limits: incrementingEnabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -215,8 +215,8 @@ func Test_FudgeTimestamp(t *testing.T) {
},
},
},
"fudging enabled, with dupe timestamp different entry": {
limits: fudgingEnabled,
"incrementing enabled, with dupe timestamp different entry": {
limits: incrementingEnabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -240,8 +240,8 @@ func Test_FudgeTimestamp(t *testing.T) {
},
},
},
"fudging enabled, with dupe timestamp same entry": {
limits: fudgingEnabled,
"incrementing enabled, with dupe timestamp same entry": {
limits: incrementingEnabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -265,8 +265,8 @@ func Test_FudgeTimestamp(t *testing.T) {
},
},
},
"fudging enabled, multiple subsequent fudges": {
limits: fudgingEnabled,
"incrementing enabled, multiple subsequent increments": {
limits: incrementingEnabled,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ type Limits interface {
RejectOldSamples(userID string) bool
RejectOldSamplesMaxAge(userID string) time.Duration

FudgeDuplicateTimestamps(userID string) bool
IncrementDuplicateTimestamps(userID string) bool
}
22 changes: 11 additions & 11 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,23 @@ type validationContext struct {
maxLabelNameLength int
maxLabelValueLength int

fudgeDuplicateTimestamps bool
incrementDuplicateTimestamps bool

userID string
}

func (v Validator) getValidationContextForTime(now time.Time, userID string) validationContext {
return validationContext{
userID: userID,
rejectOldSample: v.RejectOldSamples(userID),
rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(),
creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(),
maxLineSize: v.MaxLineSize(userID),
maxLineSizeTruncate: v.MaxLineSizeTruncate(userID),
maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID),
maxLabelNameLength: v.MaxLabelNameLength(userID),
maxLabelValueLength: v.MaxLabelValueLength(userID),
fudgeDuplicateTimestamps: v.FudgeDuplicateTimestamps(userID),
userID: userID,
rejectOldSample: v.RejectOldSamples(userID),
rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(),
creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(),
maxLineSize: v.MaxLineSize(userID),
maxLineSizeTruncate: v.MaxLineSizeTruncate(userID),
maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID),
maxLabelNameLength: v.MaxLabelNameLength(userID),
maxLabelValueLength: v.MaxLabelValueLength(userID),
incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID),
}
}

Expand Down
32 changes: 16 additions & 16 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ const (
// to support user-friendly duration format (e.g: "1h30m45s") in JSON value.
type Limits struct {
// Distributor enforced limits.
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
IngestionRateMB float64 `yaml:"ingestion_rate_mb" json:"ingestion_rate_mb"`
IngestionBurstSizeMB float64 `yaml:"ingestion_burst_size_mb" json:"ingestion_burst_size_mb"`
MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"`
MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"`
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"`
RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"`
EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"`
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
FudgeDuplicateTimestamp bool `yaml:"fudge_duplicate_timestamp" json:"fudge_duplicate_timestamp"`
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
IngestionRateMB float64 `yaml:"ingestion_rate_mb" json:"ingestion_rate_mb"`
IngestionBurstSizeMB float64 `yaml:"ingestion_burst_size_mb" json:"ingestion_burst_size_mb"`
MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"`
MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"`
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"`
RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"`
EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"`
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`

// Ingester enforced limits.
MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"`
Expand Down Expand Up @@ -136,7 +136,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name")
f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.")
f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", true, "Reject old samples.")
f.BoolVar(&l.FudgeDuplicateTimestamp, "validation.fudge-duplicate-timestamps", false, "Fudge the timestamp of a log line by one nanosecond in the future from a previous entry for the same stream with the same timestamp, guarantees sort order at query time.")
f.BoolVar(&l.IncrementDuplicateTimestamp, "validation.increment-duplicate-timestamps", false, "Increment the timestamp of a log line by one nanosecond in the future from a previous entry for the same stream with the same timestamp; guarantees sort order at query time.")

_ = l.RejectOldSamplesMaxAge.Set("7d")
f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Maximum accepted sample age before rejecting.")
Expand Down Expand Up @@ -539,8 +539,8 @@ func (o *Overrides) PerStreamRateLimit(userID string) RateLimit {
}
}

func (o *Overrides) FudgeDuplicateTimestamps(userID string) bool {
return o.getOverridesForUser(userID).FudgeDuplicateTimestamp
func (o *Overrides) IncrementDuplicateTimestamps(userID string) bool {
return o.getOverridesForUser(userID).IncrementDuplicateTimestamp
}

func (o *Overrides) getOverridesForUser(userID string) *Limits {
Expand Down