Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtimes/js: worker pooling #1652

Merged
merged 13 commits into from
Dec 18, 2024
Prev Previous commit
Next Next commit
configure worker pooling via appfile
  • Loading branch information
eandre committed Dec 14, 2024
commit 413596e96a4ee633c22fa9491faee9d5985b32a2
22 changes: 19 additions & 3 deletions cli/daemon/run/runtime_config2.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type RuntimeConfigGenerator struct {
PlatformID() string
PlatformOrLocalID() string
GlobalCORS() (appfile.CORS, error)
BuildSettings() (appfile.Build, error)
}

// The infra manager to use
Expand Down Expand Up @@ -82,6 +83,7 @@ type RuntimeConfigGenerator struct {
SvcConfigs map[string]string

conf *rtconfgen.Builder
compute *runtimev1.Compute
authKeys []*runtimev1.EncoreAuthKey
}

Expand Down Expand Up @@ -139,6 +141,17 @@ func (g *RuntimeConfigGenerator) initialize() error {
})
}

buildSettings, err := g.app.BuildSettings()
if err != nil {
return errors.Wrap(err, "failed to get app's build settings")
}
g.compute = &runtimev1.Compute{}
if buildSettings.WorkerPooling {
// Auto-detect the number of worker threads.
n := int32(0)
g.compute.WorkerThreads = &n
}

g.conf.AuthMethods([]*runtimev1.ServiceAuth{
{
AuthMethod: &runtimev1.ServiceAuth_EncoreAuth_{
Expand Down Expand Up @@ -436,6 +449,7 @@ func (g *RuntimeConfigGenerator) ProcPerService(proxy *svcproxy.SvcProxy) (servi
// Set up the service processes.
for _, svc := range g.md.Svcs {
conf, err := g.conf.Deployment(newRid()).
Compute(g.compute).
ServiceDiscovery(sd).
HostsServices(svc.Name).
ReduceWithMeta(g.md).
Expand All @@ -459,7 +473,7 @@ func (g *RuntimeConfigGenerator) ProcPerService(proxy *svcproxy.SvcProxy) (servi

// Set up the gateways.
for _, gw := range g.md.Gateways {
conf, err := g.conf.Deployment(newRid()).ServiceDiscovery(sd).HostsGateways(gw.EncoreName).ReduceWithMeta(g.md).BuildRuntimeConfig()
conf, err := g.conf.Deployment(newRid()).Compute(g.compute).ServiceDiscovery(sd).HostsGateways(gw.EncoreName).ReduceWithMeta(g.md).BuildRuntimeConfig()
if err != nil {
return nil, nil, errors.Wrap(err, "failed to generate runtime config")
}
Expand All @@ -486,7 +500,7 @@ func (g *RuntimeConfigGenerator) AllInOneProc() (*ProcConfig, error) {

sd := &runtimev1.ServiceDiscovery{Services: make(map[string]*runtimev1.ServiceDiscovery_Location)}

d := g.conf.Deployment(newRid()).ServiceDiscovery(sd)
d := g.conf.Deployment(newRid()).Compute(g.compute).ServiceDiscovery(sd)
for _, gw := range g.md.Gateways {
d.HostsGateways(gw.EncoreName)
}
Expand Down Expand Up @@ -556,6 +570,7 @@ func (g *RuntimeConfigGenerator) ProcPerServiceWithNewRuntimeConfig(proxy *svcpr

for _, svc := range g.md.Svcs {
conf, err = g.conf.Deployment(newRid()).
Compute(g.compute).
ServiceDiscovery(sd).
HostsServices(svc.Name).
ReduceWithMeta(g.md).
Expand All @@ -579,6 +594,7 @@ func (g *RuntimeConfigGenerator) ProcPerServiceWithNewRuntimeConfig(proxy *svcpr
}

conf, err = g.conf.Deployment(newRid()).
Compute(g.compute).
ServiceDiscovery(sd).
HostsGateways(gw.EncoreName).
//ReduceWithMeta(g.md).
Expand All @@ -604,7 +620,7 @@ func (g *RuntimeConfigGenerator) ForTests(newRuntimeConf bool) (envs []string, e

sd := &runtimev1.ServiceDiscovery{Services: make(map[string]*runtimev1.ServiceDiscovery_Location)}

d := g.conf.Deployment(newRid()).ServiceDiscovery(sd)
d := g.conf.Deployment(newRid()).Compute(g.compute).ServiceDiscovery(sd)
for _, gw := range g.md.Gateways {
d.HostsGateways(gw.EncoreName)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/appfile/appfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ type Build struct {
// Docker configures the docker images built
// by Encore's CI/CD system.
Docker Docker `json:"docker,omitempty"`

// WorkerPooling enables worker pooling for Encore.ts.
WorkerPooling bool `json:"worker_pooling,omitempty"`
}

type Docker struct {
Expand Down
9 changes: 9 additions & 0 deletions pkg/rtconfgen/base_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type Deployment struct {
// The service-discovery configuration for this deployment.
sd *runtimev1.ServiceDiscovery

// The compute configuration for this deployment.
compute *runtimev1.Compute

// The base URL for reaching this deployment from another service.
svc2svcBaseURL string

Expand Down Expand Up @@ -185,6 +188,11 @@ func (d *Deployment) ServiceDiscovery(sd *runtimev1.ServiceDiscovery) *Deploymen
return d
}

func (d *Deployment) Compute(c *runtimev1.Compute) *Deployment {
d.compute = c
return d
}

func (d *Deployment) ReduceWithMeta(md *meta.Data) *Deployment {
d.reduceWith = option.Some(md)
return d
Expand Down Expand Up @@ -228,6 +236,7 @@ func (d *Deployment) BuildRuntimeConfig() (*runtimev1.RuntimeConfig, error) {
HostedGateways: gatewayRids,
HostedServices: hostedServices,
ServiceDiscovery: d.sd,
Compute: d.compute,
GracefulShutdown: graceful,
DynamicExperiments: d.dynamicExperiments,
Observability: b.obs,
Expand Down
3 changes: 2 additions & 1 deletion proto/encore/runtime/v1/runtime.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 18 additions & 17 deletions proto/encore/runtime/v1/runtime.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,24 @@ message Deployment {
}

message Compute {
// Number of worker threads to use.
// If unset, the runtime automatically determines the number of threads to use
// based on the number of CPUs available.
optional int32 worker_threads = 1;

// The minimum log level to use.
// If unset it defaults to LOG_LEVEL_TRACE.
optional LogLevel log_level = 2;

enum LogLevel {
LOG_LEVEL_DISABLED = 0;
LOG_LEVEL_ERROR = 1;
LOG_LEVEL_WARN = 2;
LOG_LEVEL_INFO = 3;
LOG_LEVEL_DEBUG = 4;
LOG_LEVEL_TRACE = 5;
}
// Number of worker threads to use.
// If unset it defaults to 1. If set to 0 the runtime
// automatically determines the number of threads to use
// based on the number of CPUs available.
optional int32 worker_threads = 1;

// The minimum log level to use.
// If unset it defaults to LOG_LEVEL_TRACE.
optional LogLevel log_level = 2;

enum LogLevel {
LOG_LEVEL_DISABLED = 0;
LOG_LEVEL_ERROR = 1;
LOG_LEVEL_WARN = 2;
LOG_LEVEL_INFO = 3;
LOG_LEVEL_DEBUG = 4;
LOG_LEVEL_TRACE = 5;
}
}

message Observability {
Expand Down
6 changes: 3 additions & 3 deletions runtimes/js/encore.dev/internal/appinit/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ export function registerGateways(gateways: Gateway[]) {

export async function run(entrypoint: string) {
if (isMainThread) {
const numWorkers = runtime.RT.numExtraWorkerThreads();
if (numWorkers > 0) {
const extraWorkers = runtime.RT.numWorkerThreads() - 1;
if (extraWorkers > 0) {
const path = fileURLToPath(entrypoint);
for (let i = 0; i < numWorkers; i++) {
for (let i = 0; i < extraWorkers; i++) {
new Worker(path);
}
}
Expand Down
18 changes: 13 additions & 5 deletions runtimes/js/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,20 @@ impl Runtime {
md.clone().into()
}

/// Reports the total number of worker threads,
/// including the main thread.
#[napi]
pub fn num_extra_worker_threads(&self) -> usize {
self.runtime
.compute()
.worker_threads
.map_or_else(|| num_cpus::get() - 1, |v| v as usize)
pub fn num_worker_threads(&self) -> u32 {
match self.runtime.compute().worker_threads {
Some(n) => {
if n > 0 {
n as u32
} else {
num_cpus::get() as u32
}
}
None => 1u32,
}
}
}

Expand Down
Loading