Skip to content

Commit

Permalink
Removed deprecated untyped record from chunks WAL
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
  • Loading branch information
codesome committed Sep 1, 2020
1 parent 2efff16 commit 69348a8
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 1,171 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## master / unreleased

* [CHANGE] Ingester: Removed deprecated untyped record from chunks WAL. Only if you are running `v1.0` or below, it is recommended to first upgrade to `v1.1`/`v1.2`/`v1.3` and run it for a day before upgrading to `v1.4` to avoid data loss. #3115
* [CHANGE] Increase the default Cassandra client replication factor to 3. #3007
* [CHANGE] Experimental blocks storage: removed the support to transfer blocks between ingesters on shutdown. When running the Cortex blocks storage, ingesters are expected to run with a persistent disk. The following metrics have been removed: #2996
* `cortex_ingester_sent_files`
Expand Down
107 changes: 26 additions & 81 deletions pkg/ingester/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,6 @@ type WAL interface {
type RecordType byte

const (
// Currently we also support the old records without a type header.
// For that, we assume the record type does not cross 7 as the proto unmarshalling
// will produce an error if the first byte is less than 7 (thus we know its not the old record).
// The old record will be removed in the future releases, hence the record type should not cross
// '7' till then.

// WALRecordSeries is the type for the WAL record on Prometheus TSDB record for series.
WALRecordSeries RecordType = 1
// WALRecordSamples is the type for the WAL record based on Prometheus TSDB record for samples.
Expand Down Expand Up @@ -843,7 +837,6 @@ func processWAL(startSegment int, userStates *userStates, params walRecoveryPara

var (
capturedErr error
record = &Record{}
walRecord = &WALRecord{}
lp labelPairs
)
Expand All @@ -857,51 +850,30 @@ Loop:
default:
}

record.Samples = record.Samples[:0]
record.Labels = record.Labels[:0]
// Only one of 'record' or 'walRecord' will have the data.
if err := decodeWALRecord(reader.Record(), record, walRecord); err != nil {
if err := decodeWALRecord(reader.Record(), walRecord); err != nil {
// We don't return here in order to close/drain all the channels and
// make sure all goroutines exit.
capturedErr = err
break Loop
}

if len(record.Labels) > 0 || len(walRecord.Series) > 0 {

var userID string
if len(walRecord.Series) > 0 {
userID = walRecord.UserID
} else {
userID = record.UserId
}
if len(walRecord.Series) > 0 {
userID := walRecord.UserID

state := userStates.getOrCreate(userID)

createSeries := func(fingerprint model.Fingerprint, lbls labelPairs) error {
_, ok := state.fpToSeries.get(fingerprint)
for _, s := range walRecord.Series {
fp := model.Fingerprint(s.Ref)
_, ok := state.fpToSeries.get(fp)
if ok {
return nil
}
_, err := state.createSeriesWithFingerprint(fingerprint, lbls, nil, true)
return err
}

for _, labels := range record.Labels {
if err := createSeries(model.Fingerprint(labels.Fingerprint), labels.Labels); err != nil {
// We don't return here in order to close/drain all the channels and
// make sure all goroutines exit.
capturedErr = err
break Loop
continue
}
}

for _, s := range walRecord.Series {
lp = lp[:0]
for _, l := range s.Labels {
lp = append(lp, client.LabelAdapter(l))
}
if err := createSeries(model.Fingerprint(s.Ref), lp); err != nil {
if _, err := state.createSeriesWithFingerprint(fp, lp, nil, true); err != nil {
// We don't return here in order to close/drain all the channels and
// make sure all goroutines exit.
capturedErr = err
Expand All @@ -914,20 +886,12 @@ Loop:
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(record.Samples) > 0 || len(walRecord.Samples) > 0 {
walRecordSamples := walRecord.Samples
for len(walRecordSamples) > 0 {
m := 5000
var userID string
if len(record.Samples) > 0 {
userID = record.UserId
if len(record.Samples) < m {
m = len(record.Samples)
}
}
if len(walRecord.Samples) > 0 {
userID = walRecord.UserID
if len(walRecord.Samples) < m {
m = len(walRecord.Samples)
}
userID := walRecord.UserID
if len(walRecordSamples) < m {
m = len(walRecordSamples)
}

for i := 0; i < params.numWorkers; i++ {
Expand All @@ -949,21 +913,9 @@ Loop:
}
}

if len(record.Samples) > 0 {
for _, sam := range record.Samples[:m] {
mod := sam.Fingerprint % uint64(params.numWorkers)
shards[mod].samples = append(shards[mod].samples, tsdb_record.RefSample{
Ref: sam.Fingerprint,
T: int64(sam.Timestamp),
V: sam.Value,
})
}
}
if len(walRecord.Samples) > 0 {
for _, sam := range walRecord.Samples[:m] {
mod := sam.Ref % uint64(params.numWorkers)
shards[mod].samples = append(shards[mod].samples, sam)
}
for _, sam := range walRecordSamples[:m] {
mod := sam.Ref % uint64(params.numWorkers)
shards[mod].samples = append(shards[mod].samples, sam)
}

for i := 0; i < params.numWorkers; i++ {
Expand All @@ -972,12 +924,7 @@ Loop:
}
}

if len(record.Samples) > 0 {
record.Samples = record.Samples[m:]
}
if len(walRecord.Samples) > 0 {
walRecord.Samples = walRecord.Samples[m:]
}
walRecordSamples = walRecordSamples[m:]
}
}

Expand Down Expand Up @@ -1171,7 +1118,7 @@ func (record *WALRecord) encodeSamples(b []byte) []byte {
return encoded
}

func decodeWALRecord(b []byte, rec *Record, walRec *WALRecord) (err error) {
func decodeWALRecord(b []byte, walRec *WALRecord) (err error) {
var (
userID string
dec tsdb_record.Decoder
Expand All @@ -1192,23 +1139,21 @@ func decodeWALRecord(b []byte, rec *Record, walRec *WALRecord) (err error) {
userID = decbuf.UvarintStr()
rseries, err = dec.Series(decbuf.B, walRec.Series)
default:
// The legacy proto record will have it's first byte >7.
// Hence it does not match any of the existing record types.
err = proto.Unmarshal(b, rec)
return err
return errors.New("unknown record type")
}

// We reach here only if its a record with type header.
if decbuf.Err() != nil {
return decbuf.Err()
}

if err == nil {
// There was no error decoding the records with type headers.
walRec.UserID = userID
walRec.Samples = rsamples
walRec.Series = rseries
if err != nil {
return err
}

return err
walRec.UserID = userID
walRec.Samples = rsamples
walRec.Series = rseries

return nil
}
Loading

0 comments on commit 69348a8

Please sign in to comment.