diff --git a/plugin/storage/badger/samplingstore/storage.go b/plugin/storage/badger/samplingstore/storage.go index 0d47bd3a25c..c7823ff05b8 100644 --- a/plugin/storage/badger/samplingstore/storage.go +++ b/plugin/storage/badger/samplingstore/storage.go @@ -36,6 +36,12 @@ type SamplingStore struct { store *badger.DB } +type ProbabilitiesAndQPS struct { + Hostname string + Probabilities model.ServiceOperationProbabilities + QPS model.ServiceOperationQPS +} + func NewSamplingStore(db *badger.DB) *SamplingStore { return &SamplingStore{ store: db, @@ -43,7 +49,6 @@ func NewSamplingStore(db *badger.DB) *SamplingStore { } func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error { - fmt.Println("Inside badger samplingstore InsertThroughput") startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now()) entriesToStore := make([]*badger.Entry, 0) entries, err := s.createThroughputEntry(throughput, startTime) @@ -52,12 +57,9 @@ func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error { } entriesToStore = append(entriesToStore, entries) err = s.store.Update(func(txn *badger.Txn) error { - // Write the entries for i := range entriesToStore { err = txn.SetEntry(entriesToStore[i]) - fmt.Println("Writing entry to badger") if err != nil { - // Most likely primary key conflict, but let the caller check this return err } } @@ -70,8 +72,6 @@ func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error { func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) { var retSlice []*model.Throughput - fmt.Println("Inside badger samplingstore GetThroughput") - prefix := []byte{throughputKeyPrefix} err := s.store.View(func(txn *badger.Txn) error { @@ -93,7 +93,7 @@ func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput if err != nil { return err } - throughputs, err := decodeValue(val) + throughputs, err := decodeThroughtputValue(val) if err != nil { return err } @@ -115,12 +115,83 @@ func (s *SamplingStore) InsertProbabilitiesAndQPS(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, ) error { + startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now()) + entriesToStore := make([]*badger.Entry, 0) + entries, err := s.createProbabilitiesEntry(hostname, probabilities, qps, startTime) + if err != nil { + return err + } + entriesToStore = append(entriesToStore, entries) + err = s.store.Update(func(txn *badger.Txn) error { + // Write the entries + for i := range entriesToStore { + err = txn.SetEntry(entriesToStore[i]) + if err != nil { + return err + } + } + + return nil + }) + return nil } // GetLatestProbabilities implements samplingstore.Reader#GetLatestProbabilities. func (s *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) { - return nil, nil + var retVal model.ServiceOperationProbabilities + var unMarshalProbabilities ProbabilitiesAndQPS + prefix := []byte{probabilitiesKeyPrefix} + + err := s.store.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + it := txn.NewIterator(opts) + defer it.Close() + + val := []byte{} + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + val, err := item.ValueCopy(val) + if err != nil { + return err + } + unMarshalProbabilities, err = decodeProbabilitiesValue(val) + retVal = unMarshalProbabilities.Probabilities + } + return nil + }) + if err != nil { + return nil, err + } + return retVal, nil +} + +func (s *SamplingStore) createProbabilitiesEntry(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, startTime uint64) (*badger.Entry, error) { + pK, pV, err := s.createProbabilitiesKV(hostname, probabilities, qps, startTime) + if err != nil { + return nil, err + } + + e := s.createBadgerEntry(pK, pV) + + return e, nil +} + +func (s *SamplingStore) createProbabilitiesKV(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, startTime uint64) ([]byte, []byte, error) { + key := make([]byte, 16) + key[0] = probabilitiesKeyPrefix + pos := 1 + binary.BigEndian.PutUint64(key[pos:], startTime) + + var bb []byte + var err error + val := ProbabilitiesAndQPS{ + Hostname: hostname, + Probabilities: probabilities, + QPS: qps, + } + bb, err = json.Marshal(val) + return key, bb, err } func (s *SamplingStore) createThroughputEntry(throughput []*model.Throughput, startTime uint64) (*badger.Entry, error) { @@ -151,22 +222,29 @@ func (s *SamplingStore) createThroughputKV(throughput []*model.Throughput, start var err error bb, err = json.Marshal(throughput) - fmt.Printf("Badger key %v, value %v\n", key, string(bb)) return key, bb, err } -func decodeValue(val []byte) ([]*model.Throughput, error) { +func decodeThroughtputValue(val []byte) ([]*model.Throughput, error) { var throughput []*model.Throughput err := json.Unmarshal(val, &throughput) if err != nil { - fmt.Println("Error while unmarshalling") return nil, err } - fmt.Printf("Throughput %v\n", throughput) return throughput, nil } +func decodeProbabilitiesValue(val []byte) (ProbabilitiesAndQPS, error) { + var probabilities ProbabilitiesAndQPS + + err := json.Unmarshal(val, &probabilities) + if err != nil { + return ProbabilitiesAndQPS{}, err + } + return probabilities, nil +} + func initalStartTime(timeBytes []byte) (time.Time, error) { var usec int64 diff --git a/plugin/storage/badger/samplingstore/storage_test.go b/plugin/storage/badger/samplingstore/storage_test.go index f9042edf295..d11e5524d72 100644 --- a/plugin/storage/badger/samplingstore/storage_test.go +++ b/plugin/storage/badger/samplingstore/storage_test.go @@ -63,6 +63,27 @@ func TestGetThroughput(t *testing.T) { }) } +func TestInsertProbabilitiesAndQPS(t *testing.T) { + runWithBadger(t, func(s *samplingStoreTest, t *testing.T) { + err := s.store.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}) + assert.NoError(t, err) + }) +} + +func TestGetLatestProbabilities(t *testing.T) { + runWithBadger(t, func(s *samplingStoreTest, t *testing.T) { + err := s.store.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}) + assert.NoError(t, err) + err = s.store.InsertProbabilitiesAndQPS("newhostname", samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}}, samplemodel.ServiceOperationQPS{"new-srv2": {"op": 1}}) + assert.NoError(t, err) + + expected := samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}} + actual, err := s.store.GetLatestProbabilities() + assert.NoError(t, err) + assert.Equal(t, expected, actual) + }) +} + func runWithBadger(t *testing.T, test func(s *samplingStoreTest, t *testing.T)) { opts := badger.DefaultOptions("") diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index afb743506c9..9f1cf163376 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -452,6 +452,7 @@ func (s *StorageIntegration) testGetLatestProbability(t *testing.T) { } defer s.cleanUp(t) + s.SamplingStore.InsertProbabilitiesAndQPS("newhostname1", samplemodel.ServiceOperationProbabilities{"new-srv3": {"op": 0.123}}, samplemodel.ServiceOperationQPS{"new-srv2": {"op": 11}}) s.SamplingStore.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}) expected := samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}