diff --git a/.env b/.env index 255d4440f21b2..736362fba64bb 100644 --- a/.env +++ b/.env @@ -38,6 +38,8 @@ LOCAL_DOCKER_MOUNT=/tmp/airbyte_local # Issue: https://github.com/airbytehq/airbyte/issues/577 HACK_LOCAL_ROOT_PARENT=/tmp +SUBMITTER_NUM_THREADS=10 + # Miscellaneous TRACKING_STRATEGY=segment WEBAPP_URL=http://localhost:8000/ diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 39a1f7846bf66..b1982b074a8d4 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -73,6 +73,8 @@ public interface Configs { String getKubeNamespace(); + String getSubmitterNumThreads(); + // Resources String getCpuRequest(); diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index a504c6a71257f..e357c3613591c 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -62,6 +62,7 @@ public class EnvConfigs implements Configs { private static final String TEMPORAL_HOST = "TEMPORAL_HOST"; private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS"; private static final String KUBE_NAMESPACE = "KUBE_NAMESPACE"; + private static final String SUBMITTER_NUM_THREADS = "SUBMITTER_NUM_THREADS"; private static final String RESOURCE_CPU_REQUEST = "RESOURCE_CPU_REQUEST"; private static final String RESOURCE_CPU_LIMIT = "RESOURCE_CPU_LIMIT"; private static final String RESOURCE_MEMORY_REQUEST = "RESOURCE_MEMORY_REQUEST"; @@ -211,6 +212,11 @@ public String getKubeNamespace() { return getEnvOrDefault(KUBE_NAMESPACE, DEFAULT_KUBE_NAMESPACE); } + @Override + public String getSubmitterNumThreads() { + return getEnvOrDefault(SUBMITTER_NUM_THREADS, "5"); + } + @Override public String 
getCpuRequest() { return getEnvOrDefault(RESOURCE_CPU_REQUEST, DEFAULT_RESOURCE_REQUIREMENT_CPU); diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 9a0e66c7444c2..dc04002f8a315 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -75,16 +75,20 @@ /** * The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch - * them. The current implementation uses a thread pool on the scheduler's machine to launch the - * jobs. One thread is reserved for the job submitter, which is responsible for finding and - * launching new jobs. + * them. The current implementation uses two thread pools to do so. One pool is responsible for all + * job launching operations. The other pool is responsible for clean up operations. + * + * Operations can have thread pools under the hood. An important thread pool to note is the job + * submitter thread pool. This pool does the work of submitting jobs to temporal - the size of this + * pool determines the number of concurrent jobs that can be run. This is controlled via the + * {@link #SUBMITTER_NUM_THREADS} variable. 
*/ public class SchedulerApp { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class); private static final long GRACEFUL_SHUTDOWN_SECONDS = 30; - private static final int MAX_WORKERS = 4; + private static final int SUBMITTER_NUM_THREADS = Integer.parseInt(new EnvConfigs().getSubmitterNumThreads()); private static final Duration SCHEDULING_DELAY = Duration.ofSeconds(5); private static final Duration CLEANING_DELAY = Duration.ofHours(2); private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("worker-%d").build(); @@ -121,7 +125,7 @@ public void start() throws IOException { final TemporalPool temporalPool = new TemporalPool(temporalService, workspaceRoot, processFactory); temporalPool.run(); - final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY); + final ExecutorService workerThreadPool = Executors.newFixedThreadPool(SUBMITTER_NUM_THREADS, THREAD_FACTORY); final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor(); final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot); final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier); diff --git a/docker-compose.yaml b/docker-compose.yaml index a4fb3aeb5a16b..33cf2e99338ee 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -63,6 +63,7 @@ services: - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} - GCP_STORAGE_BUCKET=${GCP_STORAGE_BUCKET} - LOG_LEVEL=${LOG_LEVEL} + - SUBMITTER_NUM_THREADS=${SUBMITTER_NUM_THREADS} - RESOURCE_CPU_REQUEST=${RESOURCE_CPU_REQUEST} - RESOURCE_CPU_LIMIT=${RESOURCE_CPU_LIMIT} - RESOURCE_MEMORY_REQUEST=${RESOURCE_MEMORY_REQUEST} diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index a6d1de033b944..6b77c81e531b8 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -24,6 +24,8 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace 
LOCAL_ROOT=/tmp/airbyte_local +SUBMITTER_NUM_THREADS=10 + # Miscellaneous TRACKING_STRATEGY=logging WEBAPP_URL=airbyte-webapp-svc:80 diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index b410fa8fbdcea..fea24ed7be8e8 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -24,6 +24,8 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace LOCAL_ROOT=/tmp/airbyte_local +SUBMITTER_NUM_THREADS=10 + # Miscellaneous TRACKING_STRATEGY=segment WEBAPP_URL=airbyte-webapp-svc:80 diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index b410fa8fbdcea..fea24ed7be8e8 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -24,6 +24,8 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace LOCAL_ROOT=/tmp/airbyte_local +SUBMITTER_NUM_THREADS=10 + # Miscellaneous TRACKING_STRATEGY=segment WEBAPP_URL=airbyte-webapp-svc:80 diff --git a/kube/resources/scheduler.yaml b/kube/resources/scheduler.yaml index 1d1e752574c7c..5c80f1f6a3fa6 100644 --- a/kube/resources/scheduler.yaml +++ b/kube/resources/scheduler.yaml @@ -99,6 +99,11 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + - name: SUBMITTER_NUM_THREADS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: SUBMITTER_NUM_THREADS - name: RESOURCE_CPU_REQUEST valueFrom: configMapKeyRef: