Skip to content

Commit

Permalink
Make max node limit configurable (grafana#2658)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Nov 10, 2023
1 parent eac0187 commit 3c5a959
Show file tree
Hide file tree
Showing 15 changed files with 192 additions and 28 deletions.
4 changes: 4 additions & 0 deletions cmd/pyroscope/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,10 @@ Usage of ./pyroscope:
Querier ID, sent to the query-frontend to identify requests from the same querier. Defaults to hostname.
-querier.max-concurrent int
The maximum number of concurrent queries allowed. (default 4)
-querier.max-flamegraph-nodes-default int
Maximum number of flamegraph nodes by default. 0 to disable. (default 8192)
-querier.max-flamegraph-nodes-max int
Maximum number of flamegraph nodes allowed. 0 to disable.
-querier.max-query-length duration
The limit to length of queries. 0 to disable. (default 1d)
-querier.max-query-lookback duration
Expand Down
4 changes: 4 additions & 0 deletions cmd/pyroscope/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ Usage of ./pyroscope:
Run a health check on each ingester client during periodic cleanup. (default true)
-querier.health-check-timeout duration
Timeout for ingester client healthcheck RPCs. (default 5s)
-querier.max-flamegraph-nodes-default int
Maximum number of flamegraph nodes by default. 0 to disable. (default 8192)
-querier.max-flamegraph-nodes-max int
Maximum number of flamegraph nodes allowed. 0 to disable.
-querier.max-query-length duration
The limit to length of queries. 0 to disable. (default 1d)
-querier.max-query-lookback duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,14 @@ limits:
# CLI flag: -querier.max-query-parallelism
[max_query_parallelism: <int> | default = 0]

# Maximum number of flamegraph nodes by default. 0 to disable.
# CLI flag: -querier.max-flamegraph-nodes-default
[max_flamegraph_nodes_default: <int> | default = 8192]

# Maximum number of flamegraph nodes allowed. 0 to disable.
# CLI flag: -querier.max-flamegraph-nodes-max
[max_flamegraph_nodes_max: <int> | default = 0]

# The tenant's shard size, used when store-gateway sharding is enabled. Value
# of 0 disables shuffle sharding for the tenant, that is all tenant blocks are
# sharded across all store-gateway replicas.
Expand Down
2 changes: 2 additions & 0 deletions pkg/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/grafana/pyroscope/pkg/scheduler/schedulerdiscovery"
"github.com/grafana/pyroscope/pkg/util/httpgrpc"
"github.com/grafana/pyroscope/pkg/util/httpgrpcutil"
"github.com/grafana/pyroscope/pkg/validation"
)

// Config for a Frontend.
Expand Down Expand Up @@ -98,6 +99,7 @@ type Limits interface {
MaxQueryParallelism(string) int
MaxQueryLength(tenantID string) time.Duration
MaxQueryLookback(tenantID string) time.Duration
validation.FlameGraphLimits
}

type frontendRequest struct {
Expand Down
19 changes: 17 additions & 2 deletions pkg/frontend/frontend_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"

"github.com/bufbuild/connect-go"
"github.com/grafana/dskit/tenant"
"golang.org/x/sync/errgroup"

querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/util/connectgrpc"
"github.com/grafana/pyroscope/pkg/util/math"
"github.com/grafana/pyroscope/pkg/validation"
)

func (f *Frontend) Diff(ctx context.Context,
Expand All @@ -19,6 +20,21 @@ func (f *Frontend) Diff(ctx context.Context,
) {
ctx = connectgrpc.WithProcedure(ctx, querierv1connect.QuerierServiceDiffProcedure)
g, ctx := errgroup.WithContext(ctx)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}

maxNodes := c.Msg.Left.GetMaxNodes()
if n := c.Msg.Right.GetMaxNodes(); n > maxNodes {
maxNodes = n
}
maxNodes, err = validation.ValidateMaxNodes(f.limits, tenantIDs, maxNodes)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
c.Msg.Left.MaxNodes = &maxNodes
c.Msg.Right.MaxNodes = &maxNodes

var left, right *phlaremodel.Tree
g.Go(func() error {
Expand All @@ -45,7 +61,6 @@ func (f *Frontend) Diff(ctx context.Context,
return nil, err
}

maxNodes := int(math.Max(c.Msg.Left.GetMaxNodes(), c.Msg.Right.GetMaxNodes()))
diff, err := phlaremodel.NewFlamegraphDiff(left, right, maxNodes)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
Expand Down
9 changes: 6 additions & 3 deletions pkg/frontend/frontend_select_merge_span_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (f *Frontend) SelectMergeSpanProfile(ctx context.Context,
if validated.IsEmpty {
return connect.NewResponse(&querierv1.SelectMergeSpanProfileResponse{Flamegraph: &querierv1.FlameGraph{}}), nil
}
maxNodes, err := validation.ValidateMaxNodes(f.limits, tenantIDs, c.Msg.GetMaxNodes())
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}

g, ctx := errgroup.WithContext(ctx)
if maxConcurrent := validationutil.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueryParallelism); maxConcurrent > 0 {
Expand All @@ -59,9 +63,8 @@ func (f *Frontend) SelectMergeSpanProfile(ctx context.Context,
LabelSelector: c.Msg.LabelSelector,
Start: r.Start.UnixMilli(),
End: r.End.UnixMilli(),
MaxNodes: c.Msg.MaxNodes,
// TODO: Make sure we don't need to copy it.
SpanSelector: c.Msg.SpanSelector,
MaxNodes: &maxNodes,
SpanSelector: c.Msg.SpanSelector,
})
resp, err := connectgrpc.RoundTripUnary[
querierv1.SelectMergeSpanProfileRequest,
Expand Down
6 changes: 5 additions & 1 deletion pkg/frontend/frontend_select_merge_stacktraces.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (f *Frontend) SelectMergeStacktraces(ctx context.Context,
if validated.IsEmpty {
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{}), nil
}
maxNodes, err := validation.ValidateMaxNodes(f.limits, tenantIDs, c.Msg.GetMaxNodes())
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}

g, ctx := errgroup.WithContext(ctx)
if maxConcurrent := validationutil.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueryParallelism); maxConcurrent > 0 {
Expand All @@ -59,7 +63,7 @@ func (f *Frontend) SelectMergeStacktraces(ctx context.Context,
LabelSelector: c.Msg.LabelSelector,
Start: r.Start.UnixMilli(),
End: r.End.UnixMilli(),
MaxNodes: c.Msg.MaxNodes,
MaxNodes: &maxNodes,
})
resp, err := connectgrpc.RoundTripUnary[
querierv1.SelectMergeStacktracesRequest,
Expand Down
6 changes: 2 additions & 4 deletions pkg/model/flamegraph_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
)

const MaxNodes = 8192

// NewFlamegraphDiff generates a FlameGraphDiff from 2 trees.
// It also prunes the final tree based on the maxNodes parameter
// Notice that the resulting FlameGraph can't be used interchangeably with a 'single' Flamegraph
Expand All @@ -25,7 +23,7 @@ const MaxNodes = 8192
// i+4 = total , right tree
// i+5 = self , right tree
// i+6 = index in the names array
func NewFlamegraphDiff(left, right *Tree, maxNodes int) (*querierv1.FlameGraphDiff, error) {
func NewFlamegraphDiff(left, right *Tree, maxNodes int64) (*querierv1.FlameGraphDiff, error) {
// The algorithm doesn't work properly with negative nodes
// Although it's possible to silently drop these nodes
// Let's fail early and analyze properly with real data when the issue happens
Expand All @@ -52,7 +50,7 @@ func NewFlamegraphDiff(left, right *Tree, maxNodes int) (*querierv1.FlameGraphDi
levels := []int{0}
var minVal int64
if maxNodes > 0 {
minVal = int64(combineMinValues(leftTree, rightTree, maxNodes))
minVal = int64(combineMinValues(leftTree, rightTree, int(maxNodes)))
}
nameLocationCache := map[string]int{}

Expand Down
8 changes: 7 additions & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ type Querier struct {
storeGatewayQuerier *StoreGatewayQuerier
}

// TODO(kolesnikovae): For backwards compatibility.
// Should be removed in the next release.
//
// The default value should never be used in practice:
// querier frontend sets the limit.
const maxNodesDefault = int64(2048)

func New(cfg Config, ingestersRing ring.ReadRing, factory ring_client.PoolFactory, storeGatewayQuerier *StoreGatewayQuerier, reg prometheus.Registerer, logger log.Logger, clientsOptions ...connect.ClientOption) (*Querier, error) {
Expand Down Expand Up @@ -472,6 +477,7 @@ func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.Ser
}), nil
}

// FIXME(kolesnikovae): The method is never used and should be removed.
func (q *Querier) Diff(ctx context.Context, req *connect.Request[querierv1.DiffRequest]) (*connect.Response[querierv1.DiffResponse], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "Diff")
defer func() {
Expand Down Expand Up @@ -511,7 +517,7 @@ func (q *Querier) Diff(ctx context.Context, req *connect.Request[querierv1.DiffR
return nil, err
}

fd, err := phlaremodel.NewFlamegraphDiff(leftTree, rightTree, phlaremodel.MaxNodes)
fd, err := phlaremodel.NewFlamegraphDiff(leftTree, rightTree, maxNodesDefault)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/validation/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func (oe *OverridesExporter) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(oe.defaultsDescription, prometheus.GaugeValue, float64(oe.defaultLimits.MaxQueryLength), "max_query_length")
ch <- prometheus.MustNewConstMetric(oe.defaultsDescription, prometheus.GaugeValue, float64(oe.defaultLimits.MaxQueryParallelism), "max_query_parallelism")
ch <- prometheus.MustNewConstMetric(oe.defaultsDescription, prometheus.GaugeValue, float64(oe.defaultLimits.QuerySplitDuration), "split_queries_by_interval")
ch <- prometheus.MustNewConstMetric(oe.defaultsDescription, prometheus.GaugeValue, float64(oe.defaultLimits.MaxFlameGraphNodesDefault), "max_flamegraph_nodes_default")
ch <- prometheus.MustNewConstMetric(oe.defaultsDescription, prometheus.GaugeValue, float64(oe.defaultLimits.MaxFlameGraphNodesMax), "max_flamegraph_nodes_max")

// Do not export per-tenant limits if they've not been configured at all.
if oe.tenantLimits == nil {
Expand All @@ -139,6 +141,8 @@ func (oe *OverridesExporter) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(oe.overrideDescription, prometheus.GaugeValue, float64(limits.MaxQueryLength), "max_query_length", tenant)
ch <- prometheus.MustNewConstMetric(oe.overrideDescription, prometheus.GaugeValue, float64(limits.MaxQueryParallelism), "max_query_parallelism", tenant)
ch <- prometheus.MustNewConstMetric(oe.overrideDescription, prometheus.GaugeValue, float64(limits.QuerySplitDuration), "split_queries_by_interval", tenant)
ch <- prometheus.MustNewConstMetric(oe.overrideDescription, prometheus.GaugeValue, float64(limits.MaxFlameGraphNodesDefault), "max_flamegraph_nodes_default", tenant)
ch <- prometheus.MustNewConstMetric(oe.overrideDescription, prometheus.GaugeValue, float64(limits.MaxFlameGraphNodesMax), "max_flamegraph_nodes_max", tenant)
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/validation/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func TestOverridesExporter_withConfig(t *testing.T) {
MaxSessionsPerSeries: 21,
DistributorAggregationWindow: 22,
DistributorAggregationPeriod: 23,
MaxFlameGraphNodesDefault: 24,
MaxFlameGraphNodesMax: 25,
},
}
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
Expand Down Expand Up @@ -76,6 +78,8 @@ func TestOverridesExporter_withConfig(t *testing.T) {
MaxSessionsPerSeries: 31,
DistributorAggregationWindow: 32,
DistributorAggregationPeriod: 33,
MaxFlameGraphNodesDefault: 34,
MaxFlameGraphNodesMax: 35,
}, validation.NewMockTenantLimits(tenantLimits), log.NewNopLogger(), nil)
require.NoError(t, err)

Expand Down Expand Up @@ -121,6 +125,8 @@ pyroscope_limits_overrides{limit_name="split_queries_by_interval",tenant="tenant
pyroscope_limits_overrides{limit_name="max_sessions_per_series",tenant="tenant-a"} 21
pyroscope_limits_overrides{limit_name="distributor_aggregation_window",tenant="tenant-a"} 22
pyroscope_limits_overrides{limit_name="distributor_aggregation_period",tenant="tenant-a"} 23
pyroscope_limits_overrides{limit_name="max_flamegraph_nodes_default",tenant="tenant-a"} 24
pyroscope_limits_overrides{limit_name="max_flamegraph_nodes_max",tenant="tenant-a"} 25
`

// Make sure each override matches the values from the supplied `Limit`
Expand All @@ -144,6 +150,8 @@ pyroscope_limits_defaults{limit_name="split_queries_by_interval"} 30
pyroscope_limits_defaults{limit_name="max_sessions_per_series"} 31
pyroscope_limits_defaults{limit_name="distributor_aggregation_window"} 32
pyroscope_limits_defaults{limit_name="distributor_aggregation_period"} 33
pyroscope_limits_defaults{limit_name="max_flamegraph_nodes_default"} 34
pyroscope_limits_defaults{limit_name="max_flamegraph_nodes_max"} 35
`
err = testutil.CollectAndCompare(exporter, bytes.NewBufferString(limitsMetrics), "pyroscope_limits_defaults")
assert.NoError(t, err)
Expand Down
17 changes: 17 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type Limits struct {
MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"`
MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"`

// FlameGraph enforced limits.
MaxFlameGraphNodesDefault int `yaml:"max_flamegraph_nodes_default" json:"max_flamegraph_nodes_default"`
MaxFlameGraphNodesMax int `yaml:"max_flamegraph_nodes_max" json:"max_flamegraph_nodes_max"`

// Store-gateway.
StoreGatewayTenantShardSize int `yaml:"store_gateway_tenant_shard_size" json:"store_gateway_tenant_shard_size"`

Expand Down Expand Up @@ -124,6 +128,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxProfileStacktraceDepth, "validation.max-profile-stacktrace-depth", 1000, "Maximum depth of a profile stacktrace. Profiles are not rejected instead stacktraces are truncated. 0 to disable.")
f.IntVar(&l.MaxProfileSymbolValueLength, "validation.max-profile-symbol-value-length", 65535, "Maximum length of a profile symbol value (labels, function names and filenames, etc...). Profiles are not rejected instead symbol values are truncated. 0 to disable.")

f.IntVar(&l.MaxFlameGraphNodesDefault, "querier.max-flamegraph-nodes-default", 8<<10, "Maximum number of flamegraph nodes by default. 0 to disable.")
f.IntVar(&l.MaxFlameGraphNodesMax, "querier.max-flamegraph-nodes-max", 0, "Maximum number of flamegraph nodes allowed. 0 to disable.")

f.Var(&l.DistributorAggregationWindow, "distributor.aggregation-window", "Duration of the distributor aggregation window. Requires aggregation period to be specified. 0 to disable.")
f.Var(&l.DistributorAggregationPeriod, "distributor.aggregation-period", "Duration of the distributor aggregation period. Requires aggregation window to be specified. 0 to disable.")

Expand Down Expand Up @@ -306,6 +313,16 @@ func (o *Overrides) MaxQueryLookback(tenantID string) time.Duration {
return time.Duration(o.getOverridesForTenant(tenantID).MaxQueryLookback)
}

// MaxFlameGraphNodesDefault returns the max flamegraph nodes used by default.
func (o *Overrides) MaxFlameGraphNodesDefault(tenantID string) int {
return o.getOverridesForTenant(tenantID).MaxFlameGraphNodesDefault
}

// MaxFlameGraphNodesMax returns the max flamegraph nodes allowed.
func (o *Overrides) MaxFlameGraphNodesMax(tenantID string) int {
return o.getOverridesForTenant(tenantID).MaxFlameGraphNodesMax
}

// StoreGatewayTenantShardSize returns the store-gateway shard size for a given user.
func (o *Overrides) StoreGatewayTenantShardSize(userID string) int {
return o.getOverridesForTenant(userID).StoreGatewayTenantShardSize
Expand Down
15 changes: 11 additions & 4 deletions pkg/validation/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ type MockLimits struct {
MaxLabelValueLengthValue int
MaxLabelNamesPerSeriesValue int

MaxFlameGraphNodesDefaultValue int
MaxFlameGraphNodesMaxValue int

DistributorAggregationWindowValue time.Duration
DistributorAggregationPeriodValue time.Duration

Expand All @@ -30,10 +33,14 @@ func (m MockLimits) QuerySplitDuration(string) time.Duration { return m.Q
func (m MockLimits) MaxQueryParallelism(string) int { return m.MaxQueryParallelismValue }
func (m MockLimits) MaxQueryLength(tenantID string) time.Duration { return m.MaxQueryLengthValue }
func (m MockLimits) MaxQueryLookback(tenantID string) time.Duration { return m.MaxQueryLookbackValue }
func (m MockLimits) MaxLabelNameLength(userID string) int { return m.MaxLabelNameLengthValue }
func (m MockLimits) MaxLabelValueLength(userID string) int { return m.MaxLabelValueLengthValue }
func (m MockLimits) MaxLabelNamesPerSeries(userID string) int { return m.MaxLabelNamesPerSeriesValue }
func (m MockLimits) MaxProfileSizeBytes(userID string) int { return m.MaxProfileSizeBytesValue }

func (m MockLimits) MaxFlameGraphNodesDefault(string) int { return m.MaxFlameGraphNodesDefaultValue }
func (m MockLimits) MaxFlameGraphNodesMax(string) int { return m.MaxFlameGraphNodesMaxValue }

func (m MockLimits) MaxLabelNameLength(userID string) int { return m.MaxLabelNameLengthValue }
func (m MockLimits) MaxLabelValueLength(userID string) int { return m.MaxLabelValueLengthValue }
func (m MockLimits) MaxLabelNamesPerSeries(userID string) int { return m.MaxLabelNamesPerSeriesValue }
func (m MockLimits) MaxProfileSizeBytes(userID string) int { return m.MaxProfileSizeBytesValue }
func (m MockLimits) MaxProfileStacktraceSamples(userID string) int {
return m.MaxProfileStacktraceSamplesValue
}
Expand Down
Loading

0 comments on commit 3c5a959

Please sign in to comment.