From e0a5fd9cae64b645ba6353ac21ad467364d0406f Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Tue, 9 Nov 2021 12:58:20 +0100 Subject: [PATCH] Pass logger to BatchWriteClient (#128) Signed-off-by: Kemal Akkoyun --- cmd/parca-agent/main.go | 2 +- pkg/agent/batcher.go | 68 ++++++++++++++++++--------------------- pkg/agent/batcher_test.go | 33 +++++++++---------- 3 files changed, 48 insertions(+), 55 deletions(-) diff --git a/cmd/parca-agent/main.go b/cmd/parca-agent/main.go index dc5c88864..6a3a5ef35 100644 --- a/cmd/parca-agent/main.go +++ b/cmd/parca-agent/main.go @@ -125,7 +125,7 @@ func main() { pm *agent.PodManager sm *agent.SystemdManager targetSources = []agent.TargetSource{} - batcher = agent.NewBatcher(wc) + batcher = agent.NewBatchWriteClient(logger, wc) ) if flags.Kubernetes { diff --git a/pkg/agent/batcher.go b/pkg/agent/batcher.go index e6e02b1a2..355a86749 100644 --- a/pkg/agent/batcher.go +++ b/pkg/agent/batcher.go @@ -26,26 +26,30 @@ import ( ) type Batcher struct { - series []*profilestorepb.RawProfileSeries - writeClient profilestorepb.ProfileStoreServiceClient logger log.Logger + writeClient profilestorepb.ProfileStoreServiceClient + + mtx *sync.RWMutex + series []*profilestorepb.RawProfileSeries - mtx *sync.RWMutex lastBatchSentAt time.Time lastBatchSendError error } -func NewBatcher(wc profilestorepb.ProfileStoreServiceClient) *Batcher { +func NewBatchWriteClient(logger log.Logger, wc profilestorepb.ProfileStoreServiceClient) *Batcher { return &Batcher{ - series: []*profilestorepb.RawProfileSeries{}, + logger: logger, writeClient: wc, - mtx: &sync.RWMutex{}, + + series: []*profilestorepb.RawProfileSeries{}, + mtx: &sync.RWMutex{}, } } func (b *Batcher) loopReport(lastBatchSentAt time.Time, lastBatchSendError error) { b.mtx.Lock() defer b.mtx.Unlock() + b.lastBatchSentAt = lastBatchSentAt b.lastBatchSendError = lastBatchSendError } @@ -64,9 +68,7 @@ func (b *Batcher) Run(ctx context.Context) error { case <-ticker.C: } - err := b.batchLoop(ctx) - - b.loopReport(time.Now(), err) + b.loopReport(time.Now(), b.batchLoop(ctx)) } } @@ -74,10 +76,10 @@ func (b *Batcher) batchLoop(ctx context.Context) error { b.mtx.Lock() defer b.mtx.Unlock() - _, err := b.writeClient.WriteRaw(ctx, - &profilestorepb.WriteRawRequest{Series: b.series}) - - if err != nil { + if _, err := b.writeClient.WriteRaw( + ctx, + &profilestorepb.WriteRawRequest{Series: b.series}, + ); err != nil { level.Error(b.logger).Log("msg", "Writeclient failed to send profiles", "err", err) return err } @@ -88,51 +90,43 @@ func (b *Batcher) batchLoop(ctx context.Context) error { } func isEqualLabel(a *profilestorepb.LabelSet, b *profilestorepb.LabelSet) bool { - ret := true + if len(a.Labels) != len(b.Labels) { + return false + } - if len(a.Labels) == len(b.Labels) { - for i := range a.Labels { - if (a.Labels[i].Name != b.Labels[i].Name) || (a.Labels[i].Value != b.Labels[i].Value) { - ret = false - } + ret := true + for i := range a.Labels { + if (a.Labels[i].Name != b.Labels[i].Name) || (a.Labels[i].Value != b.Labels[i].Value) { + ret = false } - } else { - ret = false } - return ret } -func ifExists(arr []*profilestorepb.RawProfileSeries, p *profilestorepb.RawProfileSeries) (bool, int) { - res := false - +func findIndex(arr []*profilestorepb.RawProfileSeries, p *profilestorepb.RawProfileSeries) (int, bool) { for i, val := range arr { if isEqualLabel(val.Labels, p.Labels) { - return true, i + return i, true } } - return res, -1 + return -1, false } func (b *Batcher) WriteRaw(ctx context.Context, r *profilestorepb.WriteRawRequest, opts ...grpc.CallOption) (*profilestorepb.WriteRawResponse, error) { - b.mtx.Lock() defer b.mtx.Unlock() for _, profileSeries := range r.Series { - ok, j := ifExists(b.series, profileSeries) - - if ok { + if j, ok := findIndex(b.series, profileSeries); ok { b.series[j].Samples = append(b.series[j].Samples, profileSeries.Samples...) - } else { - b.series = append(b.series, &profilestorepb.RawProfileSeries{ - Labels: profileSeries.Labels, - Samples: profileSeries.Samples, - }) + continue } + b.series = append(b.series, &profilestorepb.RawProfileSeries{ + Labels: profileSeries.Labels, + Samples: profileSeries.Samples, + }) } return &profilestorepb.WriteRawResponse{}, nil - } diff --git a/pkg/agent/batcher_test.go b/pkg/agent/batcher_test.go index ea2f9a42a..3b57974ed 100644 --- a/pkg/agent/batcher_test.go +++ b/pkg/agent/batcher_test.go @@ -18,43 +18,42 @@ import ( "context" "testing" + "github.com/go-kit/log" profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1" "github.com/stretchr/testify/require" ) func isEqualSample(a []*profilestorepb.RawSample, b []*profilestorepb.RawSample) bool { - ret := true + if len(a) != len(b) { + return false + } - if len(a) == len(b) { - for i := range a { - if !bytes.Equal(a[i].RawProfile, b[i].RawProfile) { - ret = false - } + ret := true + for i := range a { + if !bytes.Equal(a[i].RawProfile, b[i].RawProfile) { + ret = false } - } else { - ret = false } - return ret } func compareProfileSeries(a []*profilestorepb.RawProfileSeries, b []*profilestorepb.RawProfileSeries) bool { + if len(a) != len(b) { + return false + } + ret := true - if len(a) == len(b) { - for i := range a { - if !isEqualLabel(a[i].Labels, b[i].Labels) || !isEqualSample(a[i].Samples, b[i].Samples) { - ret = false - } + for i := range a { + if !isEqualLabel(a[i].Labels, b[i].Labels) || !isEqualSample(a[i].Samples, b[i].Samples) { + ret = false } - } else { - ret = false } return ret } func TestWriteClient(t *testing.T) { wc := NewNoopProfileStoreClient() - batcher := NewBatcher(wc) + batcher := NewBatchWriteClient(log.NewNopLogger(), wc) labelset1 := profilestorepb.LabelSet{ Labels: []*profilestorepb.Label{{