From b5bf9df2eb58e6dba2d0b4e10359e67262344a82 Mon Sep 17 00:00:00 2001 From: Charles Date: Thu, 8 Apr 2021 14:37:06 -0700 Subject: [PATCH] Move Temporal Client to .env (#2675) --- .env | 1 + .../main/java/io/airbyte/config/Configs.java | 2 + .../java/io/airbyte/config/EnvConfigs.java | 70 ++++++++----------- .../airbyte/scheduler/app/SchedulerApp.java | 23 ++++-- .../server/ConfigurationApiFactory.java | 9 ++- .../java/io/airbyte/server/ServerApp.java | 6 +- .../airbyte/server/apis/ConfigurationApi.java | 15 +++- .../server/handlers/SchedulerHandler.java | 14 ++-- .../server/handlers/SchedulerHandlerTest.java | 13 +++- .../workers/temporal/TemporalClient.java | 4 +- .../workers/temporal/TemporalPool.java | 16 +++-- .../workers/temporal/TemporalUtils.java | 17 +++-- docker-compose.yaml | 2 + 13 files changed, 121 insertions(+), 71 deletions(-) diff --git a/.env b/.env index af3b6e57d9e8..a5c8c0bb2573 100644 --- a/.env +++ b/.env @@ -17,3 +17,4 @@ TRACKING_STRATEGY=segment # already exist on the host filesystem and MUST be parents of *_ROOT. # Issue: https://github.com/airbytehq/airbyte/issues/577 HACK_LOCAL_ROOT_PARENT=/tmp +TEMPORAL_HOST=airbyte-temporal:7233 diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index eccb32c147f4..7837c838ab62 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -58,6 +58,8 @@ public interface Configs { WorkspaceRetentionConfig getWorkspaceRetentionConfig(); + String getTemporalHost(); + enum TrackingStrategy { SEGMENT, LOGGING diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index f07a03e66c35..219d454a227f 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -51,6 +51,7 @@ public class EnvConfigs implements Configs { private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS"; private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS"; private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB"; + private static final String TEMPORAL_HOST = "TEMPORAL_HOST"; private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1; private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60; @@ -115,62 +116,34 @@ public String getDatabaseUrl() { @Override public String getWorkspaceDockerMount() { - final String mount = getEnv.apply(WORKSPACE_DOCKER_MOUNT); - if (mount != null) { - return mount; - } - - LOGGER.info(WORKSPACE_DOCKER_MOUNT + " not found, defaulting to " + WORKSPACE_ROOT); - return getWorkspaceRoot().toString(); + return getEnvOrDefault(WORKSPACE_DOCKER_MOUNT, WORKSPACE_ROOT); } @Override public String getLocalDockerMount() { - final String mount = getEnv.apply(LOCAL_DOCKER_MOUNT); - if (mount != null) { - return mount; - } - - LOGGER.info(LOCAL_DOCKER_MOUNT + " not found, defaulting to " + LOCAL_ROOT); - return getLocalRoot().toString(); + return getEnvOrDefault(LOCAL_DOCKER_MOUNT, LOCAL_ROOT); } @Override public String getDockerNetwork() { - final String network = getEnv.apply(DOCKER_NETWORK); - if (network != null) { - return network; - } - - LOGGER.info(DOCKER_NETWORK + " not found, defaulting to " + DEFAULT_NETWORK); - return DEFAULT_NETWORK; + return getEnvOrDefault(DOCKER_NETWORK, DEFAULT_NETWORK); } @Override public TrackingStrategy getTrackingStrategy() { - final String trackingStrategy = getEnv.apply(TRACKING_STRATEGY); - if (trackingStrategy == null) { - LOGGER.info("TRACKING_STRATEGY not set, defaulting to " + TrackingStrategy.LOGGING); - return TrackingStrategy.LOGGING; - } - - try { - return TrackingStrategy.valueOf(trackingStrategy.toUpperCase()); - } catch (IllegalArgumentException e) { - LOGGER.info(trackingStrategy + " not recognized, defaulting to " + TrackingStrategy.LOGGING); - return TrackingStrategy.LOGGING; - } + return getEnvOrDefault(TRACKING_STRATEGY, TrackingStrategy.LOGGING, s -> { + try { + return TrackingStrategy.valueOf(s.toUpperCase()); + } catch (IllegalArgumentException e) { + LOGGER.info(s + " not recognized, defaulting to " + TrackingStrategy.LOGGING); + return TrackingStrategy.LOGGING; + } + }); } @Override public WorkerEnvironment getWorkerEnvironment() { - final String workerEnvironment = getEnv.apply(WORKER_ENVIRONMENT); - if (workerEnvironment != null) { - return WorkerEnvironment.valueOf(workerEnvironment.toUpperCase()); - } - - LOGGER.info(WORKER_ENVIRONMENT + " not found, defaulting to " + WorkerEnvironment.DOCKER); - return WorkerEnvironment.DOCKER; + return getEnvOrDefault(WORKER_ENVIRONMENT, WorkerEnvironment.DOCKER, s -> WorkerEnvironment.valueOf(s.toUpperCase())); } @Override @@ -182,10 +155,23 @@ public WorkspaceRetentionConfig getWorkspaceRetentionConfig() { return new WorkspaceRetentionConfig(minDays, maxDays, maxSizeMb); } - public long getEnvOrDefault(String key, long defaultValue) { + @Override + public String getTemporalHost() { + return getEnvOrDefault(TEMPORAL_HOST, "airbyte-temporal:7233"); + } + + private String getEnvOrDefault(String key, String defaultValue) { + return getEnvOrDefault(key, defaultValue, Function.identity()); + } + + private long getEnvOrDefault(String key, long defaultValue) { + return getEnvOrDefault(key, defaultValue, Long::parseLong); + } + + private T getEnvOrDefault(String key, T defaultValue, Function parser) { final String value = getEnv.apply(key); if (value != null) { - return Long.parseLong(value); + return parser.apply(value); } else { LOGGER.info(key + " not found, defaulting to " + defaultValue); return defaultValue; diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index ebb9b50fbd92..16c3d1907db5 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -47,6 +47,8 @@ import io.airbyte.workers.process.ProcessBuilderFactory; import io.airbyte.workers.temporal.TemporalClient; import io.airbyte.workers.temporal.TemporalPool; +import io.airbyte.workers.temporal.TemporalUtils; +import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.nio.file.Path; import java.time.Duration; @@ -83,26 +85,33 @@ public class SchedulerApp { private final JobPersistence jobPersistence; private final ConfigRepository configRepository; private final JobCleaner jobCleaner; + private final TemporalClient temporalClient; + private final WorkflowServiceStubs temporalService; public SchedulerApp(Path workspaceRoot, ProcessBuilderFactory pbf, JobPersistence jobPersistence, ConfigRepository configRepository, - JobCleaner jobCleaner) { + JobCleaner jobCleaner, + TemporalClient temporalClient, + WorkflowServiceStubs temporalService) { this.workspaceRoot = workspaceRoot; this.pbf = pbf; this.jobPersistence = jobPersistence; this.configRepository = configRepository; this.jobCleaner = jobCleaner; + this.temporalClient = temporalClient; + this.temporalService = temporalService; + } public void start() throws IOException { - final TemporalPool temporalPool = new TemporalPool(workspaceRoot, pbf); + final TemporalPool temporalPool = new TemporalPool(temporalService, workspaceRoot, pbf); temporalPool.run(); final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY); final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor(); - final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(TemporalClient.production(workspaceRoot), workspaceRoot); + final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot); final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now); final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository); final JobSubmitter jobSubmitter = new JobSubmitter( @@ -170,6 +179,9 @@ public static void main(String[] args) throws IOException, InterruptedException final Path workspaceRoot = configs.getWorkspaceRoot(); LOGGER.info("workspaceRoot = " + workspaceRoot); + final String temporalHost = configs.getTemporalHost(); + LOGGER.info("temporalHost = " + temporalHost); + LOGGER.info("Creating DB connection pool..."); final Database database = Databases.createPostgresDatabase( configs.getDatabaseUser(), @@ -206,8 +218,11 @@ public static void main(String[] args) throws IOException, InterruptedException throw new IllegalStateException("Unable to retrieve Airbyte Version, aborting..."); } + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); + final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot); + LOGGER.info("Launching scheduler..."); - new SchedulerApp(workspaceRoot, pbf, jobPersistence, configRepository, jobCleaner).start(); + new SchedulerApp(workspaceRoot, pbf, jobPersistence, configRepository, jobCleaner, temporalClient, temporalService).start(); } } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java index fdd265dbdc43..3cd02b8dcdcb 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java @@ -31,12 +31,14 @@ import io.airbyte.scheduler.client.SchedulerJobClient; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.server.apis.ConfigurationApi; +import io.temporal.serviceclient.WorkflowServiceStubs; import java.util.Map; import org.glassfish.hk2.api.Factory; import org.slf4j.MDC; public class ConfigurationApiFactory implements Factory { + private static WorkflowServiceStubs temporalService; private static ConfigRepository configRepository; private static JobPersistence jobPersistence; private static SchedulerJobClient schedulerJobClient; @@ -73,6 +75,10 @@ public static void setMdc(Map mdc) { ConfigurationApiFactory.mdc = mdc; } + public static void setTemporalService(final WorkflowServiceStubs temporalService) { + ConfigurationApiFactory.temporalService = temporalService; + } + @Override public ConfigurationApi provide() { MDC.setContextMap(mdc); @@ -83,7 +89,8 @@ public ConfigurationApi provide() { ConfigurationApiFactory.schedulerJobClient, ConfigurationApiFactory.synchronousSchedulerClient, ConfigurationApiFactory.configs, - ConfigurationApiFactory.archiveTtlManager); + ConfigurationApiFactory.archiveTtlManager, + ConfigurationApiFactory.temporalService); } @Override diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 18c3c19ff19f..a77814f2822a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -54,6 +54,8 @@ import io.airbyte.server.errors.UncaughtExceptionMapper; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.temporal.TemporalClient; +import io.airbyte.workers.temporal.TemporalUtils; +import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.nio.file.Path; import java.util.Map; @@ -99,10 +101,12 @@ public void start() throws Exception { ConfigurationApiFactory.setSchedulerJobClient(new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence))); final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence); - final TemporalClient temporalClient = TemporalClient.production(configs.getWorkspaceRoot()); + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(configs.getTemporalHost()); + final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot()); ConfigurationApiFactory .setSynchronousSchedulerClient(new SpecCachingSynchronousSchedulerClient(new DefaultSynchronousSchedulerClient(temporalClient, jobTracker))); + ConfigurationApiFactory.setTemporalService(temporalService); ConfigurationApiFactory.setConfigRepository(configRepository); ConfigurationApiFactory.setJobPersistence(jobPersistence); ConfigurationApiFactory.setConfigs(configs); diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index 357dcbb38b21..ac3d46dc5b39 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -100,6 +100,7 @@ import io.airbyte.server.validators.DockerImageValidator; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; +import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.File; import java.io.IOException; import javax.validation.Valid; @@ -124,17 +125,25 @@ public class ConfigurationApi implements io.airbyte.api.V1Api { private final LogsHandler logsHandler; private final OpenApiConfigHandler openApiConfigHandler; private final Configs configs; + private final WorkflowServiceStubs temporalService; public ConfigurationApi(final ConfigRepository configRepository, final JobPersistence jobPersistence, final SchedulerJobClient schedulerJobClient, final CachingSynchronousSchedulerClient synchronousSchedulerClient, final Configs configs, - final FileTtlManager archiveTtlManager) { + final FileTtlManager archiveTtlManager, + final WorkflowServiceStubs temporalService) { + this.temporalService = temporalService; final SpecFetcher specFetcher = new SpecFetcher(synchronousSchedulerClient); final JsonSchemaValidator schemaValidator = new JsonSchemaValidator(); - schedulerHandler = - new SchedulerHandler(configRepository, schedulerJobClient, synchronousSchedulerClient, jobPersistence, configs.getWorkspaceRoot()); + schedulerHandler = new SchedulerHandler( + configRepository, + schedulerJobClient, + synchronousSchedulerClient, + jobPersistence, + configs.getWorkspaceRoot(), + temporalService); final DockerImageValidator dockerImageValidator = new DockerImageValidator(synchronousSchedulerClient); sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, dockerImageValidator, synchronousSchedulerClient); connectionsHandler = new ConnectionsHandler(configRepository); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 742a2a00ead7..8474ca1d0d2a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -71,6 +71,7 @@ import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest; +import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.nio.file.Path; import java.util.UUID; @@ -85,12 +86,14 @@ public class SchedulerHandler { private final JsonSchemaValidator jsonSchemaValidator; private final JobPersistence jobPersistence; private final Path workspaceRoot; + private final WorkflowServiceStubs temporalService; public SchedulerHandler(ConfigRepository configRepository, SchedulerJobClient schedulerJobClient, SynchronousSchedulerClient synchronousSchedulerClient, JobPersistence jobPersistence, - Path workspaceRoot) { + Path workspaceRoot, + WorkflowServiceStubs temporalService) { this( configRepository, schedulerJobClient, @@ -99,7 +102,8 @@ public SchedulerHandler(ConfigRepository configRepository, new JsonSchemaValidator(), new SpecFetcher(synchronousSchedulerClient), jobPersistence, - workspaceRoot); + workspaceRoot, + temporalService); } @VisibleForTesting @@ -110,7 +114,8 @@ public SchedulerHandler(ConfigRepository configRepository, JsonSchemaValidator jsonSchemaValidator, SpecFetcher specFetcher, JobPersistence jobPersistence, - Path workspaceRoot) { + Path workspaceRoot, + WorkflowServiceStubs temporalService) { this.configRepository = configRepository; this.schedulerJobClient = schedulerJobClient; this.synchronousSchedulerClient = synchronousSchedulerClient; @@ -119,6 +124,7 @@ public SchedulerHandler(ConfigRepository configRepository, this.specFetcher = specFetcher; this.jobPersistence = jobPersistence; this.workspaceRoot = workspaceRoot; + this.temporalService = temporalService; } public CheckConnectionRead checkSourceConnectionFromSourceId(SourceIdRequestBody sourceIdRequestBody) @@ -316,7 +322,7 @@ public JobInfoRead cancelJob(JobIdRequestBody jobIdRequestBody) throws IOExcepti .setNamespace(TemporalUtils.DEFAULT_NAMESPACE) .build(); - TemporalUtils.TEMPORAL_SERVICE.blockingStub().requestCancelWorkflowExecution(cancelRequest); + temporalService.blockingStub().requestCancelWorkflowExecution(cancelRequest); return JobConverter.getJobInfoRead(jobPersistence.getJob(jobId)); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 9938beac30d6..c26e467cd398 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -80,6 +80,7 @@ import io.airbyte.server.helpers.SourceHelpers; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; +import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.net.URI; import java.nio.file.Path; @@ -148,8 +149,16 @@ void setup() { configRepository = mock(ConfigRepository.class); jobPersistence = mock(JobPersistence.class); - schedulerHandler = new SchedulerHandler(configRepository, schedulerJobClient, synchronousSchedulerClient, configurationUpdate, - jsonSchemaValidator, specFetcher, jobPersistence, mock(Path.class)); + schedulerHandler = new SchedulerHandler( + configRepository, + schedulerJobClient, + synchronousSchedulerClient, + configurationUpdate, + jsonSchemaValidator, + specFetcher, + jobPersistence, + mock(Path.class), + mock(WorkflowServiceStubs.class)); } @Test diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index 00b937f5699b..f7f26d106ffa 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -49,8 +49,8 @@ public class TemporalClient { private final Path workspaceRoot; private final WorkflowClient client; - public static TemporalClient production(Path workspaceRoot) { - return new TemporalClient(TemporalUtils.TEMPORAL_CLIENT, workspaceRoot); + public static TemporalClient production(String temporalHost, Path workspaceRoot) { + return new TemporalClient(TemporalUtils.createTemporalClient(temporalHost), workspaceRoot); } // todo (cgardens) - there are two sources of truth on workspace root. we need to get this down to diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalPool.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalPool.java index e435b4684c58..e0d361341b70 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalPool.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalPool.java @@ -30,6 +30,8 @@ import io.temporal.api.namespace.v1.NamespaceInfo; import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse; import io.temporal.api.workflowservice.v1.ListNamespacesRequest; +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; import java.nio.file.Path; @@ -41,10 +43,12 @@ public class TemporalPool implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(TemporalPool.class); + private final WorkflowServiceStubs temporalService; private final Path workspaceRoot; private final ProcessBuilderFactory pbf; - public TemporalPool(Path workspaceRoot, ProcessBuilderFactory pbf) { + public TemporalPool(WorkflowServiceStubs temporalService, Path workspaceRoot, ProcessBuilderFactory pbf) { + this.temporalService = temporalService; this.workspaceRoot = workspaceRoot; this.pbf = pbf; } @@ -53,7 +57,7 @@ public TemporalPool(Path workspaceRoot, ProcessBuilderFactory pbf) { public void run() { waitForTemporalServerAndLog(); - final WorkerFactory factory = WorkerFactory.newInstance(TemporalUtils.TEMPORAL_CLIENT); + final WorkerFactory factory = WorkerFactory.newInstance(WorkflowClient.newInstance(temporalService)); final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name()); specWorker.registerWorkflowImplementationTypes(SpecWorkflow.WorkflowImpl.class); @@ -74,10 +78,10 @@ public void run() { factory.start(); } - private static void waitForTemporalServerAndLog() { + private void waitForTemporalServerAndLog() { LOGGER.info("Waiting for temporal server..."); - while (!getNamespaces().contains("default")) { + while (!getNamespaces(temporalService).contains("default")) { LOGGER.warn("Waiting for default namespace to be initialized in temporal..."); wait(2); } @@ -96,8 +100,8 @@ private static void wait(int seconds) { } } - private static Set getNamespaces() { - return TemporalUtils.TEMPORAL_SERVICE.blockingStub() + private static Set getNamespaces(WorkflowServiceStubs temporalService) { + return temporalService.blockingStub() .listNamespaces(ListNamespacesRequest.newBuilder().build()) .getNamespacesList() .stream() diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java index 769d266d2a22..b83e8a7bdbab 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java @@ -35,14 +35,19 @@ public class TemporalUtils { - private static final WorkflowServiceStubsOptions TEMPORAL_OPTIONS = WorkflowServiceStubsOptions.newBuilder() - // todo move to env. - .setTarget("airbyte-temporal:7233") - .build(); + public static WorkflowServiceStubs createTemporalService(String temporalHost) { + final WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder() + // todo move to env. + .setTarget(temporalHost) + .build(); - public static final WorkflowServiceStubs TEMPORAL_SERVICE = WorkflowServiceStubs.newInstance(TEMPORAL_OPTIONS); + return WorkflowServiceStubs.newInstance(options); + } - public static final WorkflowClient TEMPORAL_CLIENT = WorkflowClient.newInstance(TEMPORAL_SERVICE); + public static WorkflowClient createTemporalClient(String temporalHost) { + final WorkflowServiceStubs temporalService = createTemporalService(temporalHost); + return WorkflowClient.newInstance(temporalService); + } public static final RetryOptions NO_RETRY = RetryOptions.newBuilder().setMaximumAttempts(1).build(); diff --git a/docker-compose.yaml b/docker-compose.yaml index ed78bd9572b2..fb7bc40f8936 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -45,6 +45,7 @@ services: - TRACKING_STRATEGY=${TRACKING_STRATEGY} - AIRBYTE_VERSION=${VERSION} - AIRBYTE_ROLE=${AIRBYTE_ROLE:-} + - TEMPORAL_HOST=${TEMPORAL_HOST} volumes: - /var/run/docker.sock:/var/run/docker.sock - workspace:${WORKSPACE_ROOT} @@ -65,6 +66,7 @@ services: - AIRBYTE_VERSION=${VERSION} - AIRBYTE_ROLE=${AIRBYTE_ROLE:-} - WORKSPACE_ROOT=${WORKSPACE_ROOT} + - TEMPORAL_HOST=${TEMPORAL_HOST} ports: - 8001:8001 volumes: