-
Notifications
You must be signed in to change notification settings - Fork 206
Add peer-to-peer metrics tracker #1333
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f598bf8
4402908
fa22f6a
80ed359
2fba68d
5bababb
753a5d5
8d31aa5
51fd9e5
e8db442
ec9c0d2
c37153c
94882c5
fb54ef7
21cc7a8
d5ab234
f7d02fc
196a06c
39a2b94
835ee09
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| package server | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "go.uber.org/zap" | ||
| "golang.org/x/sync/semaphore" | ||
|
|
||
| "github.com/e2b-dev/infra/packages/orchestrator/internal/sharedstate" | ||
| featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" | ||
| "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" | ||
| ) | ||
|
|
||
| type Limiter struct { | ||
| maxStartingSandboxes int64 | ||
|
|
||
| featureFlags *featureflags.Client | ||
| startingSandboxes *semaphore.Weighted | ||
| sharedStateManager *sharedstate.Manager | ||
| } | ||
|
|
||
| func NewLimiter( | ||
| maxStartingSandboxes int64, | ||
| featureFlags *featureflags.Client, | ||
| sharedStateManager *sharedstate.Manager, | ||
| ) *Limiter { | ||
| return &Limiter{ | ||
| featureFlags: featureFlags, | ||
| sharedStateManager: sharedStateManager, | ||
| maxStartingSandboxes: maxStartingSandboxes, | ||
| startingSandboxes: semaphore.NewWeighted(maxStartingSandboxes), | ||
| } | ||
| } | ||
|
|
||
| type TooManySandboxesRunningError struct { | ||
| Current, Max int | ||
| } | ||
|
|
||
| func (t TooManySandboxesRunningError) Error() string { | ||
| return fmt.Sprintf("max number of running sandboxes on node reached (%d), please retry", t.Max) | ||
| } | ||
|
|
||
| var _ error = TooManySandboxesRunningError{} | ||
|
|
||
| type TooManySandboxesStartingError struct { | ||
| Max int64 | ||
| } | ||
|
|
||
| var _ error = TooManySandboxesStartingError{} | ||
|
|
||
| func (t TooManySandboxesStartingError) Error() string { | ||
| return fmt.Sprintf("max number of starting sandboxes on node reached (%d), please retry", t.Max) | ||
| } | ||
|
|
||
| func (t *Limiter) AcquireStarting(ctx context.Context) error { | ||
| maxRunningSandboxesPerNode, err := t.featureFlags.IntFlag(ctx, featureflags.MaxSandboxesPerNode) | ||
| if err != nil { | ||
| zap.L().Error("Failed to get MaxSandboxesPerNode flag", zap.Error(err)) | ||
| } | ||
|
|
||
| runningSandboxes := t.sharedStateManager.TotalRunningCount() | ||
| if runningSandboxes >= maxRunningSandboxesPerNode { | ||
| telemetry.ReportEvent(ctx, "max number of running sandboxes reached") | ||
|
|
||
| return TooManySandboxesRunningError{runningSandboxes, maxRunningSandboxesPerNode} | ||
| } | ||
|
|
||
| // Check if we've reached the max number of starting instances on this node | ||
| acquired := t.startingSandboxes.TryAcquire(1) | ||
| if !acquired { | ||
| telemetry.ReportEvent(ctx, "too many starting sandboxes on node") | ||
|
|
||
| return TooManySandboxesStartingError{t.maxStartingSandboxes} | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (t *Limiter) ReleaseStarting() { | ||
| defer t.startingSandboxes.Release(1) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,10 +30,7 @@ import ( | |
|
|
||
| var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/orchestrator/internal/server") | ||
|
|
||
| const ( | ||
| requestTimeout = 60 * time.Second | ||
| maxStartingInstancesPerNode = 3 | ||
| ) | ||
| const requestTimeout = 60 * time.Second | ||
|
|
||
| func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequest) (*orchestrator.SandboxCreateResponse, error) { | ||
| // set max request timeout for this request | ||
|
|
@@ -66,26 +63,26 @@ func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ | |
| Build(), | ||
| ) | ||
|
|
||
| maxRunningSandboxesPerNode, err := s.featureFlags.IntFlag(ctx, featureflags.MaxSandboxesPerNode) | ||
| if err != nil { | ||
| zap.L().Error("Failed to get MaxSandboxesPerNode flag", zap.Error(err)) | ||
| } | ||
|
|
||
| runningSandboxes := s.sandboxes.Count() | ||
| if runningSandboxes >= maxRunningSandboxesPerNode { | ||
| telemetry.ReportEvent(ctx, "max number of running sandboxes reached") | ||
|
|
||
| return nil, status.Errorf(codes.ResourceExhausted, "max number of running sandboxes on node reached (%d), please retry", maxRunningSandboxesPerNode) | ||
| } | ||
|
|
||
| // Check if we've reached the max number of starting instances on this node | ||
| acquired := s.startingSandboxes.TryAcquire(1) | ||
| if !acquired { | ||
| telemetry.ReportEvent(ctx, "too many starting sandboxes on node") | ||
|
|
||
| return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes starting on this node, please retry") | ||
| if err := s.sandboxLimiter.AcquireStarting(ctx); err != nil { | ||
| var tooManyRunning TooManySandboxesRunningError | ||
| var tooManyStarting TooManySandboxesStartingError | ||
| switch { | ||
| case errors.As(err, &tooManyRunning): | ||
| telemetry.ReportEvent(ctx, "max number of running sandboxes reached") | ||
|
|
||
| return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes on node reached (%d>=%d), please retry", tooManyRunning.Current, tooManyRunning.Max) | ||
| case errors.As(err, &tooManyStarting): | ||
| telemetry.ReportEvent(ctx, "too many starting sandboxes on node") | ||
|
|
||
| return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes starting on this node, please retry") | ||
| default: | ||
| return nil, fmt.Errorf("unexpected error while acquiring starting lock: %w", err) | ||
| } | ||
| } | ||
| defer s.startingSandboxes.Release(1) | ||
| defer func() { | ||
| s.sandboxLimiter.ReleaseStarting() | ||
| }() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Bug: Incomplete Error Handling Causes Semaphore Mismatch. The |
||
|
|
||
| template, err := s.templateCache.GetTemplate( | ||
| ctx, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Error Type Mismatch in Sandbox Acquisition
The `AcquireStarting` method returns a simple `ErrTooManyStarting` error when the starting sandbox limit is reached. The calling code, however, expects a `TooManySandboxesStartingError` type for `errors.As()` checks, which includes `Current` and `Max` fields. This mismatch prevents proper error handling and loses specific context.