1 change: 1 addition & 0 deletions packages/orchestrator/go.mod
@@ -28,6 +28,7 @@ require (
github.com/e2b-dev/infra/packages/shared v0.0.0
github.com/edsrzf/mmap-go v1.2.0
github.com/firecracker-microvm/firecracker-go-sdk v1.0.0
github.com/fsnotify/fsnotify v1.9.0
github.com/gin-contrib/size v1.0.2
github.com/gin-gonic/gin v1.10.1
github.com/go-openapi/strfmt v0.23.0
2 changes: 2 additions & 0 deletions packages/orchestrator/go.sum

Some generated files are not rendered by default; the go.sum diff is not shown.

21 changes: 12 additions & 9 deletions packages/orchestrator/internal/cfg/model.go
@@ -23,15 +23,18 @@ type BuilderConfig struct {
type Config struct {
BuilderConfig

ClickhouseConnectionString string `env:"CLICKHOUSE_CONNECTION_STRING"`
ForceStop bool `env:"FORCE_STOP"`
GRPCPort uint16 `env:"GRPC_PORT" envDefault:"5008"`
LaunchDarklyAPIKey string `env:"LAUNCH_DARKLY_API_KEY"`
OrchestratorLockPath string `env:"ORCHESTRATOR_LOCK_PATH" envDefault:"/orchestrator.lock"`
ProxyPort uint16 `env:"PROXY_PORT" envDefault:"5007"`
RedisClusterURL string `env:"REDIS_CLUSTER_URL"`
RedisURL string `env:"REDIS_URL"`
Services []string `env:"ORCHESTRATOR_SERVICES" envDefault:"orchestrator"`
ClickhouseConnectionString string `env:"CLICKHOUSE_CONNECTION_STRING"`
ForceStop bool `env:"FORCE_STOP"`
GRPCPort uint16 `env:"GRPC_PORT" envDefault:"5008"`
LaunchDarklyAPIKey string `env:"LAUNCH_DARKLY_API_KEY"`
OrchestratorLockPath string `env:"ORCHESTRATOR_LOCK_PATH" envDefault:"/orchestrator.lock"`
ProxyPort uint16 `env:"PROXY_PORT" envDefault:"5007"`
RedisClusterURL string `env:"REDIS_CLUSTER_URL"`
RedisURL string `env:"REDIS_URL"`
Services []string `env:"ORCHESTRATOR_SERVICES" envDefault:"orchestrator"`
SharedStateDirectory string `env:"SHARED_STATE_DIRECTORY" envDefault:"/orchestrator/state"`
SharedStateWriteInterval time.Duration `env:"SHARED_STATE_WRITE_INTERVAL" envDefault:"1m"`
MaxStartingInstances int64 `env:"MAX_STARTING_INSTANCES" envDefault:"3"`
}

func Parse() (Config, error) {
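The three new settings (SHARED_STATE_DIRECTORY, SHARED_STATE_WRITE_INTERVAL, MAX_STARTING_INSTANCES) are ordinary environment variables with defaults. A minimal usage sketch follows, assuming Parse() fills the struct from the env/envDefault tags via an env-tag library (the library itself is not part of this diff) and that the caller lives inside the orchestrator module; the values are illustrative only.

package main

import (
	"log"
	"os"

	"github.com/e2b-dev/infra/packages/orchestrator/internal/cfg"
)

func main() {
	// Override the defaults declared in the struct tags (illustrative values).
	os.Setenv("SHARED_STATE_DIRECTORY", "/orchestrator/state")
	os.Setenv("SHARED_STATE_WRITE_INTERVAL", "30s") // parsed as a time.Duration
	os.Setenv("MAX_STARTING_INSTANCES", "5")

	config, err := cfg.Parse()
	if err != nil {
		log.Fatalf("failed to parse orchestrator config: %v", err)
	}

	log.Printf("state dir=%s, write interval=%s, max starting=%d",
		config.SharedStateDirectory, config.SharedStateWriteInterval, config.MaxStartingInstances)
}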
82 changes: 82 additions & 0 deletions packages/orchestrator/internal/server/limiter.go
@@ -0,0 +1,82 @@
package server

import (
"context"
"fmt"

"go.uber.org/zap"
"golang.org/x/sync/semaphore"

"github.com/e2b-dev/infra/packages/orchestrator/internal/sharedstate"
featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags"
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
)

type Limiter struct {
maxStartingSandboxes int64

featureFlags *featureflags.Client
startingSandboxes *semaphore.Weighted
sharedStateManager *sharedstate.Manager
}

func NewLimiter(
maxStartingSandboxes int64,
featureFlags *featureflags.Client,
sharedStateManager *sharedstate.Manager,
) *Limiter {
return &Limiter{
featureFlags: featureFlags,
sharedStateManager: sharedStateManager,
maxStartingSandboxes: maxStartingSandboxes,
startingSandboxes: semaphore.NewWeighted(maxStartingSandboxes),
}
}

type TooManySandboxesRunningError struct {
Current, Max int
}

func (t TooManySandboxesRunningError) Error() string {
return fmt.Sprintf("max number of running sandboxes on node reached (%d), please retry", t.Max)
}

var _ error = TooManySandboxesRunningError{}

type TooManySandboxesStartingError struct {
Max int64
}

var _ error = TooManySandboxesStartingError{}

func (t TooManySandboxesStartingError) Error() string {
return fmt.Sprintf("max number of starting sandboxes on node reached (%d), please retry", t.Max)
}

func (t *Limiter) AcquireStarting(ctx context.Context) error {
maxRunningSandboxesPerNode, err := t.featureFlags.IntFlag(ctx, featureflags.MaxSandboxesPerNode)
if err != nil {
zap.L().Error("Failed to get MaxSandboxesPerNode flag", zap.Error(err))
}

runningSandboxes := t.sharedStateManager.TotalRunningCount()
if runningSandboxes >= maxRunningSandboxesPerNode {
telemetry.ReportEvent(ctx, "max number of running sandboxes reached")

return TooManySandboxesRunningError{runningSandboxes, maxRunningSandboxesPerNode}
}

// Check if we've reached the max number of starting instances on this node
acquired := t.startingSandboxes.TryAcquire(1)
if !acquired {
telemetry.ReportEvent(ctx, "too many starting sandboxes on node")

return TooManySandboxesStartingError{t.maxStartingSandboxes}
}
Review comment:
Bug: Error Type Mismatch in Sandbox Acquisition

The AcquireStarting method returns a simple ErrTooManyStarting error when the starting sandbox limit is reached. The calling code, however, expects a TooManySandboxesStartingError type for errors.As() checks, which includes Current and Max fields. This mismatch prevents proper error handling and loses specific context.



return nil
}

func (t *Limiter) ReleaseStarting() {
defer t.startingSandboxes.Release(1)
}
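For context, a minimal caller sketch for the new Limiter, written as if it lived in the same server package; the construction of the featureflags client and sharedstate manager is not shown in this diff, so the Limiter is taken as given. The typed errors are distinguished with errors.As, and the starting slot is released only after a successful acquire.

package server

import (
	"context"
	"errors"
	"fmt"
)

// createWithLimiter is a hypothetical caller of the Limiter above.
func createWithLimiter(ctx context.Context, limiter *Limiter) error {
	if err := limiter.AcquireStarting(ctx); err != nil {
		var tooManyRunning TooManySandboxesRunningError
		var tooManyStarting TooManySandboxesStartingError
		switch {
		case errors.As(err, &tooManyRunning), errors.As(err, &tooManyStarting):
			// Capacity errors are retryable; surface them to the client as such.
			return err
		default:
			return fmt.Errorf("acquiring starting slot: %w", err)
		}
	}
	// Release the starting slot only after a successful acquire.
	defer limiter.ReleaseStarting()

	// ... create and start the sandbox here ...
	return nil
}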
48 changes: 24 additions & 24 deletions packages/orchestrator/internal/server/main.go
@@ -6,7 +6,6 @@ import (

"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"

"github.com/e2b-dev/infra/packages/orchestrator/internal/events"
"github.com/e2b-dev/infra/packages/orchestrator/internal/proxy"
@@ -25,18 +24,18 @@ import (
type Server struct {
orchestrator.UnimplementedSandboxServiceServer

sandboxFactory *sandbox.Factory
info *service.ServiceInfo
sandboxes *sandbox.Map
proxy *proxy.SandboxProxy
networkPool *network.Pool
templateCache *template.Cache
pauseMu sync.Mutex
devicePool *nbd.DevicePool
persistence storage.StorageProvider
featureFlags *featureflags.Client
sbxEventsService events.EventsService[event.SandboxEvent]
startingSandboxes *semaphore.Weighted
sandboxLimiter *Limiter
sandboxFactory *sandbox.Factory
info *service.ServiceInfo
sandboxes *sandbox.Map
proxy *proxy.SandboxProxy
networkPool *network.Pool
templateCache *template.Cache
pauseMu sync.Mutex
devicePool *nbd.DevicePool
persistence storage.StorageProvider
featureFlags *featureflags.Client
sbxEventsService events.EventsService[event.SandboxEvent]
}

type ServiceConfig struct {
Expand All @@ -51,21 +50,22 @@ type ServiceConfig struct {
Persistence storage.StorageProvider
FeatureFlags *featureflags.Client
SbxEventsService events.EventsService[event.SandboxEvent]
SandboxLimiter *Limiter
}

func New(cfg ServiceConfig) *Server {
server := &Server{
sandboxFactory: cfg.SandboxFactory,
info: cfg.Info,
proxy: cfg.Proxy,
sandboxes: cfg.Sandboxes,
networkPool: cfg.NetworkPool,
templateCache: cfg.TemplateCache,
devicePool: cfg.DevicePool,
persistence: cfg.Persistence,
featureFlags: cfg.FeatureFlags,
sbxEventsService: cfg.SbxEventsService,
startingSandboxes: semaphore.NewWeighted(maxStartingInstancesPerNode),
sandboxFactory: cfg.SandboxFactory,
info: cfg.Info,
proxy: cfg.Proxy,
sandboxes: cfg.Sandboxes,
networkPool: cfg.NetworkPool,
templateCache: cfg.TemplateCache,
devicePool: cfg.DevicePool,
persistence: cfg.Persistence,
featureFlags: cfg.FeatureFlags,
sbxEventsService: cfg.SbxEventsService,
sandboxLimiter: cfg.SandboxLimiter,
}

meter := cfg.Tel.MeterProvider.Meter("orchestrator.sandbox")
41 changes: 19 additions & 22 deletions packages/orchestrator/internal/server/sandboxes.go
@@ -30,10 +30,7 @@ import (

var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/orchestrator/internal/server")

const (
requestTimeout = 60 * time.Second
maxStartingInstancesPerNode = 3
)
const requestTimeout = 60 * time.Second

func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequest) (*orchestrator.SandboxCreateResponse, error) {
// set max request timeout for this request
@@ -66,26 +63,26 @@ func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ
Build(),
)

maxRunningSandboxesPerNode, err := s.featureFlags.IntFlag(ctx, featureflags.MaxSandboxesPerNode)
if err != nil {
zap.L().Error("Failed to get MaxSandboxesPerNode flag", zap.Error(err))
}

runningSandboxes := s.sandboxes.Count()
if runningSandboxes >= maxRunningSandboxesPerNode {
telemetry.ReportEvent(ctx, "max number of running sandboxes reached")

return nil, status.Errorf(codes.ResourceExhausted, "max number of running sandboxes on node reached (%d), please retry", maxRunningSandboxesPerNode)
}

// Check if we've reached the max number of starting instances on this node
acquired := s.startingSandboxes.TryAcquire(1)
if !acquired {
telemetry.ReportEvent(ctx, "too many starting sandboxes on node")

return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes starting on this node, please retry")
if err := s.sandboxLimiter.AcquireStarting(ctx); err != nil {
var tooManyRunning TooManySandboxesRunningError
var tooManyStarting TooManySandboxesStartingError
switch {
case errors.As(err, &tooManyRunning):
telemetry.ReportEvent(ctx, "max number of running sandboxes reached")

return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes on node reached (%d>=%d), please retry", tooManyRunning.Current, tooManyRunning.Max)
case errors.As(err, &tooManyStarting):
telemetry.ReportEvent(ctx, "too many starting sandboxes on node")

return nil, status.Errorf(codes.ResourceExhausted, "too many sandboxes starting on this node, please retry")
default:
return nil, fmt.Errorf("unexpected error while acquiring starting lock: %w", err)
}
}
defer s.startingSandboxes.Release(1)
defer func() {
s.sandboxLimiter.ReleaseStarting()
}()
Review comment:
Bug: Incomplete Error Handling Causes Semaphore Mismatch

The AcquireStarting call has incomplete error handling; if it returns an unexpected error, execution continues, potentially allowing sandbox creation. This also causes the unconditional ReleaseStarting defer to incorrectly decrement the semaphore count when AcquireStarting fails, leading to more concurrent operations than intended.



template, err := s.templateCache.GetTemplate(
ctx,
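One way to address the concern raised in the review comment above, sketched here as an alternative rather than what this PR does: have the acquire return its paired release function, so a caller can only release a slot it actually acquired.

package server

import "context"

// AcquireStartingSlot is a hypothetical variant of AcquireStarting that returns the
// matching release function; callers defer it only when acquisition succeeded.
func (t *Limiter) AcquireStartingSlot(ctx context.Context) (func(), error) {
	if err := t.AcquireStarting(ctx); err != nil {
		return nil, err
	}
	return t.ReleaseStarting, nil
}

A caller would then write: release, err := limiter.AcquireStartingSlot(ctx); if err != nil { return nil, err }; defer release().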
36 changes: 14 additions & 22 deletions packages/orchestrator/internal/service/service_info.go
@@ -8,21 +8,21 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/e2b-dev/infra/packages/orchestrator/internal/metrics"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sharedstate"
orchestratorinfo "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator-info"
)

type Server struct {
orchestratorinfo.UnimplementedInfoServiceServer

info *ServiceInfo
sandboxes *sandbox.Map
info *ServiceInfo
sharedState *sharedstate.Manager
}

func NewInfoService(info *ServiceInfo, sandboxes *sandbox.Map) *Server {
func NewInfoService(info *ServiceInfo, tracker *sharedstate.Manager) *Server {
s := &Server{
info: info,
sandboxes: sandboxes,
info: info,
sharedState: tracker,
}

return s
@@ -51,15 +51,7 @@ func (s *Server) ServiceInfo(_ context.Context, _ *emptypb.Empty) (*orchestrator
}

// Calculate sandbox resource allocation
sandboxVCpuAllocated := uint32(0)
sandboxMemoryAllocated := uint64(0)
sandboxDiskAllocated := uint64(0)

for _, item := range s.sandboxes.Items() {
sandboxVCpuAllocated += uint32(item.Config.Vcpu)
sandboxMemoryAllocated += uint64(item.Config.RamMB) * 1024 * 1024
sandboxDiskAllocated += uint64(item.Config.TotalDiskSizeMB) * 1024 * 1024
}
allocated := s.sharedState.TotalAllocated()

return &orchestratorinfo.ServiceInfoResponse{
NodeId: info.ClientId,
@@ -73,10 +65,10 @@ func (s *Server) ServiceInfo(_ context.Context, _ *emptypb.Empty) (*orchestrator
ServiceRoles: info.Roles,

// Allocated resources to sandboxes
MetricCpuAllocated: sandboxVCpuAllocated,
MetricMemoryAllocatedBytes: sandboxMemoryAllocated,
MetricDiskAllocatedBytes: sandboxDiskAllocated,
MetricSandboxesRunning: uint32(s.sandboxes.Count()),
MetricCpuAllocated: allocated.VCPUs,
MetricMemoryAllocatedBytes: allocated.MemoryBytes,
MetricDiskAllocatedBytes: allocated.DiskBytes,
MetricSandboxesRunning: allocated.Sandboxes,

// Host system usage metrics
MetricCpuPercent: uint32(cpuMetrics.UsedPercent),
Expand All @@ -90,9 +82,9 @@ func (s *Server) ServiceInfo(_ context.Context, _ *emptypb.Empty) (*orchestrator
MetricDisks: convertDiskMetrics(diskMetrics),

// TODO: Remove when migrated
MetricVcpuUsed: int64(sandboxVCpuAllocated),
MetricMemoryUsedMb: int64(sandboxMemoryAllocated / (1024 * 1024)),
MetricDiskMb: int64(sandboxDiskAllocated / (1024 * 1024)),
MetricVcpuUsed: int64(allocated.VCPUs),
MetricMemoryUsedMb: int64(allocated.MemoryBytes / (1024 * 1024)),
MetricDiskMb: int64(allocated.DiskBytes / (1024 * 1024)),
}, nil
}

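The sharedstate package itself is not part of this diff, so the shape of TotalAllocated() is not visible here. Below is a hypothetical sketch, inferred from the fields read above and from the per-sandbox loop this change removes; the field and type names are assumptions.

package sharedstate

// Allocated is a hypothetical aggregate matching the fields used in service_info.go.
type Allocated struct {
	VCPUs       uint32 // sum of per-sandbox vCPUs
	MemoryBytes uint64 // sum of per-sandbox RAM, in bytes
	DiskBytes   uint64 // sum of per-sandbox total disk size, in bytes
	Sandboxes   uint32 // number of running sandboxes
}

// sandboxResources mirrors the config fields the removed loop read for each sandbox.
type sandboxResources struct {
	Vcpu            int64
	RamMB           int64
	TotalDiskSizeMB int64
}

// totalAllocated shows the aggregation the Manager would need to perform.
func totalAllocated(items []sandboxResources) Allocated {
	var out Allocated
	for _, item := range items {
		out.VCPUs += uint32(item.Vcpu)
		out.MemoryBytes += uint64(item.RamMB) * 1024 * 1024
		out.DiskBytes += uint64(item.TotalDiskSizeMB) * 1024 * 1024
		out.Sandboxes++
	}
	return out
}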