Skip to content

Commit

Permalink
pkg/agent: Add batching support for profile write requests
Browse files Browse the repository at this point in the history
Fixes opensearch-project#14

Signed-off-by: Sumera Priyadarsini <sylphrenadin@gmail.com>
  • Loading branch information
Sylfrena committed Nov 2, 2021
1 parent 451109e commit 531c034
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 24 deletions.
15 changes: 13 additions & 2 deletions cmd/parca-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func main() {
pm *agent.PodManager
sm *agent.SystemdManager
targetSources = []agent.TargetSource{}
batcher = *agent.NewBatcher(wc)
)

if flags.Kubernetes {
Expand All @@ -134,7 +135,7 @@ func main() {
flags.PodLabelSelector,
flags.SamplingRatio,
ksymCache,
wc,
&batcher,
dc,
flags.TempDir,
flags.SocketPath,
Expand All @@ -155,7 +156,7 @@ func main() {
flags.SamplingRatio,
flags.ExternalLabel,
ksymCache,
wc,
&batcher,
dc,
flags.TempDir,
flags.ProfilingDuration,
Expand Down Expand Up @@ -282,6 +283,16 @@ func main() {
http.NotFound(w, r)
})

{
ctx, cancel := context.WithCancel(ctx)
g.Add(func() error {
level.Debug(logger).Log("msg", "starting batch write client")
return batcher.Run(ctx)
}, func(error) {
cancel()
})
}

if len(flags.SystemdUnits) > 0 {
ctx, cancel := context.WithCancel(ctx)
g.Add(func() error {
Expand Down
123 changes: 123 additions & 0 deletions pkg/agent/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package agent

import (
"context"

"sync"
"time"

"github.com/cespare/xxhash/v2"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
)

type Batcher struct {
series map[uint64]*profilestorepb.RawProfileSeries
writeClient profilestorepb.ProfileStoreServiceClient
logger log.Logger

mtx sync.RWMutex
lastProfileTakenAt time.Time
lastError error
}

func NewBatcher(wc profilestorepb.ProfileStoreServiceClient) *Batcher {
return &Batcher{
series: make(map[uint64]*profilestorepb.RawProfileSeries),
writeClient: wc,
}
}

func (b *Batcher) loopReport(lastProfileTakenAt time.Time, lastError error) {
b.mtx.Lock()
defer b.mtx.Unlock()
b.lastProfileTakenAt = lastProfileTakenAt
b.lastError = lastError
}

func (b *Batcher) Run(ctx context.Context) error {
// TODO(Sylfrena): Make ticker duration configurable
const tickerDuration = 10000000000

ticker := time.NewTicker(tickerDuration)
defer ticker.Stop()

var err error
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}

err := b.batchLoop(ctx)
b.loopReport(time.Now(), err)
}
b.series = make(map[uint64]*profilestorepb.RawProfileSeries)
return err
}

func (batcher *Batcher) batchLoop(ctx context.Context) error {

batcher.mtx.Lock()
defer batcher.mtx.Unlock()

var profileSeries []*profilestorepb.RawProfileSeries

for _, value := range batcher.series {
profileSeries = append(profileSeries, &profilestorepb.RawProfileSeries{
Labels: value.Labels,
Samples: value.Samples,
})

}

_, err := batcher.writeClient.WriteRaw(ctx,
&profilestorepb.WriteRawRequest{Series: profileSeries})

if err != nil {
level.Error(batcher.logger).Log("msg", "Writeclient failed to send profiles", "err", err)
return err
}

return nil
}

func (batcher *Batcher) Scheduler(profileSeries profilestorepb.RawProfileSeries) {
batcher.mtx.Lock()
defer batcher.mtx.Unlock()

labelsetHash := Hash(*profileSeries.Labels)

existing_sample, ok := batcher.series[labelsetHash]
if ok {
batcher.series[labelsetHash].Samples = append(existing_sample.Samples, profileSeries.Samples...)
} else {
batcher.series[labelsetHash] = &profilestorepb.RawProfileSeries{}
batcher.series[labelsetHash].Samples = profileSeries.Samples
}
}

func Hash(ls profilestorepb.LabelSet) uint64 {
var seps = []byte{'\xff'}
b := make([]byte, 0, 1024)
for _, v := range ls.Labels {
if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) {
// If labels entry is 1KB+ do not allocate whole entry.
h := xxhash.New()
_, _ = h.Write(b)
_, _ = h.WriteString(v.Name)
_, _ = h.Write(seps)
_, _ = h.WriteString(v.Value)
_, _ = h.Write(seps)
return h.Sum64()
}

b = append(b, v.Name...)
b = append(b, seps[0])
b = append(b, v.Value...)
b = append(b, seps[0])
}
return xxhash.Sum64(b)
}
129 changes: 129 additions & 0 deletions pkg/agent/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2021 The Parca Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package agent

import (
"testing"

profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
"github.com/stretchr/testify/require"
)

func TestScheduler(t *testing.T) {
wc := NewNoopProfileStoreClient()
batcher := NewBatcher(wc)

labelset1 := profilestorepb.LabelSet{
Labels: []*profilestorepb.Label{{
Name: "n1",
Value: "v1",
}},
}
labelset2 := profilestorepb.LabelSet{
Labels: []*profilestorepb.Label{{
Name: "n2",
Value: "v2",
}},
}

labelsetHash1 := Hash(labelset1)
labelsetHash2 := Hash(labelset2)

samples1 := []*profilestorepb.RawSample{{RawProfile: []byte{11, 04, 96}}}
samples2 := []*profilestorepb.RawSample{{RawProfile: []byte{15, 11, 95}}}

t.Run("insertFirstProfile", func(t *testing.T) {

batcher.Scheduler(profilestorepb.RawProfileSeries{
Labels: &labelset1,
Samples: samples1,
})

series := map[uint64]*profilestorepb.RawProfileSeries{
labelsetHash1: &profilestorepb.RawProfileSeries{
Labels: &labelset1,
Samples: samples1,
},
}

require.Equal(t, series[labelsetHash1].Samples,
batcher.series[labelsetHash1].Samples)
})

t.Run("insertSecondProfile", func(t *testing.T) {

batcher.Scheduler(profilestorepb.RawProfileSeries{
Labels: &labelset2,
Samples: samples2,
})

series := map[uint64]*profilestorepb.RawProfileSeries{
labelsetHash1: &profilestorepb.RawProfileSeries{
Labels: &labelset1,
Samples: samples1,
},
labelsetHash2: &profilestorepb.RawProfileSeries{
Labels: &labelset2,
Samples: samples2,
},
}

require.Equal(t, series[labelsetHash1].Samples,
batcher.series[labelsetHash1].Samples)

require.Equal(t, series[labelsetHash2].Samples,
batcher.series[labelsetHash2].Samples)
})

t.Run("appendProfile", func(t *testing.T) {

batcher.Scheduler(profilestorepb.RawProfileSeries{
Labels: &labelset1,
Samples: samples2,
})

series := map[uint64]*profilestorepb.RawProfileSeries{
labelsetHash1: &profilestorepb.RawProfileSeries{
Labels: &labelset1,
Samples: append(samples1, samples2...),
},
labelsetHash2: &profilestorepb.RawProfileSeries{
Labels: &labelset2,
Samples: samples2,
},
}

require.Equal(t, series[labelsetHash1].Samples,
batcher.series[labelsetHash1].Samples)

require.Equal(t, series[labelsetHash2].Samples,
batcher.series[labelsetHash2].Samples)

})

t.Run("hash", func(t *testing.T) {

labelset := profilestorepb.LabelSet{
Labels: []*profilestorepb.Label{{
Name: "n1",
Value: "v1",
}},
}

labelsetHash := Hash(labelset)

require.Equal(t, uint64(0xa3b730de852c2e2c), labelsetHash)
})

}
9 changes: 4 additions & 5 deletions pkg/agent/podmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"

Expand Down Expand Up @@ -52,7 +51,7 @@ type PodManager struct {
containerIDsByKey map[string]map[string]*CgroupProfiler
mtx *sync.RWMutex

writeClient profilestorepb.ProfileStoreServiceClient
batcher *Batcher
debugInfoClient debuginfo.Client
sink func(Record)

Expand Down Expand Up @@ -104,7 +103,7 @@ func (g *PodManager) Run(ctx context.Context) error {
logger,
g.externalLabels,
g.ksymCache,
g.writeClient,
*g.batcher,
g.debugInfoClient,
container,
g.profilingDuration,
Expand Down Expand Up @@ -172,7 +171,7 @@ func NewPodManager(
podLabelSelector string,
samplingRatio float64,
ksymCache *ksym.KsymCache,
writeClient profilestorepb.ProfileStoreServiceClient,
batcher *Batcher,
debugInfoClient debuginfo.Client,
tmp string,
socketPath string,
Expand Down Expand Up @@ -202,7 +201,7 @@ func NewPodManager(
containerIDsByKey: make(map[string]map[string]*CgroupProfiler),
k8sClient: k8sClient,
mtx: &sync.RWMutex{},
writeClient: writeClient,
batcher: batcher,
debugInfoClient: debugInfoClient,
tmpDir: tmp,
profilingDuration: profilingDuration,
Expand Down
20 changes: 10 additions & 10 deletions pkg/agent/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ type CgroupProfiler struct {
cancel func()

pidMappingFileCache *maps.PidMappingFileCache
writeClient profilestorepb.ProfileStoreServiceClient
debugInfoExtractor *debuginfo.Extractor
batcher *Batcher

debugInfoExtractor *debuginfo.Extractor

mtx *sync.RWMutex
lastProfileTakenAt time.Time
Expand All @@ -99,7 +100,7 @@ func NewCgroupProfiler(
logger log.Logger,
externalLabels map[string]string,
ksymCache *ksym.KsymCache,
writeClient profilestorepb.ProfileStoreServiceClient,
batcher Batcher,
debugInfoClient debuginfo.Client,
target CgroupProfilingTarget,
profilingDuration time.Duration,
Expand All @@ -116,6 +117,7 @@ func NewCgroupProfiler(
pidMappingFileCache: maps.NewPidMappingFileCache(logger),
perfCache: perf.NewPerfCache(logger),
writeClient: writeClient,
batcher: &batcher,
debugInfoExtractor: debuginfo.NewExtractor(
log.With(logger, "component", "debuginfoextractor"),
debugInfoClient,
Expand Down Expand Up @@ -480,14 +482,12 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, now time.Time, counts,
return err
}
labels := p.Labels()
_, err = p.writeClient.WriteRaw(ctx, &profilestorepb.WriteRawRequest{
Series: []*profilestorepb.RawProfileSeries{{
Labels: &profilestorepb.LabelSet{Labels: labels},
Samples: []*profilestorepb.RawSample{{
RawProfile: buf.Bytes(),
}},
}},

p.batcher.Scheduler(profilestorepb.RawProfileSeries{
Labels: &profilestorepb.LabelSet{Labels: labels},
Samples: []*profilestorepb.RawSample{{RawProfile: buf.Bytes()}},
})

if err != nil {
level.Error(p.logger).Log("msg", "failed to send profile", "err", err)
}
Expand Down
Loading

0 comments on commit 531c034

Please sign in to comment.