Skip to content

Commit

Permalink
make worker pooling configurable per-service in runtime config
Browse files Browse the repository at this point in the history
  • Loading branch information
eandre committed Dec 16, 2024
1 parent 413596e commit b9e3064
Show file tree
Hide file tree
Showing 11 changed files with 508 additions and 637 deletions.
22 changes: 10 additions & 12 deletions cli/daemon/run/runtime_config2.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ type RuntimeConfigGenerator struct {
SvcConfigs map[string]string

conf *rtconfgen.Builder
compute *runtimev1.Compute
authKeys []*runtimev1.EncoreAuthKey
}

Expand Down Expand Up @@ -145,11 +144,13 @@ func (g *RuntimeConfigGenerator) initialize() error {
if err != nil {
return errors.Wrap(err, "failed to get app's build settings")
}
g.compute = &runtimev1.Compute{}
if buildSettings.WorkerPooling {
// Auto-detect the number of worker threads.
n := int32(0)
g.compute.WorkerThreads = &n
for _, svc := range g.md.Svcs {
cfg := &runtimev1.HostedService{Name: svc.Name}
if buildSettings.WorkerPooling {
n := int32(0)
cfg.WorkerThreads = &n
}
g.conf.ServiceConfig(cfg)
}

g.conf.AuthMethods([]*runtimev1.ServiceAuth{
Expand Down Expand Up @@ -449,7 +450,6 @@ func (g *RuntimeConfigGenerator) ProcPerService(proxy *svcproxy.SvcProxy) (servi
// Set up the service processes.
for _, svc := range g.md.Svcs {
conf, err := g.conf.Deployment(newRid()).
Compute(g.compute).
ServiceDiscovery(sd).
HostsServices(svc.Name).
ReduceWithMeta(g.md).
Expand All @@ -473,7 +473,7 @@ func (g *RuntimeConfigGenerator) ProcPerService(proxy *svcproxy.SvcProxy) (servi

// Set up the gateways.
for _, gw := range g.md.Gateways {
conf, err := g.conf.Deployment(newRid()).Compute(g.compute).ServiceDiscovery(sd).HostsGateways(gw.EncoreName).ReduceWithMeta(g.md).BuildRuntimeConfig()
conf, err := g.conf.Deployment(newRid()).ServiceDiscovery(sd).HostsGateways(gw.EncoreName).ReduceWithMeta(g.md).BuildRuntimeConfig()
if err != nil {
return nil, nil, errors.Wrap(err, "failed to generate runtime config")
}
Expand All @@ -500,7 +500,7 @@ func (g *RuntimeConfigGenerator) AllInOneProc() (*ProcConfig, error) {

sd := &runtimev1.ServiceDiscovery{Services: make(map[string]*runtimev1.ServiceDiscovery_Location)}

d := g.conf.Deployment(newRid()).Compute(g.compute).ServiceDiscovery(sd)
d := g.conf.Deployment(newRid()).ServiceDiscovery(sd)
for _, gw := range g.md.Gateways {
d.HostsGateways(gw.EncoreName)
}
Expand Down Expand Up @@ -570,7 +570,6 @@ func (g *RuntimeConfigGenerator) ProcPerServiceWithNewRuntimeConfig(proxy *svcpr

for _, svc := range g.md.Svcs {
conf, err = g.conf.Deployment(newRid()).
Compute(g.compute).
ServiceDiscovery(sd).
HostsServices(svc.Name).
ReduceWithMeta(g.md).
Expand All @@ -594,7 +593,6 @@ func (g *RuntimeConfigGenerator) ProcPerServiceWithNewRuntimeConfig(proxy *svcpr
}

conf, err = g.conf.Deployment(newRid()).
Compute(g.compute).
ServiceDiscovery(sd).
HostsGateways(gw.EncoreName).
//ReduceWithMeta(g.md).
Expand All @@ -620,7 +618,7 @@ func (g *RuntimeConfigGenerator) ForTests(newRuntimeConf bool) (envs []string, e

sd := &runtimev1.ServiceDiscovery{Services: make(map[string]*runtimev1.ServiceDiscovery_Location)}

d := g.conf.Deployment(newRid()).Compute(g.compute).ServiceDiscovery(sd)
d := g.conf.Deployment(newRid()).ServiceDiscovery(sd)
for _, gw := range g.md.Gateways {
d.HostsGateways(gw.EncoreName)
}
Expand Down
43 changes: 26 additions & 17 deletions pkg/rtconfgen/base_builder.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package rtconfgen

import (
"cmp"
"fmt"
"slices"
"time"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -41,6 +43,7 @@ type Builder struct {
defaultDeployedAt time.Time

deployments map[string]*Deployment
services map[string]*runtimev1.HostedService
}

func NewBuilder() *Builder {
Expand All @@ -50,6 +53,7 @@ func NewBuilder() *Builder {
rs: rs,
obs: &runtimev1.Observability{},
deployments: make(map[string]*Deployment),
services: make(map[string]*runtimev1.HostedService),
}

return b
Expand Down Expand Up @@ -109,6 +113,10 @@ func (b *Builder) LogsProviderFn(rid string, fn func() *runtimev1.LogsProvider)
addResFunc(&b.obs.Logs, b.rs, rid, fn)
}

func (b *Builder) ServiceConfig(svc *runtimev1.HostedService) {
b.services[svc.Name] = svc
}

func (b *Builder) Deployment(rid string) *Deployment {
if d, ok := b.deployments[rid]; ok {
return d
Expand All @@ -135,14 +143,11 @@ type Deployment struct {
// The service-discovery configuration for this deployment.
sd *runtimev1.ServiceDiscovery

// The compute configuration for this deployment.
compute *runtimev1.Compute

// The base URL for reaching this deployment from another service.
svc2svcBaseURL string

hostedGateways []string
hostedServices []string
hostedGateways []string
hostedServiceNames []string
}

// DeployID sets the deploy id.
Expand All @@ -165,7 +170,7 @@ func (d *Deployment) DynamicExperiments(experiments []string) *Deployment {
// HostsServices adds the given service names as being hosted by this deployment.
// It appends and doesn't overwrite any existing hosted services.
func (d *Deployment) HostsServices(names ...string) *Deployment {
d.hostedServices = append(d.hostedServices, names...)
d.hostedServiceNames = append(d.hostedServiceNames, names...)
return d
}

Expand All @@ -188,11 +193,6 @@ func (d *Deployment) ServiceDiscovery(sd *runtimev1.ServiceDiscovery) *Deploymen
return d
}

func (d *Deployment) Compute(c *runtimev1.Compute) *Deployment {
d.compute = c
return d
}

func (d *Deployment) ReduceWithMeta(md *meta.Data) *Deployment {
d.reduceWith = option.Some(md)
return d
Expand All @@ -206,17 +206,27 @@ func (d *Deployment) BuildRuntimeConfig() (*runtimev1.RuntimeConfig, error) {
return nil, err
}
if reduced, ok := d.reduceWith.Get(); ok {
infra = reduceForServices(infra, reduced, d.hostedServices)
infra = reduceForServices(infra, reduced, d.hostedServiceNames)
}

graceful := d.gracefulShutdown.GetOrElse(d.b.defaultGracefulShutdown)

var hostedServices []*runtimev1.HostedService
for _, svcName := range d.hostedServices {
hostedServices = append(hostedServices, &runtimev1.HostedService{
Name: svcName,
})
{
for _, svcName := range d.hostedServiceNames {
// If we have a service config defined for this service, use it.
cfg := b.services[svcName]
if cfg == nil {
cfg = &runtimev1.HostedService{
Name: svcName,
}
}
hostedServices = append(hostedServices, cfg)
}
}
slices.SortFunc(hostedServices, func(a, b *runtimev1.HostedService) int {
return cmp.Compare(a.Name, b.Name)
})

gatewaysByName := make(map[string]*runtimev1.Gateway)
for _, gw := range infra.Resources.Gateways {
Expand All @@ -236,7 +246,6 @@ func (d *Deployment) BuildRuntimeConfig() (*runtimev1.RuntimeConfig, error) {
HostedGateways: gatewayRids,
HostedServices: hostedServices,
ServiceDiscovery: d.sd,
Compute: d.compute,
GracefulShutdown: graceful,
DynamicExperiments: d.dynamicExperiments,
Observability: b.obs,
Expand Down
24 changes: 8 additions & 16 deletions pkg/rtconfgen/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"slices"

"github.com/cockroachdb/errors"
"github.com/rs/zerolog"

"go.encore.dev/platform-sdk/pkg/auth"

Expand Down Expand Up @@ -53,7 +54,6 @@ func (c *legacyConverter) Convert() (*config.Runtime, error) {
PubsubTopics: make(map[string]*config.PubsubTopic),
Buckets: make(map[string]*config.Bucket),
CORS: &config.CORS{},
LogLevel: "trace",
}

// Deployment handling.
Expand Down Expand Up @@ -135,24 +135,16 @@ func (c *legacyConverter) Convert() (*config.Runtime, error) {
}
}

if compute := deployment.Compute; compute != nil {
if compute.LogLevel != nil {
switch *compute.LogLevel {
case runtimev1.Compute_LOG_LEVEL_DISABLED:
cfg.LogLevel = "disabled"
case runtimev1.Compute_LOG_LEVEL_ERROR:
cfg.LogLevel = "error"
case runtimev1.Compute_LOG_LEVEL_WARN:
cfg.LogLevel = "warn"
case runtimev1.Compute_LOG_LEVEL_INFO:
cfg.LogLevel = "info"
case runtimev1.Compute_LOG_LEVEL_DEBUG:
cfg.LogLevel = "debug"
case runtimev1.Compute_LOG_LEVEL_TRACE:
cfg.LogLevel = "trace"
// Use the most verbose logging requested.
currLevel := zerolog.TraceLevel
for _, svc := range deployment.HostedServices {
if svc.LogConfig != nil {
if level, err := zerolog.ParseLevel(*svc.LogConfig); err == nil && level < currLevel {
currLevel = level
}
}
}
cfg.LogConfig = currLevel.String()
}

// Infrastructure handling.
Expand Down
Loading

0 comments on commit b9e3064

Please sign in to comment.