Skip to content

Commit

Permalink
Add type header to WAL/Checkpoint records and use TSDB records (#2436)
Browse files Browse the repository at this point in the history
* Add type header to WAL and Checkpoint records

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Use Prometheus TSDB records for the WAL

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Don't try to decode old record first

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Add a note about downgrade

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
  • Loading branch information
codesome authored Apr 24, 2020
1 parent 37c358b commit 397bfca
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 67 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [CHANGE] Experimental WAL: Default value of `-ingester.checkpoint-enabled` changed to `true`. #2416
* [CHANGE] `trace_id` field in log files has been renamed to `traceID`. #2518
* [CHANGE] Slow query log has a different output now. Previously used `url` field has been replaced with `host` and `path`, and query parameters are logged as individual log fields with `qs_` prefix. #2520
* [CHANGE] Experimental WAL: WAL and checkpoint compression is now disabled. #2436
* [FEATURE] Ruler: The `-ruler.evaluation-delay` flag was added to allow users to configure a default evaluation delay for all rules in cortex. The default value is 0 which is the current behavior. #2423
* [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370
* [ENHANCEMENT] Failures on samples at distributors and ingesters return the first validation error as opposed to the last. #2383
Expand All @@ -40,6 +41,7 @@
* `cortex_query_frontend_queries_total` (per tenant queries counted by the frontend)
* [ENHANCEMENT] Add de-duplicated chunks counter `cortex_chunk_store_deduped_chunks_total` which counts every chunk not sent to the store because it was already sent by another replica. #2485
* [ENHANCEMENT] query-frontend now also logs the POST data of long queries. #2481
* [ENHANCEMENT] Experimental WAL: Ingester WAL records now have type header and the custom WAL records have been replaced by Prometheus TSDB's WAL records. Old records will not be supported from 1.3 onwards. Note: once this is deployed, you cannot downgrade without data loss. #2436
* [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372
* [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400
* [BUGFIX] Cassandra Storage: Fix endpoint TLS host verification. #2109
Expand Down
23 changes: 12 additions & 11 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
tsdb_record "github.com/prometheus/prometheus/tsdb/record"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -171,7 +172,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c

recordPool = sync.Pool{
New: func() interface{} {
return &Record{}
return &WALRecord{}
},
}
}
Expand Down Expand Up @@ -397,15 +398,15 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.
i.pushMetadata(ctx, userID, req.GetMetadata())

var firstPartialErr *validationError
var record *Record
var record *WALRecord
if i.cfg.WALConfig.WALEnabled {
record = recordPool.Get().(*Record)
record.UserId = userID
record = recordPool.Get().(*WALRecord)
record.UserID = userID
// Assuming there is not much churn in most cases, there is no use
// keeping the record.Labels slice hanging around.
record.Labels = nil
record.Series = nil
if cap(record.Samples) < len(req.Timeseries) {
record.Samples = make([]Sample, 0, len(req.Timeseries))
record.Samples = make([]tsdb_record.RefSample, 0, len(req.Timeseries))
} else {
record.Samples = record.Samples[:0]
}
Expand Down Expand Up @@ -450,7 +451,7 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.

// NOTE: memory for `labels` is unsafe; anything retained beyond the
// life of this function must be copied
func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum, record *Record) error {
func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum, record *WALRecord) error {
labels.removeBlanks()

var (
Expand Down Expand Up @@ -506,10 +507,10 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
}

if record != nil {
record.Samples = append(record.Samples, Sample{
Fingerprint: uint64(fp),
Timestamp: uint64(timestamp),
Value: float64(value),
record.Samples = append(record.Samples, tsdb_record.RefSample{
Ref: uint64(fp),
T: int64(timestamp),
V: float64(value),
})
}

Expand Down
17 changes: 11 additions & 6 deletions pkg/ingester/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
tsdb_record "github.com/prometheus/prometheus/tsdb/record"
"github.com/segmentio/fasthash/fnv1a"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -170,7 +171,7 @@ func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, erro

// NOTE: memory for `labels` is unsafe; anything retained beyond the
// life of this function must be copied
func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter, record *Record) (*userState, model.Fingerprint, *memorySeries, error) {
func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter, record *WALRecord) (*userState, model.Fingerprint, *memorySeries, error) {
state := us.getOrCreate(userID)
// WARNING: `err` may have a reference to unsafe memory in `labels`
fp, series, err := state.getSeries(labels, record)
Expand All @@ -179,7 +180,7 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labe

// NOTE: memory for `metric` is unsafe; anything retained beyond the
// life of this function must be copied
func (u *userState) getSeries(metric labelPairs, record *Record) (model.Fingerprint, *memorySeries, error) {
func (u *userState) getSeries(metric labelPairs, record *WALRecord) (model.Fingerprint, *memorySeries, error) {
rawFP := client.FastFingerprint(metric)
u.fpLocker.Lock(rawFP)
fp := u.mapper.mapFP(rawFP, metric)
Expand All @@ -202,7 +203,7 @@ func (u *userState) getSeries(metric labelPairs, record *Record) (model.Fingerpr
return fp, series, nil
}

func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric labelPairs, record *Record, recovery bool) (*memorySeries, error) {
func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric labelPairs, record *WALRecord, recovery bool) (*memorySeries, error) {
// There's theoretically a relatively harmless race here if multiple
// goroutines get the length of the series map at the same time, then
// all proceed to add a new series. This is likely not worth addressing,
Expand Down Expand Up @@ -233,9 +234,13 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab
u.memSeries.Inc()

if record != nil {
record.Labels = append(record.Labels, Labels{
Fingerprint: uint64(fp),
Labels: metric,
lbls := make(labels.Labels, 0, len(metric))
for _, m := range metric {
lbls = append(lbls, labels.Label(m))
}
record.Series = append(record.Series, tsdb_record.RefSeries{
Ref: uint64(fp),
Labels: lbls,
})
}

Expand Down
Loading

0 comments on commit 397bfca

Please sign in to comment.