Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed deprecated untyped record from chunks WAL #3115

Merged
merged 3 commits into from
Sep 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- `-experimental.store-gateway.*` flags renamed to `-store-gateway.*`
- `-experimental.querier.store-gateway-client.*` flags renamed to `-querier.store-gateway-client.*`
- `-experimental.querier.store-gateway-addresses` flag renamed to `-querier.store-gateway-addresses`
* [CHANGE] Ingester: Removed deprecated untyped record from chunks WAL. Only if you are running `v1.0` or below, it is recommended to first upgrade to `v1.1`/`v1.2`/`v1.3` and run it for a day before upgrading to `v1.4` to avoid data loss. #3115
* [CHANGE] Distributor API endpoints are no longer served unless target is set to `distributor` or `all`. #3112
* [CHANGE] Increase the default Cassandra client replication factor to 3. #3007
* [CHANGE] Blocks storage: removed the support to transfer blocks between ingesters on shutdown. When running the Cortex blocks storage, ingesters are expected to run with a persistent disk. The following metrics have been removed: #2996
Expand Down
107 changes: 26 additions & 81 deletions pkg/ingester/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,6 @@ type WAL interface {
type RecordType byte

const (
// Currently we also support the old records without a type header.
// For that, we assume the record type does not cross 7 as the proto unmarshalling
// will produce an error if the first byte is less than 7 (thus we know its not the old record).
// The old record will be removed in the future releases, hence the record type should not cross
// '7' till then.

// WALRecordSeries is the type for the WAL record on Prometheus TSDB record for series.
WALRecordSeries RecordType = 1
// WALRecordSamples is the type for the WAL record based on Prometheus TSDB record for samples.
Expand Down Expand Up @@ -843,7 +837,6 @@ func processWAL(startSegment int, userStates *userStates, params walRecoveryPara

var (
capturedErr error
record = &Record{}
walRecord = &WALRecord{}
lp labelPairs
)
Expand All @@ -857,51 +850,30 @@ Loop:
default:
}

record.Samples = record.Samples[:0]
record.Labels = record.Labels[:0]
// Only one of 'record' or 'walRecord' will have the data.
if err := decodeWALRecord(reader.Record(), record, walRecord); err != nil {
if err := decodeWALRecord(reader.Record(), walRecord); err != nil {
// We don't return here in order to close/drain all the channels and
// make sure all goroutines exit.
capturedErr = err
break Loop
}

if len(record.Labels) > 0 || len(walRecord.Series) > 0 {

var userID string
if len(walRecord.Series) > 0 {
userID = walRecord.UserID
} else {
userID = record.UserId
}
if len(walRecord.Series) > 0 {
userID := walRecord.UserID

state := userStates.getOrCreate(userID)

createSeries := func(fingerprint model.Fingerprint, lbls labelPairs) error {
_, ok := state.fpToSeries.get(fingerprint)
for _, s := range walRecord.Series {
fp := model.Fingerprint(s.Ref)
_, ok := state.fpToSeries.get(fp)
if ok {
return nil
}
_, err := state.createSeriesWithFingerprint(fingerprint, lbls, nil, true)
return err
}

for _, labels := range record.Labels {
if err := createSeries(model.Fingerprint(labels.Fingerprint), labels.Labels); err != nil {
// We don't return here in order to close/drain all the channels and
// make sure all goroutines exit.
capturedErr = err
break Loop
continue
}
}

for _, s := range walRecord.Series {
lp = lp[:0]
for _, l := range s.Labels {
lp = append(lp, client.LabelAdapter(l))
}
if err := createSeries(model.Fingerprint(s.Ref), lp); err != nil {
if _, err := state.createSeriesWithFingerprint(fp, lp, nil, true); err != nil {
// We don't return here in order to close/drain all the channels and
// make sure all goroutines exit.
capturedErr = err
Expand All @@ -914,20 +886,12 @@ Loop:
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(record.Samples) > 0 || len(walRecord.Samples) > 0 {
walRecordSamples := walRecord.Samples
for len(walRecordSamples) > 0 {
m := 5000
var userID string
if len(record.Samples) > 0 {
userID = record.UserId
if len(record.Samples) < m {
m = len(record.Samples)
}
}
if len(walRecord.Samples) > 0 {
userID = walRecord.UserID
if len(walRecord.Samples) < m {
m = len(walRecord.Samples)
}
userID := walRecord.UserID
if len(walRecordSamples) < m {
m = len(walRecordSamples)
}

for i := 0; i < params.numWorkers; i++ {
Expand All @@ -949,21 +913,9 @@ Loop:
}
}

if len(record.Samples) > 0 {
for _, sam := range record.Samples[:m] {
mod := sam.Fingerprint % uint64(params.numWorkers)
shards[mod].samples = append(shards[mod].samples, tsdb_record.RefSample{
Ref: sam.Fingerprint,
T: int64(sam.Timestamp),
V: sam.Value,
})
}
}
if len(walRecord.Samples) > 0 {
for _, sam := range walRecord.Samples[:m] {
mod := sam.Ref % uint64(params.numWorkers)
shards[mod].samples = append(shards[mod].samples, sam)
}
for _, sam := range walRecordSamples[:m] {
mod := sam.Ref % uint64(params.numWorkers)
shards[mod].samples = append(shards[mod].samples, sam)
}

for i := 0; i < params.numWorkers; i++ {
Expand All @@ -972,12 +924,7 @@ Loop:
}
}

if len(record.Samples) > 0 {
record.Samples = record.Samples[m:]
}
if len(walRecord.Samples) > 0 {
walRecord.Samples = walRecord.Samples[m:]
}
walRecordSamples = walRecordSamples[m:]
}
}

Expand Down Expand Up @@ -1171,7 +1118,7 @@ func (record *WALRecord) encodeSamples(b []byte) []byte {
return encoded
}

func decodeWALRecord(b []byte, rec *Record, walRec *WALRecord) (err error) {
func decodeWALRecord(b []byte, walRec *WALRecord) (err error) {
var (
userID string
dec tsdb_record.Decoder
Expand All @@ -1192,23 +1139,21 @@ func decodeWALRecord(b []byte, rec *Record, walRec *WALRecord) (err error) {
userID = decbuf.UvarintStr()
rseries, err = dec.Series(decbuf.B, walRec.Series)
default:
// The legacy proto record will have it's first byte >7.
// Hence it does not match any of the existing record types.
err = proto.Unmarshal(b, rec)
return err
return errors.New("unknown record type")
}

// We reach here only if its a record with type header.
if decbuf.Err() != nil {
return decbuf.Err()
}

if err == nil {
// There was no error decoding the records with type headers.
walRec.UserID = userID
walRec.Samples = rsamples
walRec.Series = rseries
if err != nil {
return err
}

return err
walRec.UserID = userID
walRec.Samples = rsamples
walRec.Series = rseries

return nil
}
Loading