Skip to content

Commit

Permalink
implement InsertProbabilitiesAndQPS and GetLatestProbabilities
Browse files Browse the repository at this point in the history
Signed-off-by: slayer321 <sachin.maurya7666@gmail.com>
  • Loading branch information
slayer321 committed Oct 13, 2023
1 parent 511ca1e commit 5c75b90
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 12 deletions.
102 changes: 90 additions & 12 deletions plugin/storage/badger/samplingstore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@ type SamplingStore struct {
store *badger.DB
}

type ProbabilitiesAndQPS struct {
Hostname string
Probabilities model.ServiceOperationProbabilities
QPS model.ServiceOperationQPS
}

func NewSamplingStore(db *badger.DB) *SamplingStore {
return &SamplingStore{
store: db,
}
}

func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error {
fmt.Println("Inside badger samplingstore InsertThroughput")
startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now())
entriesToStore := make([]*badger.Entry, 0)
entries, err := s.createThroughputEntry(throughput, startTime)
Expand All @@ -52,12 +57,9 @@ func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error {
}
entriesToStore = append(entriesToStore, entries)
err = s.store.Update(func(txn *badger.Txn) error {
// Write the entries
for i := range entriesToStore {
err = txn.SetEntry(entriesToStore[i])
fmt.Println("Writing entry to badger")
if err != nil {
// Most likely primary key conflict, but let the caller check this
return err
}
}
Expand All @@ -70,8 +72,6 @@ func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error {

func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) {
var retSlice []*model.Throughput
fmt.Println("Inside badger samplingstore GetThroughput")

prefix := []byte{throughputKeyPrefix}

err := s.store.View(func(txn *badger.Txn) error {
Expand All @@ -93,7 +93,7 @@ func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput
if err != nil {
return err
}
throughputs, err := decodeValue(val)
throughputs, err := decodeThroughtputValue(val)
if err != nil {
return err
}
Expand All @@ -115,12 +115,83 @@ func (s *SamplingStore) InsertProbabilitiesAndQPS(hostname string,
probabilities model.ServiceOperationProbabilities,
qps model.ServiceOperationQPS,
) error {
startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now())
entriesToStore := make([]*badger.Entry, 0)
entries, err := s.createProbabilitiesEntry(hostname, probabilities, qps, startTime)
if err != nil {
return err
}
entriesToStore = append(entriesToStore, entries)
err = s.store.Update(func(txn *badger.Txn) error {
// Write the entries
for i := range entriesToStore {
err = txn.SetEntry(entriesToStore[i])
if err != nil {
return err
}
}

return nil
})

return nil
}

// GetLatestProbabilities implements samplingstore.Reader#GetLatestProbabilities.
func (s *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) {
return nil, nil
var retVal model.ServiceOperationProbabilities
var unMarshalProbabilities ProbabilitiesAndQPS
prefix := []byte{probabilitiesKeyPrefix}

err := s.store.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()

val := []byte{}
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
val, err := item.ValueCopy(val)
if err != nil {
return err
}
unMarshalProbabilities, err = decodeProbabilitiesValue(val)
retVal = unMarshalProbabilities.Probabilities
}
return nil
})
if err != nil {
return nil, err
}
return retVal, nil
}

func (s *SamplingStore) createProbabilitiesEntry(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, startTime uint64) (*badger.Entry, error) {
pK, pV, err := s.createProbabilitiesKV(hostname, probabilities, qps, startTime)
if err != nil {
return nil, err
}

e := s.createBadgerEntry(pK, pV)

return e, nil
}

func (s *SamplingStore) createProbabilitiesKV(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, startTime uint64) ([]byte, []byte, error) {
key := make([]byte, 16)
key[0] = probabilitiesKeyPrefix
pos := 1
binary.BigEndian.PutUint64(key[pos:], startTime)

var bb []byte
var err error
val := ProbabilitiesAndQPS{
Hostname: hostname,
Probabilities: probabilities,
QPS: qps,
}
bb, err = json.Marshal(val)
return key, bb, err
}

func (s *SamplingStore) createThroughputEntry(throughput []*model.Throughput, startTime uint64) (*badger.Entry, error) {
Expand Down Expand Up @@ -151,22 +222,29 @@ func (s *SamplingStore) createThroughputKV(throughput []*model.Throughput, start
var err error

bb, err = json.Marshal(throughput)
fmt.Printf("Badger key %v, value %v\n", key, string(bb))
return key, bb, err
}

func decodeValue(val []byte) ([]*model.Throughput, error) {
func decodeThroughtputValue(val []byte) ([]*model.Throughput, error) {
var throughput []*model.Throughput

err := json.Unmarshal(val, &throughput)
if err != nil {
fmt.Println("Error while unmarshalling")
return nil, err
}
fmt.Printf("Throughput %v\n", throughput)
return throughput, nil
}

func decodeProbabilitiesValue(val []byte) (ProbabilitiesAndQPS, error) {
var probabilities ProbabilitiesAndQPS

err := json.Unmarshal(val, &probabilities)
if err != nil {
return ProbabilitiesAndQPS{}, err
}
return probabilities, nil
}

func initalStartTime(timeBytes []byte) (time.Time, error) {
var usec int64

Expand Down
21 changes: 21 additions & 0 deletions plugin/storage/badger/samplingstore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,27 @@ func TestGetThroughput(t *testing.T) {
})
}

func TestInsertProbabilitiesAndQPS(t *testing.T) {
runWithBadger(t, func(s *samplingStoreTest, t *testing.T) {
err := s.store.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}})
assert.NoError(t, err)
})
}

func TestGetLatestProbabilities(t *testing.T) {
runWithBadger(t, func(s *samplingStoreTest, t *testing.T) {
err := s.store.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}})
assert.NoError(t, err)
err = s.store.InsertProbabilitiesAndQPS("newhostname", samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}}, samplemodel.ServiceOperationQPS{"new-srv2": {"op": 1}})
assert.NoError(t, err)

expected := samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}}
actual, err := s.store.GetLatestProbabilities()
assert.NoError(t, err)
assert.Equal(t, expected, actual)
})
}

func runWithBadger(t *testing.T, test func(s *samplingStoreTest, t *testing.T)) {
opts := badger.DefaultOptions("")

Expand Down
1 change: 1 addition & 0 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ func (s *StorageIntegration) testGetLatestProbability(t *testing.T) {
}
defer s.cleanUp(t)

s.SamplingStore.InsertProbabilitiesAndQPS("newhostname1", samplemodel.ServiceOperationProbabilities{"new-srv3": {"op": 0.123}}, samplemodel.ServiceOperationQPS{"new-srv2": {"op": 11}})
s.SamplingStore.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}})

expected := samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}
Expand Down

0 comments on commit 5c75b90

Please sign in to comment.