Skip to content

Commit

Permalink
Restructure scheduler module to expose more tightly scoped packages (a…
Browse files Browse the repository at this point in the history
…irbytehq#2543)

Splits scheduler into 4 submodules:
* airbyte-scheduler:app - this is the "scheduler". the thing that runs and schedules work. (no other module should depend on this one.)

*airbyte-scheduler:client - this is the module that anything that needs to submit work to the schedule should depend on. (any module is allowed to depend on this one.)

* airbyte-scheduler:models - pojos / structs that are used as part of interfaces across modules (both internal and external to the scheduler). (any module is allowed to depend on this one.)

* airbyte-scheduler:persistence - code for interacting with the underlying scheduler persistence (database). since the client and main interact via writing to the database both the client and main depend on this. (right now anything that invokes the scheduler client also depends on this unfortunately because the client requires the database to be instantiated. ideally this would only be depended upon by the app (and a server if we ever had one))
  • Loading branch information
cgardens authored Mar 24, 2021
1 parent 0c603db commit f5094b5
Show file tree
Hide file tree
Showing 63 changed files with 145 additions and 83 deletions.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ dependencies {
implementation project(':airbyte-db')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-scheduler:models')
implementation project(':airbyte-scheduler:persistence')
implementation project(':airbyte-workers')

testImplementation "org.testcontainers:postgresql:1.15.1"
}

application {
applicationName = "airbyte-scheduler"
mainClass = 'io.airbyte.scheduler.SchedulerApp'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import com.google.common.collect.Sets;
import io.airbyte.config.WorkspaceRetentionConfig;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobPersistence;
import java.io.File;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import java.nio.file.Paths;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobPersistence;
import java.io.IOException;
import java.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,19 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.StandardSyncSchedule;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.job_factory.DefaultSyncJobFactory;
import io.airbyte.scheduler.job_factory.SyncJobFactory;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.persistence.DefaultJobCreator;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.DefaultSyncJobFactory;
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.concurrency.LifecycledCallable;
import io.airbyte.commons.enums.Enums;
import io.airbyte.scheduler.JobTracker.JobState;
import io.airbyte.scheduler.app.worker_run.TemporalWorkerRunFactory;
import io.airbyte.scheduler.app.worker_run.WorkerRun;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.worker_run.TemporalWorkerRunFactory;
import io.airbyte.scheduler.worker_run.WorkerRun;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import io.airbyte.workers.WorkerConstants;
import java.nio.file.Path;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import io.airbyte.config.StandardSyncSchedule;
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import java.time.Instant;
import java.util.Optional;
import java.util.function.BiPredicate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.airbyte.analytics.TrackingClientSingleton;
Expand All @@ -36,9 +36,12 @@
import io.airbyte.config.persistence.DefaultConfigPersistence;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.scheduler.app.worker_run.TemporalWorkerRunFactory;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.worker_run.TemporalWorkerRunFactory;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.workers.process.DockerProcessBuilderFactory;
import io.airbyte.workers.process.KubeProcessBuilderFactory;
import io.airbyte.workers.process.ProcessBuilderFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.scheduler.worker_run;
package io.airbyte.scheduler.app.worker_run;

import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.commons.json.Jsons;
Expand All @@ -31,7 +31,7 @@
import io.airbyte.config.JobResetConnectionConfig;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.scheduler.Job;
import io.airbyte.scheduler.models.Job;
import io.airbyte.workers.JobStatus;
import io.airbyte.workers.OutputAndStatus;
import io.airbyte.workers.WorkerConstants;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.scheduler.worker_run;
package io.airbyte.scheduler.app.worker_run;

import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.config.EnvConfigs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.airbyte.config.WorkspaceRetentionConfig;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobPersistence;
import java.io.File;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import io.airbyte.config.JobConfig;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobPersistence;
import java.io.IOException;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -40,8 +40,9 @@
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.scheduler.job_factory.SyncJobFactory;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -44,10 +44,12 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import io.airbyte.config.JobOutput;
import io.airbyte.scheduler.JobTracker.JobState;
import io.airbyte.scheduler.app.worker_run.TemporalWorkerRunFactory;
import io.airbyte.scheduler.app.worker_run.WorkerRun;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.worker_run.TemporalWorkerRunFactory;
import io.airbyte.scheduler.worker_run.WorkerRun;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import io.airbyte.workers.JobStatus;
import io.airbyte.workers.OutputAndStatus;
import io.airbyte.workers.WorkerConstants;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.app;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -31,6 +31,8 @@

import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSyncSchedule;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.scheduler.temporal;
package io.airbyte.scheduler.app.worker_run;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -38,9 +38,7 @@
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.scheduler.Job;
import io.airbyte.scheduler.worker_run.TemporalWorkerRunFactory;
import io.airbyte.scheduler.worker_run.WorkerRun;
import io.airbyte.scheduler.models.Job;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.scheduler.worker_run;
package io.airbyte.scheduler.app.worker_run;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;
Expand Down
13 changes: 13 additions & 0 deletions airbyte-scheduler/client/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
plugins {
id "java-library"
}

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-scheduler:models')
implementation project(':airbyte-scheduler:persistence')
// todo (cgardens) - remove this dep. just needs temporal client.
implementation project(':airbyte-workers')

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.scheduler.Job;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.persistence.JobCreator;
import io.airbyte.scheduler.persistence.JobPersistence;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.JobTracker;
import io.airbyte.scheduler.JobTracker.JobState;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalResponse;
import java.io.IOException;
Expand Down Expand Up @@ -127,7 +127,6 @@ <T> SynchronousResponse<T> execute(ConfigType configType,
createdAt,
endedAt);
} catch (RuntimeException e) {
// todo handle null.
track(jobId, configType, jobTrackerId, JobState.FAILED, null);
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.scheduler.Job;
import io.airbyte.scheduler.models.Job;
import java.io.IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.scheduler.Job;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.persistence.JobCreator;
import io.airbyte.scheduler.persistence.JobPersistence;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.JobTracker;
import io.airbyte.scheduler.JobTracker.JobState;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import io.airbyte.workers.temporal.JobMetadata;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalResponse;
Expand Down
8 changes: 8 additions & 0 deletions airbyte-scheduler/models/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
plugins {
id "java-library"
}

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.models;

import io.airbyte.config.JobOutput;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.scheduler.models;

import com.google.common.collect.Sets;
import java.util.Set;
Expand Down
Loading

0 comments on commit f5094b5

Please sign in to comment.