Skip to content

Commit

Permalink
Merge pull request opensearch-project#247 from parca-dev/backoff
Browse files Browse the repository at this point in the history
Add exponential backoff to profile writes and debuginfo upload
  • Loading branch information
kakkoyun authored Feb 23, 2022
2 parents 5128f50 + 77c25dd commit 9fbcc6b
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 43 deletions.
9 changes: 5 additions & 4 deletions cmd/debug-info/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func main() {
logger := logger.NewLogger(flags.LogLevel, logger.LogFormatLogfmt, "")

var (
g run.Group
dc debuginfo.Client = debuginfo.NewNoopClient()
g run.Group
debugInfoClient = debuginfo.NewNoopClient()
)

if len(flags.Upload.StoreAddress) > 0 {
Expand All @@ -82,10 +82,10 @@ func main() {
}
defer conn.Close()

dc = parcadebuginfo.NewDebugInfoClient(conn)
debugInfoClient = parcadebuginfo.NewDebugInfoClient(conn)
}

die := debuginfo.NewExtractor(logger, dc, flags.TempDir)
die := debuginfo.NewExtractor(logger, debugInfoClient, flags.TempDir)

ctx, cancel := context.WithCancel(context.Background())
switch kongCtx.Command() {
Expand Down Expand Up @@ -171,6 +171,7 @@ func main() {
g.Add(run.SignalHandler(ctx, os.Interrupt, os.Kill))
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
level.Info(logger).Log("msg", "done!")
}
Expand Down
23 changes: 12 additions & 11 deletions cmd/parca-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func main() {
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)

var wc profilestorepb.ProfileStoreServiceClient = agent.NewNoopProfileStoreClient()
var dc debuginfo.Client = debuginfo.NewNoopClient()
profileStoreClient := agent.NewNoopProfileStoreClient()
debugInfoClient := debuginfo.NewNoopClient()

if len(flags.StoreAddress) > 0 {
conn, err := grpcConn(reg, flags)
Expand All @@ -130,16 +130,17 @@ func main() {
}

// Initialize actual clients with the connection.
wc = profilestorepb.NewProfileStoreServiceClient(conn)
dc = parcadebuginfo.NewDebugInfoClient(conn)
profileStoreClient = profilestorepb.NewProfileStoreServiceClient(conn)
debugInfoClient = parcadebuginfo.NewDebugInfoClient(conn)
}

ksymCache := ksym.NewKsymCache(logger)

var (
configs discovery.Configs
bwc = agent.NewBatchWriteClient(logger, wc)
listener = agent.NewProfileListener(logger, bwc)
configs discovery.Configs
// TODO(Sylfrena): Make ticker duration configurable
batchWriteClient = agent.NewBatchWriteClient(logger, profileStoreClient, 10*time.Second)
profileListener = agent.NewProfileListener(logger, batchWriteClient)
)

if flags.Kubernetes {
Expand All @@ -158,7 +159,7 @@ func main() {
}

externalLabels := getExternalLabels(flags.ExternalLabel, flags.Node)
tm := target.NewManager(logger, reg, externalLabels, ksymCache, listener, dc, flags.ProfilingDuration, flags.TempDir)
tm := target.NewManager(logger, reg, externalLabels, ksymCache, profileListener, debugInfoClient, flags.ProfilingDuration, flags.TempDir)

mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
mux.HandleFunc("/debug/pprof/", pprof.Index)
Expand Down Expand Up @@ -251,7 +252,7 @@ func main() {
ctx, cancel := context.WithTimeout(ctx, time.Second*11)
defer cancel()

profile, err := listener.NextMatchingProfile(ctx, matchers)
profile, err := profileListener.NextMatchingProfile(ctx, matchers)
if profile == nil || err == context.Canceled {
http.Error(w, "No profile taken in the last 11 seconds that matches the requested label-matchers query. Profiles are taken every 10 seconds so either the profiler matching the label-set has stopped profiling, or the label-set was incorrect.", http.StatusNotFound)
return
Expand Down Expand Up @@ -322,7 +323,7 @@ func main() {
ctx, cancel := context.WithCancel(ctx)
g.Add(func() error {
level.Debug(logger).Log("msg", "starting batch write client")
return bwc.Run(ctx)
return batchWriteClient.Run(ctx)
}, func(error) {
cancel()
})
Expand Down Expand Up @@ -424,7 +425,7 @@ func grpcConn(reg prometheus.Registerer, flags flags) (*grpc.ClientConn, error)
return nil, fmt.Errorf("failed to read bearer token from file: %w", err)
}
opts = append(opts, grpc.WithPerRPCCredentials(&perRequestBearerToken{
token: string(b),
token: strings.TrimSpace(string(b)),
insecure: flags.Insecure,
}))
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.17
require (
github.com/alecthomas/kong v0.4.1
github.com/aquasecurity/libbpfgo v0.2.4-libbpf-0.6.1
github.com/cenkalti/backoff/v4 v4.1.2
github.com/cespare/xxhash/v2 v2.1.2
github.com/containerd/cgroups v1.0.3
github.com/containerd/containerd v1.6.0
Expand Down Expand Up @@ -46,7 +47,6 @@ require (
github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible // indirect
github.com/baidubce/bce-sdk-go v0.9.81 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/containerd/console v1.0.3 // indirect
github.com/containerd/go-runc v1.0.0 // indirect
Expand Down
45 changes: 30 additions & 15 deletions pkg/agent/write_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
"google.golang.org/grpc"
)

type Batcher struct {
logger log.Logger
writeClient profilestorepb.ProfileStoreServiceClient
logger log.Logger
writeClient profilestorepb.ProfileStoreServiceClient
writeInterval time.Duration

mtx *sync.RWMutex
series []*profilestorepb.RawProfileSeries
Expand All @@ -35,10 +37,11 @@ type Batcher struct {
lastBatchSendError error
}

func NewBatchWriteClient(logger log.Logger, wc profilestorepb.ProfileStoreServiceClient) *Batcher {
func NewBatchWriteClient(logger log.Logger, wc profilestorepb.ProfileStoreServiceClient, writeInterval time.Duration) *Batcher {
return &Batcher{
logger: logger,
writeClient: wc,
logger: logger,
writeClient: wc,
writeInterval: writeInterval,

series: []*profilestorepb.RawProfileSeries{},
mtx: &sync.RWMutex{},
Expand All @@ -54,10 +57,7 @@ func (b *Batcher) loopReport(lastBatchSentAt time.Time, lastBatchSendError error
}

func (b *Batcher) Run(ctx context.Context) error {
// TODO(Sylfrena): Make ticker duration configurable
const tickerDuration = 10 * time.Second

ticker := time.NewTicker(tickerDuration)
ticker := time.NewTicker(b.writeInterval)
defer ticker.Stop()

for {
Expand All @@ -77,15 +77,30 @@ func (b *Batcher) batchLoop(ctx context.Context) error {
b.series = []*profilestorepb.RawProfileSeries{}
b.mtx.Unlock()

if _, err := b.writeClient.WriteRaw(
ctx,
&profilestorepb.WriteRawRequest{Series: batch},
); err != nil {
level.Error(b.logger).Log("msg", "Write client failed to send profiles", "err", err)
expbackOff := backoff.NewExponentialBackOff()
expbackOff.MaxElapsedTime = b.writeInterval // TODO: Subtract ~10% of interval to account for overhead in loop
expbackOff.InitialInterval = 500 * time.Millisecond // Let's not retry to aggressively to start with.

err := backoff.Retry(func() error {
_, err := b.writeClient.WriteRaw(ctx, &profilestorepb.WriteRawRequest{Series: batch})
// Only log error if retrying, otherwise it will be logged outside the retry
if err != nil && expbackOff.NextBackOff().Nanoseconds() > 0 {
level.Debug(b.logger).Log(
"msg", "batch write client failed to send profiles",
"retry", expbackOff.NextBackOff(),
"count", len(batch),
"err", err,
)
}
return err
}, expbackOff)
if err != nil {
// TODO: Add metric and increase with every backoff iteration.
level.Error(b.logger).Log("msg", "batch write client failed to send profiles", "count", len(batch), "err", err)
return err
}

level.Debug(b.logger).Log("msg", "Write client has sent profiles", "count", len(batch))
level.Debug(b.logger).Log("msg", "batch write client has sent profiles", "count", len(batch))
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/agent/write_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"testing"
"time"

"github.com/go-kit/log"
profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
Expand Down Expand Up @@ -53,7 +54,7 @@ func compareProfileSeries(a, b []*profilestorepb.RawProfileSeries) bool {

func TestWriteClient(t *testing.T) {
wc := NewNoopProfileStoreClient()
batcher := NewBatchWriteClient(log.NewNopLogger(), wc)
batcher := NewBatchWriteClient(log.NewNopLogger(), wc, time.Second)

labelset1 := profilestorepb.LabelSet{
Labels: []*profilestorepb.Label{{
Expand All @@ -70,7 +71,7 @@ func TestWriteClient(t *testing.T) {

ctx := context.Background()

samples1 := []*profilestorepb.RawSample{{RawProfile: []byte{11, 0o4, 96}}}
samples1 := []*profilestorepb.RawSample{{RawProfile: []byte{11, 4, 96}}}
samples2 := []*profilestorepb.RawSample{{RawProfile: []byte{15, 11, 95}}}

t.Run("insertFirstProfile", func(t *testing.T) {
Expand Down
19 changes: 18 additions & 1 deletion pkg/debuginfo/debuginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/containerd/containerd/sys/reaper"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -359,7 +361,22 @@ func (di *Extractor) uploadDebugInfo(ctx context.Context, buildID, file string)
return fmt.Errorf("failed to open temp file for debug information: %w", err)
}

if _, err := di.Client.Upload(ctx, buildID, f); err != nil {
expBackOff := backoff.NewExponentialBackOff()
expBackOff.InitialInterval = time.Second
expBackOff.MaxElapsedTime = time.Minute

err = backoff.Retry(func() error {
_, err := di.Client.Upload(ctx, buildID, f)
if err != nil {
di.logger.Log(
"msg", "failed to upload debug information",
"retry", time.Second,
"err", err,
)
}
return err
}, expBackOff)
if err != nil {
return fmt.Errorf("failed to upload debug information: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/pprof/profile"
"github.com/parca-dev/parca-agent/pkg/agent"
profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"golang.org/x/sys/unix"

"github.com/parca-dev/parca-agent/pkg/agent"
"github.com/parca-dev/parca-agent/pkg/byteorder"
"github.com/parca-dev/parca-agent/pkg/debuginfo"
"github.com/parca-dev/parca-agent/pkg/ksym"
Expand Down
7 changes: 3 additions & 4 deletions pkg/target/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ func NewManager(
writeClient profilestorepb.ProfileStoreServiceClient,
debugInfoClient debuginfo.Client,
profilingDuration time.Duration,
tmp string) *Manager {
m := &Manager{
tmp string,
) *Manager {
return &Manager{
mtx: &sync.RWMutex{},
profilerPools: map[string]*ProfilerPool{},
logger: logger,
Expand All @@ -62,8 +63,6 @@ func NewManager(
profilingDuration: profilingDuration,
tmp: tmp,
}

return m
}

func (m *Manager) Run(ctx context.Context, update <-chan map[string][]*Group) error {
Expand Down
7 changes: 3 additions & 4 deletions pkg/target/profiler_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ func NewProfilerPool(
writeClient profilestorepb.ProfileStoreServiceClient,
debugInfoClient debuginfo.Client,
profilingDuration time.Duration,
tmp string) *ProfilerPool {
pp := &ProfilerPool{
tmp string,
) *ProfilerPool {
return &ProfilerPool{
ctx: ctx,
mtx: &sync.RWMutex{},
activeTargets: map[uint64]*Target{},
Expand All @@ -81,8 +82,6 @@ func NewProfilerPool(
profilingDuration: profilingDuration,
tmp: tmp,
}

return pp
}

func (pp *ProfilerPool) Profilers() []Profiler {
Expand Down

0 comments on commit 9fbcc6b

Please sign in to comment.