Scaler refactor #3
base: develop
The PR adds a new 147-line file in package `internal`. The `Scaler` keeps a per-function count of in-flight requests and periodically turns those counts into VM provisioning jobs:

```go
package internal

import (
	"aether/shared/db"
	"aether/shared/logger"
	"aether/shared/protocol"
	"context"
	"encoding/json"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
)

// Scaler tracks in-flight requests per function and periodically
// provisions additional instances when demand exceeds the threshold.
type Scaler struct {
	redis          *redis.Client
	discovery      *Discovery
	db             *db.DB
	activeRequests map[string]int64
	mu             sync.RWMutex
	checkInterval  time.Duration
	threshold      int
	maxInstances   int
}

func NewScaler(redis *redis.Client, discovery *Discovery, database *db.DB, checkInterval time.Duration, threshold int, maxInstances int) *Scaler {
	return &Scaler{
		redis:          redis,
		discovery:      discovery,
		db:             database,
		activeRequests: make(map[string]int64),
		mu:             sync.RWMutex{},
		checkInterval:  checkInterval,
		threshold:      threshold,
		maxInstances:   maxInstances,
	}
}

// TrackRequest adds delta to the function's in-flight count,
// clamping the count at zero so stray decrements cannot go negative.
func (s *Scaler) TrackRequest(functionID string, delta int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.activeRequests[functionID] += delta
	if s.activeRequests[functionID] < 0 {
		s.activeRequests[functionID] = 0
	}
}

// Run executes the scaling check on every tick until the context is cancelled.
func (s *Scaler) Run(ctx context.Context) {
	ticker := time.NewTicker(s.checkInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			s.check(ctx)
		}
	}
}
```
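The gateway call sites are not part of this diff, so the wiring below is only a sketch: the `/invoke/` route, the `requestTracker` interface, the `noopTracker` stand-in, and the +1/-1 convention for `TrackRequest` are assumptions for illustration, not code from the PR.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"strings"
)

// requestTracker is the minimal surface this sketch needs from the PR's
// Scaler; in the real gateway this would be the *internal.Scaler itself.
type requestTracker interface {
	TrackRequest(functionID string, delta int64)
}

// invokeHandler reports request start and finish to the tracker so the
// scaler's periodic check sees an accurate in-flight count per function.
func invokeHandler(tracker requestTracker) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		functionID := strings.TrimPrefix(r.URL.Path, "/invoke/")
		tracker.TrackRequest(functionID, +1)       // request started
		defer tracker.TrackRequest(functionID, -1) // request finished

		// ... proxy the invocation to a running instance ...
		fmt.Fprintln(w, "ok")
	}
}

// noopTracker lets the sketch run without the real Scaler.
type noopTracker struct{}

func (noopTracker) TrackRequest(string, int64) {}

func main() {
	// In the gateway this would be the Scaler built with NewScaler,
	// with its Run loop started first: go scaler.Run(ctx)
	http.Handle("/invoke/", invokeHandler(noopTracker{}))
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```

The periodic check that turns these counters into provisioning work follows.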
```go
// check snapshots the in-flight counts, compares each function's demand
// against its running instances, and enqueues provision jobs for any deficit.
func (s *Scaler) check(ctx context.Context) {
	s.mu.RLock()
	snapshot := make(map[string]int64, len(s.activeRequests))
	for fnID, count := range s.activeRequests {
		snapshot[fnID] = count
	}
	s.mu.RUnlock()

	var toCleanup []string

	for functionID, activeCount := range snapshot {
		if activeCount == 0 {
			toCleanup = append(toCleanup, functionID)
			continue
		}

		instances, err := s.discovery.GetInstances(ctx, functionID)
		if err != nil {
			logger.Error("failed to get instances", "function", functionID, "error", err)
			continue
		}

		currentCount := len(instances)

		// Calculate target instances: ceil(active / threshold)
		targetCount := int((activeCount + int64(s.threshold) - 1) / int64(s.threshold))

		// Cap target at maxInstances to prevent runaway scaling
		if targetCount > s.maxInstances {
			targetCount = s.maxInstances
		}

		if targetCount > currentCount {
			deficit := targetCount - currentCount
			logger.Info("gateway scaling up",
				"function", functionID,
				"active_requests", activeCount,
				"current_instances", currentCount,
				"target_instances", targetCount,
				"provisioning", deficit)

			// Push provision jobs to queue
			for i := 0; i < deficit; i++ {
				if err := s.pushProvisionJob(ctx, functionID); err != nil {
					logger.Error("failed to push provision job", "function", functionID, "error", err)
				}
			}
		}
	}

	// Drop entries that are still zero; re-check under the write lock so a
	// request that arrived after the snapshot is not discarded.
	if len(toCleanup) > 0 {
		s.mu.Lock()
		for _, functionID := range toCleanup {
			if s.activeRequests[functionID] == 0 {
				delete(s.activeRequests, functionID)
			}
		}
		s.mu.Unlock()
	}
}
```
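To make the target calculation concrete, the snippet below runs the same integer arithmetic on made-up numbers (the threshold and cap are illustrative, not this deployment's configuration):

```go
package main

import "fmt"

// targetInstances mirrors the scaler's formula: ceil(active / threshold),
// capped at maxInstances.
func targetInstances(active int64, threshold, maxInstances int) int {
	target := int((active + int64(threshold) - 1) / int64(threshold))
	if target > maxInstances {
		target = maxInstances
	}
	return target
}

func main() {
	// 25 in-flight requests, 10 requests per instance, cap of 5 instances:
	// ceil(25/10) = 3, which is under the cap, so the target is 3.
	fmt.Println(targetInstances(25, 10, 5)) // 3

	// 120 in-flight requests would want ceil(120/10) = 12 instances,
	// but the cap keeps the target at 5.
	fmt.Println(targetInstances(120, 10, 5)) // 5
}
```

`pushProvisionJob`, which serializes the provisioning request onto the Redis queue, is the last piece of the file.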
```go
// pushProvisionJob looks up the function's metadata and enqueues a VM
// provisioning job on the shared Redis list consumed by the workers.
func (s *Scaler) pushProvisionJob(ctx context.Context, functionID string) error {
	fn, err := s.db.GetFunction(functionID)
	if err != nil {
		return err
	}

	job := protocol.Job{
		FunctionID: functionID,
		ImageID:    fn.CodePath,
		Runtime:    fn.Runtime,
		Entrypoint: fn.Entrypoint,
		VCPU:       int(fn.VCPU),
		MemoryMB:   int(fn.MemoryMB),
		Port:       fn.Port,
		EnvVars:    fn.EnvVars,
	}

	data, err := json.Marshal(job)
	if err != nil {
		return err
	}

	return s.redis.LPush(ctx, "queue:vm_provision", string(data)).Err()
}
```
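The consumer side of `queue:vm_provision` is not in this file (the review comment below refers to the worker's `handleJob`). As a rough sketch of what popping and decoding a job could look like with go-redis, here is a loop using a local `Job` stand-in whose fields are a guess at part of `protocol.Job`; the real worker would import `aether/shared/protocol` instead:

```go
package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/redis/go-redis/v9"
)

// Job is a local stand-in for protocol.Job; field names and types here
// are assumptions, not the actual shared protocol definition.
type Job struct {
	FunctionID string
	ImageID    string
	Runtime    string
	Entrypoint string
	VCPU       int
	MemoryMB   int
	Port       int
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	ctx := context.Background()

	for {
		// BRPOP blocks until a job is available; the result is [key, value].
		res, err := rdb.BRPop(ctx, 0, "queue:vm_provision").Result()
		if err != nil {
			log.Println("brpop:", err)
			continue
		}
		var job Job
		if err := json.Unmarshal([]byte(res[1]), &job); err != nil {
			log.Println("unmarshal:", err)
			continue
		}
		log.Printf("provisioning %s (%s)", job.FunctionID, job.Runtime)
		// ... spawn the VM / microVM for this job ...
	}
}
```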
Review comment:

The documentation states that workers check capacity "independently (no coordination)" and that "each worker checks capacity independently", but the actual capacity check has a TOCTOU race condition: between checking capacity (lines 101-106) and spawning instances (line 188 in handleJob), multiple workers can exceed the limit. This discrepancy between the documented behavior and the actual implementation could mislead users about the system's reliability.

Consider either (1) updating the documentation to mention this race condition and its implications, or (2) implementing proper coordination so the documented behavior is accurate.
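For option (2), one approach is to make the capacity check and the claim a single atomic step so concurrent workers cannot both pass the check. The sketch below reserves a slot with a per-function Redis counter; the helper names, the `capacity:` key, and the reserve-before-spawn contract are illustrative assumptions, not the project's existing API:

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// reserveInstanceSlot is a hypothetical helper illustrating option (2):
// a worker atomically reserves a slot before spawning, so workers cannot
// collectively exceed the cap between the check and the spawn.
func reserveInstanceSlot(ctx context.Context, rdb *redis.Client, functionID string, maxInstances int) (bool, error) {
	key := "capacity:" + functionID

	// INCR is atomic across all workers; the returned value already
	// includes this worker's reservation.
	n, err := rdb.Incr(ctx, key).Result()
	if err != nil {
		return false, err
	}
	if n > int64(maxInstances) {
		// Over the cap: give the slot back and report no capacity.
		if derr := rdb.Decr(ctx, key).Err(); derr != nil {
			return false, derr
		}
		return false, nil
	}
	return true, nil
}

// releaseInstanceSlot would be called when an instance is torn down,
// so the reservation count keeps tracking live instances.
func releaseInstanceSlot(ctx context.Context, rdb *redis.Client, functionID string) error {
	return rdb.Decr(ctx, "capacity:"+functionID).Err()
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	ctx := context.Background()

	ok, err := reserveInstanceSlot(ctx, rdb, "fn-123", 5) // "fn-123" is a made-up ID
	if err != nil {
		panic(err)
	}
	if ok {
		fmt.Println("slot reserved; safe to spawn")
		// ... spawn the instance; on teardown call releaseInstanceSlot ...
	} else {
		fmt.Println("at capacity; skip spawning")
	}
}
```

If brief over-provisioning is acceptable, option (1), documenting the race and its practical impact, may be the cheaper fix.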