Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtimes/js: worker pooling #1652

Merged
merged 13 commits into from
Dec 18, 2024
Prev Previous commit
Next Next commit
make worker pooling configurable per-service in runtime config
  • Loading branch information
eandre committed Dec 16, 2024
commit 0ac208e69aa4e5451c4349bc8311dc28d0c1aef6
22 changes: 10 additions & 12 deletions cli/daemon/run/runtime_config2.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ type RuntimeConfigGenerator struct {
SvcConfigs map[string]string

conf *rtconfgen.Builder
compute *runtimev1.Compute
authKeys []*runtimev1.EncoreAuthKey
}

Expand Down Expand Up @@ -145,11 +144,13 @@ func (g *RuntimeConfigGenerator) initialize() error {
if err != nil {
return errors.Wrap(err, "failed to get app's build settings")
}
g.compute = &runtimev1.Compute{}
if buildSettings.WorkerPooling {
// Auto-detect the number of worker threads.
n := int32(0)
g.compute.WorkerThreads = &n
for _, svc := range g.md.Svcs {
cfg := &runtimev1.HostedService{Name: svc.Name}
if buildSettings.WorkerPooling {
n := int32(0)
cfg.WorkerThreads = &n
}
g.conf.ServiceConfig(cfg)
}

g.conf.AuthMethods([]*runtimev1.ServiceAuth{
Expand Down Expand Up @@ -449,7 +450,6 @@ func (g *RuntimeConfigGenerator) ProcPerService(proxy *svcproxy.SvcProxy) (servi
// Set up the service processes.
for _, svc := range g.md.Svcs {
conf, err := g.conf.Deployment(newRid()).
Compute(g.compute).
ServiceDiscovery(sd).
HostsServices(svc.Name).
ReduceWithMeta(g.md).
Expand All @@ -473,7 +473,7 @@ func (g *RuntimeConfigGenerator) ProcPerService(proxy *svcproxy.SvcProxy) (servi

// Set up the gateways.
for _, gw := range g.md.Gateways {
conf, err := g.conf.Deployment(newRid()).Compute(g.compute).ServiceDiscovery(sd).HostsGateways(gw.EncoreName).ReduceWithMeta(g.md).BuildRuntimeConfig()
conf, err := g.conf.Deployment(newRid()).ServiceDiscovery(sd).HostsGateways(gw.EncoreName).ReduceWithMeta(g.md).BuildRuntimeConfig()
if err != nil {
return nil, nil, errors.Wrap(err, "failed to generate runtime config")
}
Expand All @@ -500,7 +500,7 @@ func (g *RuntimeConfigGenerator) AllInOneProc() (*ProcConfig, error) {

sd := &runtimev1.ServiceDiscovery{Services: make(map[string]*runtimev1.ServiceDiscovery_Location)}

d := g.conf.Deployment(newRid()).Compute(g.compute).ServiceDiscovery(sd)
d := g.conf.Deployment(newRid()).ServiceDiscovery(sd)
for _, gw := range g.md.Gateways {
d.HostsGateways(gw.EncoreName)
}
Expand Down Expand Up @@ -570,7 +570,6 @@ func (g *RuntimeConfigGenerator) ProcPerServiceWithNewRuntimeConfig(proxy *svcpr

for _, svc := range g.md.Svcs {
conf, err = g.conf.Deployment(newRid()).
Compute(g.compute).
ServiceDiscovery(sd).
HostsServices(svc.Name).
ReduceWithMeta(g.md).
Expand All @@ -594,7 +593,6 @@ func (g *RuntimeConfigGenerator) ProcPerServiceWithNewRuntimeConfig(proxy *svcpr
}

conf, err = g.conf.Deployment(newRid()).
Compute(g.compute).
ServiceDiscovery(sd).
HostsGateways(gw.EncoreName).
//ReduceWithMeta(g.md).
Expand All @@ -620,7 +618,7 @@ func (g *RuntimeConfigGenerator) ForTests(newRuntimeConf bool) (envs []string, e

sd := &runtimev1.ServiceDiscovery{Services: make(map[string]*runtimev1.ServiceDiscovery_Location)}

d := g.conf.Deployment(newRid()).Compute(g.compute).ServiceDiscovery(sd)
d := g.conf.Deployment(newRid()).ServiceDiscovery(sd)
for _, gw := range g.md.Gateways {
d.HostsGateways(gw.EncoreName)
}
Expand Down
43 changes: 26 additions & 17 deletions pkg/rtconfgen/base_builder.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package rtconfgen

import (
"cmp"
"fmt"
"slices"
"time"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -41,6 +43,7 @@ type Builder struct {
defaultDeployedAt time.Time

deployments map[string]*Deployment
services map[string]*runtimev1.HostedService
}

func NewBuilder() *Builder {
Expand All @@ -50,6 +53,7 @@ func NewBuilder() *Builder {
rs: rs,
obs: &runtimev1.Observability{},
deployments: make(map[string]*Deployment),
services: make(map[string]*runtimev1.HostedService),
}

return b
Expand Down Expand Up @@ -109,6 +113,10 @@ func (b *Builder) LogsProviderFn(rid string, fn func() *runtimev1.LogsProvider)
addResFunc(&b.obs.Logs, b.rs, rid, fn)
}

func (b *Builder) ServiceConfig(svc *runtimev1.HostedService) {
b.services[svc.Name] = svc
}

func (b *Builder) Deployment(rid string) *Deployment {
if d, ok := b.deployments[rid]; ok {
return d
Expand All @@ -135,14 +143,11 @@ type Deployment struct {
// The service-discovery configuration for this deployment.
sd *runtimev1.ServiceDiscovery

// The compute configuration for this deployment.
compute *runtimev1.Compute

// The base URL for reaching this deployment from another service.
svc2svcBaseURL string

hostedGateways []string
hostedServices []string
hostedGateways []string
hostedServiceNames []string
}

// DeployID sets the deploy id.
Expand All @@ -165,7 +170,7 @@ func (d *Deployment) DynamicExperiments(experiments []string) *Deployment {
// HostsServices adds the given service names as being hosted by this deployment.
// It appends and doesn't overwrite any existing hosted services.
func (d *Deployment) HostsServices(names ...string) *Deployment {
d.hostedServices = append(d.hostedServices, names...)
d.hostedServiceNames = append(d.hostedServiceNames, names...)
return d
}

Expand All @@ -188,11 +193,6 @@ func (d *Deployment) ServiceDiscovery(sd *runtimev1.ServiceDiscovery) *Deploymen
return d
}

func (d *Deployment) Compute(c *runtimev1.Compute) *Deployment {
d.compute = c
return d
}

func (d *Deployment) ReduceWithMeta(md *meta.Data) *Deployment {
d.reduceWith = option.Some(md)
return d
Expand All @@ -206,17 +206,27 @@ func (d *Deployment) BuildRuntimeConfig() (*runtimev1.RuntimeConfig, error) {
return nil, err
}
if reduced, ok := d.reduceWith.Get(); ok {
infra = reduceForServices(infra, reduced, d.hostedServices)
infra = reduceForServices(infra, reduced, d.hostedServiceNames)
}

graceful := d.gracefulShutdown.GetOrElse(d.b.defaultGracefulShutdown)

var hostedServices []*runtimev1.HostedService
for _, svcName := range d.hostedServices {
hostedServices = append(hostedServices, &runtimev1.HostedService{
Name: svcName,
})
{
for _, svcName := range d.hostedServiceNames {
// If we have a service config defined for this service, use it.
cfg := b.services[svcName]
if cfg == nil {
cfg = &runtimev1.HostedService{
Name: svcName,
}
}
hostedServices = append(hostedServices, cfg)
}
}
slices.SortFunc(hostedServices, func(a, b *runtimev1.HostedService) int {
return cmp.Compare(a.Name, b.Name)
})

gatewaysByName := make(map[string]*runtimev1.Gateway)
for _, gw := range infra.Resources.Gateways {
Expand All @@ -236,7 +246,6 @@ func (d *Deployment) BuildRuntimeConfig() (*runtimev1.RuntimeConfig, error) {
HostedGateways: gatewayRids,
HostedServices: hostedServices,
ServiceDiscovery: d.sd,
Compute: d.compute,
GracefulShutdown: graceful,
DynamicExperiments: d.dynamicExperiments,
Observability: b.obs,
Expand Down
24 changes: 8 additions & 16 deletions pkg/rtconfgen/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"slices"

"github.com/cockroachdb/errors"
"github.com/rs/zerolog"

"go.encore.dev/platform-sdk/pkg/auth"

Expand Down Expand Up @@ -53,7 +54,6 @@ func (c *legacyConverter) Convert() (*config.Runtime, error) {
PubsubTopics: make(map[string]*config.PubsubTopic),
Buckets: make(map[string]*config.Bucket),
CORS: &config.CORS{},
LogLevel: "trace",
}

// Deployment handling.
Expand Down Expand Up @@ -135,24 +135,16 @@ func (c *legacyConverter) Convert() (*config.Runtime, error) {
}
}

if compute := deployment.Compute; compute != nil {
if compute.LogLevel != nil {
switch *compute.LogLevel {
case runtimev1.Compute_LOG_LEVEL_DISABLED:
cfg.LogLevel = "disabled"
case runtimev1.Compute_LOG_LEVEL_ERROR:
cfg.LogLevel = "error"
case runtimev1.Compute_LOG_LEVEL_WARN:
cfg.LogLevel = "warn"
case runtimev1.Compute_LOG_LEVEL_INFO:
cfg.LogLevel = "info"
case runtimev1.Compute_LOG_LEVEL_DEBUG:
cfg.LogLevel = "debug"
case runtimev1.Compute_LOG_LEVEL_TRACE:
cfg.LogLevel = "trace"
// Use the most verbose logging requested.
currLevel := zerolog.TraceLevel
for _, svc := range deployment.HostedServices {
if svc.LogConfig != nil {
if level, err := zerolog.ParseLevel(*svc.LogConfig); err == nil && level < currLevel {
currLevel = level
}
}
}
cfg.LogConfig = currLevel.String()
}

// Infrastructure handling.
Expand Down
Loading
Loading