diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d59d97c04..a14d603bed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## master / unreleased +* [CHANGE] Ingester: Removed deprecated untyped record from chunks WAL. Only if you are running `v1.0` or below, it is recommended to first upgrade to `v1.1`/`v1.2`/`v1.3` and run it for a day before upgrading to `v1.4` to avoid data loss. #3115 * [CHANGE] Increase the default Cassandra client replication factor to 3. #3007 * [CHANGE] Experimental blocks storage: removed the support to transfer blocks between ingesters on shutdown. When running the Cortex blocks storage, ingesters are expected to run with a persistent disk. The following metrics have been removed: #2996 * `cortex_ingester_sent_files` diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index 5657b2eac3..2cdbd913c3 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -65,12 +65,6 @@ type WAL interface { type RecordType byte const ( - // Currently we also support the old records without a type header. - // For that, we assume the record type does not cross 7 as the proto unmarshalling - // will produce an error if the first byte is less than 7 (thus we know its not the old record). - // The old record will be removed in the future releases, hence the record type should not cross - // '7' till then. - // WALRecordSeries is the type for the WAL record on Prometheus TSDB record for series. WALRecordSeries RecordType = 1 // WALRecordSamples is the type for the WAL record based on Prometheus TSDB record for samples. @@ -843,7 +837,6 @@ func processWAL(startSegment int, userStates *userStates, params walRecoveryPara var ( capturedErr error - record = &Record{} walRecord = &WALRecord{} lp labelPairs ) @@ -857,51 +850,30 @@ Loop: default: } - record.Samples = record.Samples[:0] - record.Labels = record.Labels[:0] - // Only one of 'record' or 'walRecord' will have the data. - if err := decodeWALRecord(reader.Record(), record, walRecord); err != nil { + if err := decodeWALRecord(reader.Record(), walRecord); err != nil { // We don't return here in order to close/drain all the channels and // make sure all goroutines exit. capturedErr = err break Loop } - if len(record.Labels) > 0 || len(walRecord.Series) > 0 { - - var userID string - if len(walRecord.Series) > 0 { - userID = walRecord.UserID - } else { - userID = record.UserId - } + if len(walRecord.Series) > 0 { + userID := walRecord.UserID state := userStates.getOrCreate(userID) - createSeries := func(fingerprint model.Fingerprint, lbls labelPairs) error { - _, ok := state.fpToSeries.get(fingerprint) + for _, s := range walRecord.Series { + fp := model.Fingerprint(s.Ref) + _, ok := state.fpToSeries.get(fp) if ok { - return nil - } - _, err := state.createSeriesWithFingerprint(fingerprint, lbls, nil, true) - return err - } - - for _, labels := range record.Labels { - if err := createSeries(model.Fingerprint(labels.Fingerprint), labels.Labels); err != nil { - // We don't return here in order to close/drain all the channels and - // make sure all goroutines exit. - capturedErr = err - break Loop + continue } - } - for _, s := range walRecord.Series { lp = lp[:0] for _, l := range s.Labels { lp = append(lp, client.LabelAdapter(l)) } - if err := createSeries(model.Fingerprint(s.Ref), lp); err != nil { + if _, err := state.createSeriesWithFingerprint(fp, lp, nil, true); err != nil { // We don't return here in order to close/drain all the channels and // make sure all goroutines exit. capturedErr = err @@ -914,20 +886,12 @@ Loop: // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise // cause thousands of very large in flight buffers occupying large amounts // of unused memory. - for len(record.Samples) > 0 || len(walRecord.Samples) > 0 { + walRecordSamples := walRecord.Samples + for len(walRecordSamples) > 0 { m := 5000 - var userID string - if len(record.Samples) > 0 { - userID = record.UserId - if len(record.Samples) < m { - m = len(record.Samples) - } - } - if len(walRecord.Samples) > 0 { - userID = walRecord.UserID - if len(walRecord.Samples) < m { - m = len(walRecord.Samples) - } + userID := walRecord.UserID + if len(walRecordSamples) < m { + m = len(walRecordSamples) } for i := 0; i < params.numWorkers; i++ { @@ -949,21 +913,9 @@ Loop: } } - if len(record.Samples) > 0 { - for _, sam := range record.Samples[:m] { - mod := sam.Fingerprint % uint64(params.numWorkers) - shards[mod].samples = append(shards[mod].samples, tsdb_record.RefSample{ - Ref: sam.Fingerprint, - T: int64(sam.Timestamp), - V: sam.Value, - }) - } - } - if len(walRecord.Samples) > 0 { - for _, sam := range walRecord.Samples[:m] { - mod := sam.Ref % uint64(params.numWorkers) - shards[mod].samples = append(shards[mod].samples, sam) - } + for _, sam := range walRecordSamples[:m] { + mod := sam.Ref % uint64(params.numWorkers) + shards[mod].samples = append(shards[mod].samples, sam) } for i := 0; i < params.numWorkers; i++ { @@ -972,12 +924,7 @@ Loop: } } - if len(record.Samples) > 0 { - record.Samples = record.Samples[m:] - } - if len(walRecord.Samples) > 0 { - walRecord.Samples = walRecord.Samples[m:] - } + walRecordSamples = walRecordSamples[m:] } } @@ -1171,7 +1118,7 @@ func (record *WALRecord) encodeSamples(b []byte) []byte { return encoded } -func decodeWALRecord(b []byte, rec *Record, walRec *WALRecord) (err error) { +func decodeWALRecord(b []byte, walRec *WALRecord) (err error) { var ( userID string dec tsdb_record.Decoder @@ -1192,10 +1139,7 @@ func decodeWALRecord(b []byte, rec *Record, walRec *WALRecord) (err error) { userID = decbuf.UvarintStr() rseries, err = dec.Series(decbuf.B, walRec.Series) default: - // The legacy proto record will have it's first byte >7. - // Hence it does not match any of the existing record types. - err = proto.Unmarshal(b, rec) - return err + return errors.New("unknown record type") } // We reach here only if its a record with type header. @@ -1203,12 +1147,13 @@ func decodeWALRecord(b []byte, rec *Record, walRec *WALRecord) (err error) { return decbuf.Err() } - if err == nil { - // There was no error decoding the records with type headers. - walRec.UserID = userID - walRec.Samples = rsamples - walRec.Series = rseries + if err != nil { + return err } - return err + walRec.UserID = userID + walRec.Samples = rsamples + walRec.Series = rseries + + return nil } diff --git a/pkg/ingester/wal.pb.go b/pkg/ingester/wal.pb.go index 31970266ed..719292fdc2 100644 --- a/pkg/ingester/wal.pb.go +++ b/pkg/ingester/wal.pb.go @@ -4,7 +4,6 @@ package ingester import ( - encoding_binary "encoding/binary" fmt "fmt" client "github.com/cortexproject/cortex/pkg/ingester/client" github_com_cortexproject_cortex_pkg_ingester_client "github.com/cortexproject/cortex/pkg/ingester/client" @@ -28,168 +27,6 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -type Record struct { - UserId string `protobuf:"bytes,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` - Labels []Labels `protobuf:"bytes,2,rep,name=labels,proto3" json:"labels"` - Samples []Sample `protobuf:"bytes,3,rep,name=samples,proto3" json:"samples"` -} - -func (m *Record) Reset() { *m = Record{} } -func (*Record) ProtoMessage() {} -func (*Record) Descriptor() ([]byte, []int) { - return fileDescriptor_ae6364fc8077884f, []int{0} -} -func (m *Record) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_Record.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *Record) XXX_Merge(src proto.Message) { - xxx_messageInfo_Record.Merge(m, src) -} -func (m *Record) XXX_Size() int { - return m.Size() -} -func (m *Record) XXX_DiscardUnknown() { - xxx_messageInfo_Record.DiscardUnknown(m) -} - -var xxx_messageInfo_Record proto.InternalMessageInfo - -func (m *Record) GetUserId() string { - if m != nil { - return m.UserId - } - return "" -} - -func (m *Record) GetLabels() []Labels { - if m != nil { - return m.Labels - } - return nil -} - -func (m *Record) GetSamples() []Sample { - if m != nil { - return m.Samples - } - return nil -} - -type Labels struct { - Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` - Labels []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `protobuf:"bytes,2,rep,name=labels,proto3,customtype=github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter" json:"labels"` -} - -func (m *Labels) Reset() { *m = Labels{} } -func (*Labels) ProtoMessage() {} -func (*Labels) Descriptor() ([]byte, []int) { - return fileDescriptor_ae6364fc8077884f, []int{1} -} -func (m *Labels) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *Labels) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_Labels.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *Labels) XXX_Merge(src proto.Message) { - xxx_messageInfo_Labels.Merge(m, src) -} -func (m *Labels) XXX_Size() int { - return m.Size() -} -func (m *Labels) XXX_DiscardUnknown() { - xxx_messageInfo_Labels.DiscardUnknown(m) -} - -var xxx_messageInfo_Labels proto.InternalMessageInfo - -func (m *Labels) GetFingerprint() uint64 { - if m != nil { - return m.Fingerprint - } - return 0 -} - -type Sample struct { - Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` - Timestamp uint64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Value float64 `protobuf:"fixed64,3,opt,name=value,proto3" json:"value,omitempty"` -} - -func (m *Sample) Reset() { *m = Sample{} } -func (*Sample) ProtoMessage() {} -func (*Sample) Descriptor() ([]byte, []int) { - return fileDescriptor_ae6364fc8077884f, []int{2} -} -func (m *Sample) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *Sample) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_Sample.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *Sample) XXX_Merge(src proto.Message) { - xxx_messageInfo_Sample.Merge(m, src) -} -func (m *Sample) XXX_Size() int { - return m.Size() -} -func (m *Sample) XXX_DiscardUnknown() { - xxx_messageInfo_Sample.DiscardUnknown(m) -} - -var xxx_messageInfo_Sample proto.InternalMessageInfo - -func (m *Sample) GetFingerprint() uint64 { - if m != nil { - return m.Fingerprint - } - return 0 -} - -func (m *Sample) GetTimestamp() uint64 { - if m != nil { - return m.Timestamp - } - return 0 -} - -func (m *Sample) GetValue() float64 { - if m != nil { - return m.Value - } - return 0 -} - type Series struct { UserId string `protobuf:"bytes,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` Fingerprint uint64 `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` @@ -200,7 +37,7 @@ type Series struct { func (m *Series) Reset() { *m = Series{} } func (*Series) ProtoMessage() {} func (*Series) Descriptor() ([]byte, []int) { - return fileDescriptor_ae6364fc8077884f, []int{3} + return fileDescriptor_ae6364fc8077884f, []int{0} } func (m *Series) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -251,146 +88,35 @@ func (m *Series) GetChunks() []client.Chunk { } func init() { - proto.RegisterType((*Record)(nil), "ingester.Record") - proto.RegisterType((*Labels)(nil), "ingester.Labels") - proto.RegisterType((*Sample)(nil), "ingester.Sample") proto.RegisterType((*Series)(nil), "ingester.Series") } func init() { proto.RegisterFile("wal.proto", fileDescriptor_ae6364fc8077884f) } var fileDescriptor_ae6364fc8077884f = []byte{ - // 415 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x53, 0xcd, 0xca, 0xd3, 0x40, - 0x14, 0xcd, 0x34, 0x75, 0x6a, 0xa7, 0x08, 0x3a, 0x08, 0x86, 0x22, 0xd3, 0x90, 0x55, 0x41, 0x4c, - 0x44, 0xf7, 0xa2, 0x75, 0xa3, 0xe0, 0x42, 0xd2, 0x9d, 0x0b, 0x25, 0x3f, 0xd3, 0x74, 0x6c, 0x92, - 0x09, 0x33, 0x13, 0x75, 0x29, 0xf8, 0x02, 0xbe, 0x81, 0x5b, 0x1f, 0xa5, 0xcb, 0x2e, 0x8b, 0x8b, - 0x62, 0x53, 0x04, 0x97, 0x7d, 0x04, 0xc9, 0x24, 0xd1, 0x52, 0x50, 0x3e, 0xbe, 0xc5, 0xb7, 0xcb, - 0x39, 0xf7, 0xdc, 0x73, 0xcf, 0xdc, 0xcc, 0xa0, 0xe1, 0x87, 0x20, 0x75, 0x0b, 0xc1, 0x15, 0xc7, - 0xd7, 0x59, 0x9e, 0x50, 0xa9, 0xa8, 0x18, 0xdf, 0x4f, 0x98, 0x5a, 0x96, 0xa1, 0x1b, 0xf1, 0xcc, - 0x4b, 0x78, 0xc2, 0x3d, 0x2d, 0x08, 0xcb, 0x85, 0x46, 0x1a, 0xe8, 0xaf, 0xa6, 0x71, 0xfc, 0xe4, - 0x44, 0x1e, 0x71, 0xa1, 0xe8, 0xc7, 0x42, 0xf0, 0x77, 0x34, 0x52, 0x2d, 0xf2, 0x8a, 0x55, 0xe2, - 0x75, 0xe6, 0x5e, 0x94, 0x32, 0x9a, 0x77, 0xa5, 0xc6, 0xc1, 0xf9, 0x0c, 0x10, 0xf4, 0x69, 0xc4, - 0x45, 0x8c, 0xef, 0xa0, 0x41, 0x29, 0xa9, 0x78, 0xcb, 0x62, 0x0b, 0xd8, 0x60, 0x3a, 0xf4, 0x61, - 0x0d, 0x5f, 0xc4, 0xd8, 0x45, 0x30, 0x0d, 0x42, 0x9a, 0x4a, 0xab, 0x67, 0x9b, 0xd3, 0xd1, 0xc3, - 0x9b, 0x6e, 0x67, 0xe9, 0xbe, 0xd4, 0xfc, 0xac, 0xbf, 0xde, 0x4d, 0x0c, 0xbf, 0x55, 0xe1, 0x07, - 0x68, 0x20, 0x83, 0xac, 0x48, 0xa9, 0xb4, 0xcc, 0xf3, 0x86, 0xb9, 0x2e, 0xb4, 0x0d, 0x9d, 0xcc, - 0xf9, 0x0a, 0x10, 0x6c, 0xac, 0xb0, 0x8d, 0x46, 0x8b, 0x5a, 0x2d, 0x0a, 0xc1, 0x72, 0xa5, 0x93, - 0xf4, 0xfd, 0x53, 0x0a, 0xcb, 0xb3, 0x38, 0xb7, 0xdc, 0xf6, 0x44, 0xda, 0xe1, 0x55, 0xc0, 0xc4, - 0xec, 0x79, 0x6d, 0xff, 0x7d, 0x37, 0xb9, 0xcc, 0x7e, 0x1a, 0x9b, 0xa7, 0x71, 0x50, 0x28, 0x2a, - 0xba, 0x33, 0x39, 0x6f, 0x10, 0x6c, 0xa2, 0x5f, 0x20, 0xe0, 0x5d, 0x34, 0x54, 0x2c, 0xa3, 0x52, - 0x05, 0x59, 0x61, 0xf5, 0x74, 0xfd, 0x2f, 0x81, 0x6f, 0xa3, 0x6b, 0xef, 0x83, 0xb4, 0xa4, 0x96, - 0x69, 0x83, 0x29, 0xf0, 0x1b, 0xe0, 0xfc, 0x04, 0x08, 0xce, 0xa9, 0x60, 0x54, 0xfe, 0xfb, 0x3f, - 0x9c, 0x4d, 0xee, 0xfd, 0x6f, 0x35, 0xe6, 0x95, 0xad, 0x06, 0xdf, 0x43, 0x30, 0x5a, 0x96, 0xf9, - 0x4a, 0x5a, 0x7d, 0x3d, 0xf4, 0x46, 0x37, 0xf4, 0x59, 0xcd, 0x76, 0x77, 0xa3, 0x91, 0xcc, 0x1e, - 0x6f, 0xf6, 0xc4, 0xd8, 0xee, 0x89, 0x71, 0xdc, 0x13, 0xf0, 0xa9, 0x22, 0xe0, 0x5b, 0x45, 0xc0, - 0xba, 0x22, 0x60, 0x53, 0x11, 0xf0, 0xa3, 0x22, 0xe0, 0x57, 0x45, 0x8c, 0x63, 0x45, 0xc0, 0x97, - 0x03, 0x31, 0x36, 0x07, 0x62, 0x6c, 0x0f, 0xc4, 0x78, 0xfd, 0xe7, 0x81, 0x84, 0x50, 0x5f, 0xdb, - 0x47, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x83, 0xd9, 0xb8, 0x9a, 0x3e, 0x03, 0x00, 0x00, -} - -func (this *Record) Equal(that interface{}) bool { - if that == nil { - return this == nil - } - - that1, ok := that.(*Record) - if !ok { - that2, ok := that.(Record) - if ok { - that1 = &that2 - } else { - return false - } - } - if that1 == nil { - return this == nil - } else if this == nil { - return false - } - if this.UserId != that1.UserId { - return false - } - if len(this.Labels) != len(that1.Labels) { - return false - } - for i := range this.Labels { - if !this.Labels[i].Equal(&that1.Labels[i]) { - return false - } - } - if len(this.Samples) != len(that1.Samples) { - return false - } - for i := range this.Samples { - if !this.Samples[i].Equal(&that1.Samples[i]) { - return false - } - } - return true -} -func (this *Labels) Equal(that interface{}) bool { - if that == nil { - return this == nil - } - - that1, ok := that.(*Labels) - if !ok { - that2, ok := that.(Labels) - if ok { - that1 = &that2 - } else { - return false - } - } - if that1 == nil { - return this == nil - } else if this == nil { - return false - } - if this.Fingerprint != that1.Fingerprint { - return false - } - if len(this.Labels) != len(that1.Labels) { - return false - } - for i := range this.Labels { - if !this.Labels[i].Equal(that1.Labels[i]) { - return false - } - } - return true + // 312 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x91, 0x31, 0x4e, 0xc3, 0x30, + 0x18, 0x85, 0x6d, 0x5a, 0x05, 0xea, 0x8a, 0x81, 0x2c, 0x44, 0x1d, 0xfe, 0x46, 0x4c, 0x95, 0x10, + 0x8d, 0x04, 0x3b, 0x82, 0xb2, 0x80, 0xc4, 0x80, 0xc2, 0xc6, 0x82, 0x92, 0xd4, 0x4d, 0x4d, 0x43, + 0x1c, 0xd9, 0x8e, 0x60, 0xe4, 0x08, 0x1c, 0x83, 0xa3, 0x74, 0xec, 0x58, 0x31, 0x54, 0xd4, 0x11, + 0x12, 0x63, 0x8f, 0x80, 0xe2, 0x24, 0xa8, 0x33, 0x9b, 0xdf, 0xf3, 0xf3, 0xfb, 0xac, 0xff, 0x27, + 0x9d, 0x97, 0x20, 0x19, 0x66, 0x82, 0x2b, 0x6e, 0xef, 0xb1, 0x34, 0xa6, 0x52, 0x51, 0xd1, 0x3b, + 0x89, 0x99, 0x9a, 0xe6, 0xe1, 0x30, 0xe2, 0xcf, 0x5e, 0xcc, 0x63, 0xee, 0x99, 0x40, 0x98, 0x4f, + 0x8c, 0x32, 0xc2, 0x9c, 0xaa, 0x87, 0xbd, 0x8b, 0xad, 0x78, 0xc4, 0x85, 0xa2, 0xaf, 0x99, 0xe0, + 0x4f, 0x34, 0x52, 0xb5, 0xf2, 0xb2, 0x59, 0xec, 0x35, 0xe5, 0x5e, 0x94, 0x30, 0x9a, 0x36, 0x57, + 0x55, 0xc3, 0xd1, 0x37, 0x26, 0xd6, 0x3d, 0x15, 0x8c, 0x4a, 0xfb, 0x90, 0xec, 0xe6, 0x92, 0x8a, + 0x47, 0x36, 0x76, 0xb0, 0x8b, 0x07, 0x1d, 0xdf, 0x2a, 0xe5, 0xcd, 0xd8, 0x76, 0x49, 0x77, 0x52, + 0x96, 0x88, 0x4c, 0xb0, 0x54, 0x39, 0x3b, 0x2e, 0x1e, 0xb4, 0xfd, 0x6d, 0xcb, 0x96, 0xc4, 0x4a, + 0x82, 0x90, 0x26, 0xd2, 0x69, 0xb9, 0xad, 0x41, 0xf7, 0xf4, 0x60, 0x58, 0x43, 0x6e, 0x4b, 0xf7, + 0x2e, 0x60, 0x62, 0x74, 0x3d, 0x5f, 0xf5, 0xd1, 0xe7, 0xaa, 0xff, 0x9f, 0x2f, 0x57, 0x35, 0x97, + 0xe3, 0x20, 0x53, 0x54, 0xf8, 0x35, 0xca, 0x3e, 0x26, 0x56, 0x34, 0xcd, 0xd3, 0x99, 0x74, 0xda, + 0x06, 0xba, 0xdf, 0x40, 0xaf, 0x4a, 0x77, 0xd4, 0x2e, 0x81, 0x7e, 0x1d, 0x19, 0x9d, 0x2f, 0xd6, + 0x80, 0x96, 0x6b, 0x40, 0x9b, 0x35, 0xe0, 0x37, 0x0d, 0xf8, 0x43, 0x03, 0x9e, 0x6b, 0xc0, 0x0b, + 0x0d, 0xf8, 0x4b, 0x03, 0xfe, 0xd1, 0x80, 0x36, 0x1a, 0xf0, 0x7b, 0x01, 0x68, 0x51, 0x00, 0x5a, + 0x16, 0x80, 0x1e, 0xfe, 0x16, 0x13, 0x5a, 0x66, 0x5c, 0x67, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, + 0x6c, 0xbf, 0xb4, 0x95, 0xb6, 0x01, 0x00, 0x00, } -func (this *Sample) Equal(that interface{}) bool { - if that == nil { - return this == nil - } - that1, ok := that.(*Sample) - if !ok { - that2, ok := that.(Sample) - if ok { - that1 = &that2 - } else { - return false - } - } - if that1 == nil { - return this == nil - } else if this == nil { - return false - } - if this.Fingerprint != that1.Fingerprint { - return false - } - if this.Timestamp != that1.Timestamp { - return false - } - if this.Value != that1.Value { - return false - } - return true -} func (this *Series) Equal(that interface{}) bool { if that == nil { return this == nil @@ -434,53 +160,6 @@ func (this *Series) Equal(that interface{}) bool { } return true } -func (this *Record) GoString() string { - if this == nil { - return "nil" - } - s := make([]string, 0, 7) - s = append(s, "&ingester.Record{") - s = append(s, "UserId: "+fmt.Sprintf("%#v", this.UserId)+",\n") - if this.Labels != nil { - vs := make([]*Labels, len(this.Labels)) - for i := range vs { - vs[i] = &this.Labels[i] - } - s = append(s, "Labels: "+fmt.Sprintf("%#v", vs)+",\n") - } - if this.Samples != nil { - vs := make([]*Sample, len(this.Samples)) - for i := range vs { - vs[i] = &this.Samples[i] - } - s = append(s, "Samples: "+fmt.Sprintf("%#v", vs)+",\n") - } - s = append(s, "}") - return strings.Join(s, "") -} -func (this *Labels) GoString() string { - if this == nil { - return "nil" - } - s := make([]string, 0, 6) - s = append(s, "&ingester.Labels{") - s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n") - s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n") - s = append(s, "}") - return strings.Join(s, "") -} -func (this *Sample) GoString() string { - if this == nil { - return "nil" - } - s := make([]string, 0, 7) - s = append(s, "&ingester.Sample{") - s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n") - s = append(s, "Timestamp: "+fmt.Sprintf("%#v", this.Timestamp)+",\n") - s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") - s = append(s, "}") - return strings.Join(s, "") -} func (this *Series) GoString() string { if this == nil { return "nil" @@ -508,7 +187,7 @@ func valueToGoStringWal(v interface{}, typ string) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) } -func (m *Record) Marshal() (dAtA []byte, err error) { +func (m *Series) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -518,20 +197,20 @@ func (m *Record) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *Record) MarshalTo(dAtA []byte) (int, error) { +func (m *Series) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *Record) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *Series) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Samples) > 0 { - for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- { + if len(m.Chunks) > 0 { + for iNdEx := len(m.Chunks) - 1; iNdEx >= 0; iNdEx-- { { - size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + size, err := m.Chunks[iNdEx].MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -539,23 +218,28 @@ func (m *Record) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintWal(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x1a + dAtA[i] = 0x22 } } if len(m.Labels) > 0 { for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { { - size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { + size := m.Labels[iNdEx].Size() + i -= size + if _, err := m.Labels[iNdEx].MarshalTo(dAtA[i:]); err != nil { return 0, err } - i -= size i = encodeVarintWal(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x12 + dAtA[i] = 0x1a } } + if m.Fingerprint != 0 { + i = encodeVarintWal(dAtA, i, uint64(m.Fingerprint)) + i-- + dAtA[i] = 0x10 + } if len(m.UserId) > 0 { i -= len(m.UserId) copy(dAtA[i:], m.UserId) @@ -566,222 +250,17 @@ func (m *Record) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *Labels) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err +func encodeVarintWal(dAtA []byte, offset int, v uint64) int { + offset -= sovWal(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ } - return dAtA[:n], nil -} - -func (m *Labels) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) + dAtA[offset] = uint8(v) + return base } - -func (m *Labels) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Labels) > 0 { - for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { - { - size := m.Labels[iNdEx].Size() - i -= size - if _, err := m.Labels[iNdEx].MarshalTo(dAtA[i:]); err != nil { - return 0, err - } - i = encodeVarintWal(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x12 - } - } - if m.Fingerprint != 0 { - i = encodeVarintWal(dAtA, i, uint64(m.Fingerprint)) - i-- - dAtA[i] = 0x8 - } - return len(dAtA) - i, nil -} - -func (m *Sample) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Sample) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *Sample) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.Value != 0 { - i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value)))) - i-- - dAtA[i] = 0x19 - } - if m.Timestamp != 0 { - i = encodeVarintWal(dAtA, i, uint64(m.Timestamp)) - i-- - dAtA[i] = 0x10 - } - if m.Fingerprint != 0 { - i = encodeVarintWal(dAtA, i, uint64(m.Fingerprint)) - i-- - dAtA[i] = 0x8 - } - return len(dAtA) - i, nil -} - -func (m *Series) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Series) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *Series) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Chunks) > 0 { - for iNdEx := len(m.Chunks) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Chunks[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintWal(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x22 - } - } - if len(m.Labels) > 0 { - for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { - { - size := m.Labels[iNdEx].Size() - i -= size - if _, err := m.Labels[iNdEx].MarshalTo(dAtA[i:]); err != nil { - return 0, err - } - i = encodeVarintWal(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x1a - } - } - if m.Fingerprint != 0 { - i = encodeVarintWal(dAtA, i, uint64(m.Fingerprint)) - i-- - dAtA[i] = 0x10 - } - if len(m.UserId) > 0 { - i -= len(m.UserId) - copy(dAtA[i:], m.UserId) - i = encodeVarintWal(dAtA, i, uint64(len(m.UserId))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func encodeVarintWal(dAtA []byte, offset int, v uint64) int { - offset -= sovWal(v) - base := offset - for v >= 1<<7 { - dAtA[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - dAtA[offset] = uint8(v) - return base -} -func (m *Record) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.UserId) - if l > 0 { - n += 1 + l + sovWal(uint64(l)) - } - if len(m.Labels) > 0 { - for _, e := range m.Labels { - l = e.Size() - n += 1 + l + sovWal(uint64(l)) - } - } - if len(m.Samples) > 0 { - for _, e := range m.Samples { - l = e.Size() - n += 1 + l + sovWal(uint64(l)) - } - } - return n -} - -func (m *Labels) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if m.Fingerprint != 0 { - n += 1 + sovWal(uint64(m.Fingerprint)) - } - if len(m.Labels) > 0 { - for _, e := range m.Labels { - l = e.Size() - n += 1 + l + sovWal(uint64(l)) - } - } - return n -} - -func (m *Sample) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if m.Fingerprint != 0 { - n += 1 + sovWal(uint64(m.Fingerprint)) - } - if m.Timestamp != 0 { - n += 1 + sovWal(uint64(m.Timestamp)) - } - if m.Value != 0 { - n += 9 - } - return n -} - func (m *Series) Size() (n int) { if m == nil { return 0 @@ -816,51 +295,6 @@ func sovWal(x uint64) (n int) { func sozWal(x uint64) (n int) { return sovWal(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (this *Record) String() string { - if this == nil { - return "nil" - } - repeatedStringForLabels := "[]Labels{" - for _, f := range this.Labels { - repeatedStringForLabels += strings.Replace(strings.Replace(f.String(), "Labels", "Labels", 1), `&`, ``, 1) + "," - } - repeatedStringForLabels += "}" - repeatedStringForSamples := "[]Sample{" - for _, f := range this.Samples { - repeatedStringForSamples += strings.Replace(strings.Replace(f.String(), "Sample", "Sample", 1), `&`, ``, 1) + "," - } - repeatedStringForSamples += "}" - s := strings.Join([]string{`&Record{`, - `UserId:` + fmt.Sprintf("%v", this.UserId) + `,`, - `Labels:` + repeatedStringForLabels + `,`, - `Samples:` + repeatedStringForSamples + `,`, - `}`, - }, "") - return s -} -func (this *Labels) String() string { - if this == nil { - return "nil" - } - s := strings.Join([]string{`&Labels{`, - `Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`, - `Labels:` + fmt.Sprintf("%v", this.Labels) + `,`, - `}`, - }, "") - return s -} -func (this *Sample) String() string { - if this == nil { - return "nil" - } - s := strings.Join([]string{`&Sample{`, - `Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`, - `Timestamp:` + fmt.Sprintf("%v", this.Timestamp) + `,`, - `Value:` + fmt.Sprintf("%v", this.Value) + `,`, - `}`, - }, "") - return s -} func (this *Series) String() string { if this == nil { return "nil" @@ -887,367 +321,6 @@ func valueToStringWal(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } -func (m *Record) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowWal - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Record: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Record: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field UserId", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowWal - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthWal - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthWal - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.UserId = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowWal - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthWal - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthWal - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Labels = append(m.Labels, Labels{}) - if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 3: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Samples", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowWal - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthWal - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthWal - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Samples = append(m.Samples, Sample{}) - if err := m.Samples[len(m.Samples)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipWal(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthWal - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthWal - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *Labels) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowWal - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Labels: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Labels: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Fingerprint", wireType) - } - m.Fingerprint = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowWal - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Fingerprint |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowWal - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthWal - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthWal - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Labels = append(m.Labels, github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter{}) - if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipWal(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthWal - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthWal - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *Sample) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowWal - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Sample: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Sample: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Fingerprint", wireType) - } - m.Fingerprint = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowWal - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Fingerprint |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) - } - m.Timestamp = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowWal - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Timestamp |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 3: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) - } - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Value = float64(math.Float64frombits(v)) - default: - iNdEx = preIndex - skippy, err := skipWal(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthWal - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthWal - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} func (m *Series) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/ingester/wal.proto b/pkg/ingester/wal.proto index 25b5361dde..1b095aa597 100644 --- a/pkg/ingester/wal.proto +++ b/pkg/ingester/wal.proto @@ -7,23 +7,6 @@ option go_package = "ingester"; import "github.com/gogo/protobuf/gogoproto/gogo.proto"; import "github.com/cortexproject/cortex/pkg/ingester/client/cortex.proto"; -message Record { - string user_id = 1; - repeated Labels labels = 2 [(gogoproto.nullable) = false]; - repeated Sample samples = 3 [(gogoproto.nullable) = false]; -} - -message Labels { - uint64 fingerprint = 1; - repeated cortex.LabelPair labels = 2 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter"]; -} - -message Sample { - uint64 fingerprint = 1; - uint64 timestamp = 2; - double value = 3; -} - message Series { string user_id = 1; uint64 fingerprint = 2; diff --git a/pkg/ingester/wal_test.go b/pkg/ingester/wal_test.go index 0af1545421..b391440b7a 100644 --- a/pkg/ingester/wal_test.go +++ b/pkg/ingester/wal_test.go @@ -10,11 +10,9 @@ import ( "testing" "time" - "github.com/gogo/protobuf/proto" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - tsdb_record "github.com/prometheus/prometheus/tsdb/record" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" @@ -225,103 +223,6 @@ func TestCheckpointRepair(t *testing.T) { } -func TestMigrationToTypedRecord(t *testing.T) { - // WAL record migration. - walRecordOld := &Record{ - UserId: "12345", - Labels: []Labels{ - {Fingerprint: 7568176523, Labels: []client.LabelAdapter{{Name: "n1", Value: "v1"}}}, - {Fingerprint: 5720984283, Labels: []client.LabelAdapter{{Name: "n2", Value: "v2"}}}, - }, - Samples: []Sample{ - {Fingerprint: 768276312, Timestamp: 10, Value: 10}, - {Fingerprint: 326847234, Timestamp: 99, Value: 99}, - }, - } - walRecordNew := &WALRecord{ - UserID: "12345", - Series: []tsdb_record.RefSeries{ - {Ref: 7568176523, Labels: []labels.Label{{Name: "n1", Value: "v1"}}}, - {Ref: 5720984283, Labels: []labels.Label{{Name: "n2", Value: "v2"}}}, - }, - Samples: []tsdb_record.RefSample{ - {Ref: 768276312, T: 10, V: 10}, - {Ref: 326847234, T: 99, V: 99}, - }, - } - - // Encoding old record. - oldRecordBytes, err := proto.Marshal(walRecordOld) - require.NoError(t, err) - // Series and samples are encoded separately in the new record. - newRecordSeriesBytes := walRecordNew.encodeSeries(nil) - newRecordSamples := walRecordNew.encodeSamples(nil) - - // Test decoding of old record. - record, walRecord := &Record{}, &WALRecord{} - err = decodeWALRecord(oldRecordBytes, record, walRecord) - require.NoError(t, err) - require.Equal(t, walRecordOld, record) - require.Equal(t, &WALRecord{}, walRecord) - - // Test series and samples of new record separately. - record, walRecord = &Record{}, &WALRecord{} - err = decodeWALRecord(newRecordSeriesBytes, record, walRecord) - require.NoError(t, err) - require.Equal(t, &Record{}, record) - require.Equal(t, walRecordNew.UserID, walRecord.UserID) - require.Equal(t, walRecordNew.Series, walRecord.Series) - require.Equal(t, 0, len(walRecord.Samples)) - - record, walRecord = &Record{}, &WALRecord{} - err = decodeWALRecord(newRecordSamples, record, walRecord) - require.NoError(t, err) - require.Equal(t, &Record{}, record) - require.Equal(t, walRecordNew.UserID, walRecord.UserID) - require.Equal(t, walRecordNew.Samples, walRecord.Samples) - require.Equal(t, 0, len(walRecord.Series)) - - // Checkpoint record migration. - checkpointRecord := &Series{ - UserId: "12345", - Fingerprint: 3479837401, - Labels: []client.LabelAdapter{ - {Name: "n1", Value: "v1"}, - {Name: "n2", Value: "v2"}, - }, - Chunks: []client.Chunk{ - { - StartTimestampMs: 12345, - EndTimestampMs: 23456, - Encoding: 3, - Data: []byte{3, 3, 65, 23, 66}, - }, - { - StartTimestampMs: 34567, - EndTimestampMs: 45678, - Encoding: 2, - Data: []byte{11, 22, 33, 44, 55, 66, 77, 88}, - }, - }, - } - - oldRecordBytes, err = proto.Marshal(checkpointRecord) - require.NoError(t, err) - newRecordBytes, err := encodeWithTypeHeader(checkpointRecord, CheckpointRecord, nil) - require.NoError(t, err) - - m, err := decodeCheckpointRecord(oldRecordBytes, &Series{}) - require.NoError(t, err) - oldCheckpointRecordDecoded := m.(*Series) - - m, err = decodeCheckpointRecord(newRecordBytes, &Series{}) - require.NoError(t, err) - newCheckpointRecordDecoded := m.(*Series) - - require.Equal(t, checkpointRecord, oldCheckpointRecordDecoded) - require.Equal(t, checkpointRecord, newCheckpointRecordDecoded) -} - func TestCheckpointIndex(t *testing.T) { tcs := []struct { filename string