Skip to content

Commit

Permalink
Pass logger to BatchWriteClient (opensearch-project#128)
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun authored Nov 9, 2021
1 parent 41c97f7 commit e0a5fd9
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 55 deletions.
2 changes: 1 addition & 1 deletion cmd/parca-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func main() {
pm *agent.PodManager
sm *agent.SystemdManager
targetSources = []agent.TargetSource{}
batcher = agent.NewBatcher(wc)
batcher = agent.NewBatchWriteClient(logger, wc)
)

if flags.Kubernetes {
Expand Down
68 changes: 31 additions & 37 deletions pkg/agent/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,30 @@ import (
)

type Batcher struct {
series []*profilestorepb.RawProfileSeries
writeClient profilestorepb.ProfileStoreServiceClient
logger log.Logger
writeClient profilestorepb.ProfileStoreServiceClient

mtx *sync.RWMutex
series []*profilestorepb.RawProfileSeries

mtx *sync.RWMutex
lastBatchSentAt time.Time
lastBatchSendError error
}

func NewBatcher(wc profilestorepb.ProfileStoreServiceClient) *Batcher {
func NewBatchWriteClient(logger log.Logger, wc profilestorepb.ProfileStoreServiceClient) *Batcher {
return &Batcher{
series: []*profilestorepb.RawProfileSeries{},
logger: logger,
writeClient: wc,
mtx: &sync.RWMutex{},

series: []*profilestorepb.RawProfileSeries{},
mtx: &sync.RWMutex{},
}
}

func (b *Batcher) loopReport(lastBatchSentAt time.Time, lastBatchSendError error) {
b.mtx.Lock()
defer b.mtx.Unlock()

b.lastBatchSentAt = lastBatchSentAt
b.lastBatchSendError = lastBatchSendError
}
Expand All @@ -64,20 +68,18 @@ func (b *Batcher) Run(ctx context.Context) error {
case <-ticker.C:
}

err := b.batchLoop(ctx)

b.loopReport(time.Now(), err)
b.loopReport(time.Now(), b.batchLoop(ctx))
}
}

func (b *Batcher) batchLoop(ctx context.Context) error {
b.mtx.Lock()
defer b.mtx.Unlock()

_, err := b.writeClient.WriteRaw(ctx,
&profilestorepb.WriteRawRequest{Series: b.series})

if err != nil {
if _, err := b.writeClient.WriteRaw(
ctx,
&profilestorepb.WriteRawRequest{Series: b.series},
); err != nil {
level.Error(b.logger).Log("msg", "Writeclient failed to send profiles", "err", err)
return err
}
Expand All @@ -88,51 +90,43 @@ func (b *Batcher) batchLoop(ctx context.Context) error {
}

func isEqualLabel(a *profilestorepb.LabelSet, b *profilestorepb.LabelSet) bool {
ret := true
if len(a.Labels) != len(b.Labels) {
return false
}

if len(a.Labels) == len(b.Labels) {
for i := range a.Labels {
if (a.Labels[i].Name != b.Labels[i].Name) || (a.Labels[i].Value != b.Labels[i].Value) {
ret = false
}
ret := true
for i := range a.Labels {
if (a.Labels[i].Name != b.Labels[i].Name) || (a.Labels[i].Value != b.Labels[i].Value) {
ret = false
}
} else {
ret = false
}

return ret
}

func ifExists(arr []*profilestorepb.RawProfileSeries, p *profilestorepb.RawProfileSeries) (bool, int) {
res := false

func findIndex(arr []*profilestorepb.RawProfileSeries, p *profilestorepb.RawProfileSeries) (int, bool) {
for i, val := range arr {
if isEqualLabel(val.Labels, p.Labels) {
return true, i
return i, true
}
}
return res, -1
return -1, false
}

func (b *Batcher) WriteRaw(ctx context.Context, r *profilestorepb.WriteRawRequest, opts ...grpc.CallOption) (*profilestorepb.WriteRawResponse, error) {

b.mtx.Lock()
defer b.mtx.Unlock()

for _, profileSeries := range r.Series {
ok, j := ifExists(b.series, profileSeries)

if ok {
if j, ok := findIndex(b.series, profileSeries); ok {
b.series[j].Samples = append(b.series[j].Samples, profileSeries.Samples...)
} else {
b.series = append(b.series, &profilestorepb.RawProfileSeries{
Labels: profileSeries.Labels,
Samples: profileSeries.Samples,
})
continue
}

b.series = append(b.series, &profilestorepb.RawProfileSeries{
Labels: profileSeries.Labels,
Samples: profileSeries.Samples,
})
}

return &profilestorepb.WriteRawResponse{}, nil

}
33 changes: 16 additions & 17 deletions pkg/agent/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,42 @@ import (
"context"
"testing"

"github.com/go-kit/log"
profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
"github.com/stretchr/testify/require"
)

func isEqualSample(a []*profilestorepb.RawSample, b []*profilestorepb.RawSample) bool {
ret := true
if len(a) != len(b) {
return false
}

if len(a) == len(b) {
for i := range a {
if !bytes.Equal(a[i].RawProfile, b[i].RawProfile) {
ret = false
}
ret := true
for i := range a {
if !bytes.Equal(a[i].RawProfile, b[i].RawProfile) {
ret = false
}
} else {
ret = false
}

return ret
}

func compareProfileSeries(a []*profilestorepb.RawProfileSeries, b []*profilestorepb.RawProfileSeries) bool {
if len(a) != len(b) {
return false
}

ret := true
if len(a) == len(b) {
for i := range a {
if !isEqualLabel(a[i].Labels, b[i].Labels) || !isEqualSample(a[i].Samples, b[i].Samples) {
ret = false
}
for i := range a {
if !isEqualLabel(a[i].Labels, b[i].Labels) || !isEqualSample(a[i].Samples, b[i].Samples) {
ret = false
}
} else {
ret = false
}
return ret
}

func TestWriteClient(t *testing.T) {
wc := NewNoopProfileStoreClient()
batcher := NewBatcher(wc)
batcher := NewBatchWriteClient(log.NewNopLogger(), wc)

labelset1 := profilestorepb.LabelSet{
Labels: []*profilestorepb.Label{{
Expand Down

0 comments on commit e0a5fd9

Please sign in to comment.