diff --git a/.env b/.env
index af3b6e57d9e80..a5c8c0bb25731 100644
--- a/.env
+++ b/.env
@@ -17,3 +17,4 @@ TRACKING_STRATEGY=segment
# already exist on the host filesystem and MUST be parents of *_ROOT.
# Issue: https://github.com/airbytehq/airbyte/issues/577
HACK_LOCAL_ROOT_PARENT=/tmp
+TEMPORAL_HOST=airbyte-temporal:7233
diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
index eccb32c147f44..7837c838ab621 100644
--- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
+++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
@@ -58,6 +58,8 @@ public interface Configs {
WorkspaceRetentionConfig getWorkspaceRetentionConfig();
+ String getTemporalHost();
+
enum TrackingStrategy {
SEGMENT,
LOGGING
diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java
index f07a03e66c354..219d454a227f3 100644
--- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java
+++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java
@@ -51,6 +51,7 @@ public class EnvConfigs implements Configs {
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB";
+ private static final String TEMPORAL_HOST = "TEMPORAL_HOST";
private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1;
private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60;
@@ -115,62 +116,34 @@ public String getDatabaseUrl() {
@Override
public String getWorkspaceDockerMount() {
- final String mount = getEnv.apply(WORKSPACE_DOCKER_MOUNT);
- if (mount != null) {
- return mount;
- }
-
- LOGGER.info(WORKSPACE_DOCKER_MOUNT + " not found, defaulting to " + WORKSPACE_ROOT);
- return getWorkspaceRoot().toString();
+ return getEnvOrDefault(WORKSPACE_DOCKER_MOUNT, WORKSPACE_ROOT);
}
@Override
public String getLocalDockerMount() {
- final String mount = getEnv.apply(LOCAL_DOCKER_MOUNT);
- if (mount != null) {
- return mount;
- }
-
- LOGGER.info(LOCAL_DOCKER_MOUNT + " not found, defaulting to " + LOCAL_ROOT);
- return getLocalRoot().toString();
+ return getEnvOrDefault(LOCAL_DOCKER_MOUNT, LOCAL_ROOT);
}
@Override
public String getDockerNetwork() {
- final String network = getEnv.apply(DOCKER_NETWORK);
- if (network != null) {
- return network;
- }
-
- LOGGER.info(DOCKER_NETWORK + " not found, defaulting to " + DEFAULT_NETWORK);
- return DEFAULT_NETWORK;
+ return getEnvOrDefault(DOCKER_NETWORK, DEFAULT_NETWORK);
}
@Override
public TrackingStrategy getTrackingStrategy() {
- final String trackingStrategy = getEnv.apply(TRACKING_STRATEGY);
- if (trackingStrategy == null) {
- LOGGER.info("TRACKING_STRATEGY not set, defaulting to " + TrackingStrategy.LOGGING);
- return TrackingStrategy.LOGGING;
- }
-
- try {
- return TrackingStrategy.valueOf(trackingStrategy.toUpperCase());
- } catch (IllegalArgumentException e) {
- LOGGER.info(trackingStrategy + " not recognized, defaulting to " + TrackingStrategy.LOGGING);
- return TrackingStrategy.LOGGING;
- }
+ return getEnvOrDefault(TRACKING_STRATEGY, TrackingStrategy.LOGGING, s -> {
+ try {
+ return TrackingStrategy.valueOf(s.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ LOGGER.info(s + " not recognized, defaulting to " + TrackingStrategy.LOGGING);
+ return TrackingStrategy.LOGGING;
+ }
+ });
}
@Override
public WorkerEnvironment getWorkerEnvironment() {
- final String workerEnvironment = getEnv.apply(WORKER_ENVIRONMENT);
- if (workerEnvironment != null) {
- return WorkerEnvironment.valueOf(workerEnvironment.toUpperCase());
- }
-
- LOGGER.info(WORKER_ENVIRONMENT + " not found, defaulting to " + WorkerEnvironment.DOCKER);
- return WorkerEnvironment.DOCKER;
+ return getEnvOrDefault(WORKER_ENVIRONMENT, WorkerEnvironment.DOCKER, s -> WorkerEnvironment.valueOf(s.toUpperCase()));
}
@Override
@@ -182,10 +155,23 @@ public WorkspaceRetentionConfig getWorkspaceRetentionConfig() {
return new WorkspaceRetentionConfig(minDays, maxDays, maxSizeMb);
}
- public long getEnvOrDefault(String key, long defaultValue) {
+ @Override
+ public String getTemporalHost() {
+ return getEnvOrDefault(TEMPORAL_HOST, "airbyte-temporal:7233");
+ }
+
+ private String getEnvOrDefault(String key, String defaultValue) {
+ return getEnvOrDefault(key, defaultValue, Function.identity());
+ }
+
+ private long getEnvOrDefault(String key, long defaultValue) {
+ return getEnvOrDefault(key, defaultValue, Long::parseLong);
+ }
+
+ private T getEnvOrDefault(String key, T defaultValue, Function parser) {
final String value = getEnv.apply(key);
if (value != null) {
- return Long.parseLong(value);
+ return parser.apply(value);
} else {
LOGGER.info(key + " not found, defaulting to " + defaultValue);
return defaultValue;
diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java
index ebb9b50fbd923..16c3d1907db5f 100644
--- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java
+++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java
@@ -47,6 +47,8 @@
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalPool;
+import io.airbyte.workers.temporal.TemporalUtils;
+import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
@@ -83,26 +85,33 @@ public class SchedulerApp {
private final JobPersistence jobPersistence;
private final ConfigRepository configRepository;
private final JobCleaner jobCleaner;
+ private final TemporalClient temporalClient;
+ private final WorkflowServiceStubs temporalService;
public SchedulerApp(Path workspaceRoot,
ProcessBuilderFactory pbf,
JobPersistence jobPersistence,
ConfigRepository configRepository,
- JobCleaner jobCleaner) {
+ JobCleaner jobCleaner,
+ TemporalClient temporalClient,
+ WorkflowServiceStubs temporalService) {
this.workspaceRoot = workspaceRoot;
this.pbf = pbf;
this.jobPersistence = jobPersistence;
this.configRepository = configRepository;
this.jobCleaner = jobCleaner;
+ this.temporalClient = temporalClient;
+ this.temporalService = temporalService;
+
}
public void start() throws IOException {
- final TemporalPool temporalPool = new TemporalPool(workspaceRoot, pbf);
+ final TemporalPool temporalPool = new TemporalPool(temporalService, workspaceRoot, pbf);
temporalPool.run();
final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
- final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(TemporalClient.production(workspaceRoot), workspaceRoot);
+ final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now);
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository);
final JobSubmitter jobSubmitter = new JobSubmitter(
@@ -170,6 +179,9 @@ public static void main(String[] args) throws IOException, InterruptedException
final Path workspaceRoot = configs.getWorkspaceRoot();
LOGGER.info("workspaceRoot = " + workspaceRoot);
+ final String temporalHost = configs.getTemporalHost();
+ LOGGER.info("temporalHost = " + temporalHost);
+
LOGGER.info("Creating DB connection pool...");
final Database database = Databases.createPostgresDatabase(
configs.getDatabaseUser(),
@@ -206,8 +218,11 @@ public static void main(String[] args) throws IOException, InterruptedException
throw new IllegalStateException("Unable to retrieve Airbyte Version, aborting...");
}
+ final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost);
+ final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot);
+
LOGGER.info("Launching scheduler...");
- new SchedulerApp(workspaceRoot, pbf, jobPersistence, configRepository, jobCleaner).start();
+ new SchedulerApp(workspaceRoot, pbf, jobPersistence, configRepository, jobCleaner, temporalClient, temporalService).start();
}
}
diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java
index fdd265dbdc43e..3cd02b8dcdcba 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java
@@ -31,12 +31,14 @@
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.apis.ConfigurationApi;
+import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.Map;
import org.glassfish.hk2.api.Factory;
import org.slf4j.MDC;
public class ConfigurationApiFactory implements Factory {
+ private static WorkflowServiceStubs temporalService;
private static ConfigRepository configRepository;
private static JobPersistence jobPersistence;
private static SchedulerJobClient schedulerJobClient;
@@ -73,6 +75,10 @@ public static void setMdc(Map mdc) {
ConfigurationApiFactory.mdc = mdc;
}
+ public static void setTemporalService(final WorkflowServiceStubs temporalService) {
+ ConfigurationApiFactory.temporalService = temporalService;
+ }
+
@Override
public ConfigurationApi provide() {
MDC.setContextMap(mdc);
@@ -83,7 +89,8 @@ public ConfigurationApi provide() {
ConfigurationApiFactory.schedulerJobClient,
ConfigurationApiFactory.synchronousSchedulerClient,
ConfigurationApiFactory.configs,
- ConfigurationApiFactory.archiveTtlManager);
+ ConfigurationApiFactory.archiveTtlManager,
+ ConfigurationApiFactory.temporalService);
}
@Override
diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
index 18c3c19ff19f1..a77814f2822ab 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
@@ -54,6 +54,8 @@
import io.airbyte.server.errors.UncaughtExceptionMapper;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.TemporalClient;
+import io.airbyte.workers.temporal.TemporalUtils;
+import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
@@ -99,10 +101,12 @@ public void start() throws Exception {
ConfigurationApiFactory.setSchedulerJobClient(new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence)));
final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence);
- final TemporalClient temporalClient = TemporalClient.production(configs.getWorkspaceRoot());
+ final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(configs.getTemporalHost());
+ final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot());
ConfigurationApiFactory
.setSynchronousSchedulerClient(new SpecCachingSynchronousSchedulerClient(new DefaultSynchronousSchedulerClient(temporalClient, jobTracker)));
+ ConfigurationApiFactory.setTemporalService(temporalService);
ConfigurationApiFactory.setConfigRepository(configRepository);
ConfigurationApiFactory.setJobPersistence(jobPersistence);
ConfigurationApiFactory.setConfigs(configs);
diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java
index 357dcbb38b212..ac3d46dc5b39a 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java
@@ -100,6 +100,7 @@
import io.airbyte.server.validators.DockerImageValidator;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
+import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.File;
import java.io.IOException;
import javax.validation.Valid;
@@ -124,17 +125,25 @@ public class ConfigurationApi implements io.airbyte.api.V1Api {
private final LogsHandler logsHandler;
private final OpenApiConfigHandler openApiConfigHandler;
private final Configs configs;
+ private final WorkflowServiceStubs temporalService;
public ConfigurationApi(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final SchedulerJobClient schedulerJobClient,
final CachingSynchronousSchedulerClient synchronousSchedulerClient,
final Configs configs,
- final FileTtlManager archiveTtlManager) {
+ final FileTtlManager archiveTtlManager,
+ final WorkflowServiceStubs temporalService) {
+ this.temporalService = temporalService;
final SpecFetcher specFetcher = new SpecFetcher(synchronousSchedulerClient);
final JsonSchemaValidator schemaValidator = new JsonSchemaValidator();
- schedulerHandler =
- new SchedulerHandler(configRepository, schedulerJobClient, synchronousSchedulerClient, jobPersistence, configs.getWorkspaceRoot());
+ schedulerHandler = new SchedulerHandler(
+ configRepository,
+ schedulerJobClient,
+ synchronousSchedulerClient,
+ jobPersistence,
+ configs.getWorkspaceRoot(),
+ temporalService);
final DockerImageValidator dockerImageValidator = new DockerImageValidator(synchronousSchedulerClient);
sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, dockerImageValidator, synchronousSchedulerClient);
connectionsHandler = new ConnectionsHandler(configRepository);
diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java
index 742a2a00ead7b..8474ca1d0d2a3 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java
@@ -71,6 +71,7 @@
import io.airbyte.workers.temporal.TemporalUtils;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest;
+import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.nio.file.Path;
import java.util.UUID;
@@ -85,12 +86,14 @@ public class SchedulerHandler {
private final JsonSchemaValidator jsonSchemaValidator;
private final JobPersistence jobPersistence;
private final Path workspaceRoot;
+ private final WorkflowServiceStubs temporalService;
public SchedulerHandler(ConfigRepository configRepository,
SchedulerJobClient schedulerJobClient,
SynchronousSchedulerClient synchronousSchedulerClient,
JobPersistence jobPersistence,
- Path workspaceRoot) {
+ Path workspaceRoot,
+ WorkflowServiceStubs temporalService) {
this(
configRepository,
schedulerJobClient,
@@ -99,7 +102,8 @@ public SchedulerHandler(ConfigRepository configRepository,
new JsonSchemaValidator(),
new SpecFetcher(synchronousSchedulerClient),
jobPersistence,
- workspaceRoot);
+ workspaceRoot,
+ temporalService);
}
@VisibleForTesting
@@ -110,7 +114,8 @@ public SchedulerHandler(ConfigRepository configRepository,
JsonSchemaValidator jsonSchemaValidator,
SpecFetcher specFetcher,
JobPersistence jobPersistence,
- Path workspaceRoot) {
+ Path workspaceRoot,
+ WorkflowServiceStubs temporalService) {
this.configRepository = configRepository;
this.schedulerJobClient = schedulerJobClient;
this.synchronousSchedulerClient = synchronousSchedulerClient;
@@ -119,6 +124,7 @@ public SchedulerHandler(ConfigRepository configRepository,
this.specFetcher = specFetcher;
this.jobPersistence = jobPersistence;
this.workspaceRoot = workspaceRoot;
+ this.temporalService = temporalService;
}
public CheckConnectionRead checkSourceConnectionFromSourceId(SourceIdRequestBody sourceIdRequestBody)
@@ -316,7 +322,7 @@ public JobInfoRead cancelJob(JobIdRequestBody jobIdRequestBody) throws IOExcepti
.setNamespace(TemporalUtils.DEFAULT_NAMESPACE)
.build();
- TemporalUtils.TEMPORAL_SERVICE.blockingStub().requestCancelWorkflowExecution(cancelRequest);
+ temporalService.blockingStub().requestCancelWorkflowExecution(cancelRequest);
return JobConverter.getJobInfoRead(jobPersistence.getJob(jobId));
}
diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java
index 9938beac30d65..c26e467cd398e 100644
--- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java
+++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java
@@ -80,6 +80,7 @@
import io.airbyte.server.helpers.SourceHelpers;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
+import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
@@ -148,8 +149,16 @@ void setup() {
configRepository = mock(ConfigRepository.class);
jobPersistence = mock(JobPersistence.class);
- schedulerHandler = new SchedulerHandler(configRepository, schedulerJobClient, synchronousSchedulerClient, configurationUpdate,
- jsonSchemaValidator, specFetcher, jobPersistence, mock(Path.class));
+ schedulerHandler = new SchedulerHandler(
+ configRepository,
+ schedulerJobClient,
+ synchronousSchedulerClient,
+ configurationUpdate,
+ jsonSchemaValidator,
+ specFetcher,
+ jobPersistence,
+ mock(Path.class),
+ mock(WorkflowServiceStubs.class));
}
@Test
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java
index 00b937f5699b9..f7f26d106ffa0 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java
@@ -49,8 +49,8 @@ public class TemporalClient {
private final Path workspaceRoot;
private final WorkflowClient client;
- public static TemporalClient production(Path workspaceRoot) {
- return new TemporalClient(TemporalUtils.TEMPORAL_CLIENT, workspaceRoot);
+ public static TemporalClient production(String temporalHost, Path workspaceRoot) {
+ return new TemporalClient(TemporalUtils.createTemporalClient(temporalHost), workspaceRoot);
}
// todo (cgardens) - there are two sources of truth on workspace root. we need to get this down to
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalPool.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalPool.java
index e435b4684c58f..e0d361341b707 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalPool.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalPool.java
@@ -30,6 +30,8 @@
import io.temporal.api.namespace.v1.NamespaceInfo;
import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse;
import io.temporal.api.workflowservice.v1.ListNamespacesRequest;
+import io.temporal.client.WorkflowClient;
+import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import java.nio.file.Path;
@@ -41,10 +43,12 @@ public class TemporalPool implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(TemporalPool.class);
+ private final WorkflowServiceStubs temporalService;
private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;
- public TemporalPool(Path workspaceRoot, ProcessBuilderFactory pbf) {
+ public TemporalPool(WorkflowServiceStubs temporalService, Path workspaceRoot, ProcessBuilderFactory pbf) {
+ this.temporalService = temporalService;
this.workspaceRoot = workspaceRoot;
this.pbf = pbf;
}
@@ -53,7 +57,7 @@ public TemporalPool(Path workspaceRoot, ProcessBuilderFactory pbf) {
public void run() {
waitForTemporalServerAndLog();
- final WorkerFactory factory = WorkerFactory.newInstance(TemporalUtils.TEMPORAL_CLIENT);
+ final WorkerFactory factory = WorkerFactory.newInstance(WorkflowClient.newInstance(temporalService));
final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name());
specWorker.registerWorkflowImplementationTypes(SpecWorkflow.WorkflowImpl.class);
@@ -74,10 +78,10 @@ public void run() {
factory.start();
}
- private static void waitForTemporalServerAndLog() {
+ private void waitForTemporalServerAndLog() {
LOGGER.info("Waiting for temporal server...");
- while (!getNamespaces().contains("default")) {
+ while (!getNamespaces(temporalService).contains("default")) {
LOGGER.warn("Waiting for default namespace to be initialized in temporal...");
wait(2);
}
@@ -96,8 +100,8 @@ private static void wait(int seconds) {
}
}
- private static Set getNamespaces() {
- return TemporalUtils.TEMPORAL_SERVICE.blockingStub()
+ private static Set getNamespaces(WorkflowServiceStubs temporalService) {
+ return temporalService.blockingStub()
.listNamespaces(ListNamespacesRequest.newBuilder().build())
.getNamespacesList()
.stream()
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java
index 769d266d2a228..b83e8a7bdbab0 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java
@@ -35,14 +35,19 @@
public class TemporalUtils {
- private static final WorkflowServiceStubsOptions TEMPORAL_OPTIONS = WorkflowServiceStubsOptions.newBuilder()
- // todo move to env.
- .setTarget("airbyte-temporal:7233")
- .build();
+ public static WorkflowServiceStubs createTemporalService(String temporalHost) {
+ final WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder()
+ // todo move to env.
+ .setTarget(temporalHost)
+ .build();
- public static final WorkflowServiceStubs TEMPORAL_SERVICE = WorkflowServiceStubs.newInstance(TEMPORAL_OPTIONS);
+ return WorkflowServiceStubs.newInstance(options);
+ }
- public static final WorkflowClient TEMPORAL_CLIENT = WorkflowClient.newInstance(TEMPORAL_SERVICE);
+ public static WorkflowClient createTemporalClient(String temporalHost) {
+ final WorkflowServiceStubs temporalService = createTemporalService(temporalHost);
+ return WorkflowClient.newInstance(temporalService);
+ }
public static final RetryOptions NO_RETRY = RetryOptions.newBuilder().setMaximumAttempts(1).build();
diff --git a/docker-compose.yaml b/docker-compose.yaml
index ed78bd9572b2f..fb7bc40f8936f 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -45,6 +45,7 @@ services:
- TRACKING_STRATEGY=${TRACKING_STRATEGY}
- AIRBYTE_VERSION=${VERSION}
- AIRBYTE_ROLE=${AIRBYTE_ROLE:-}
+ - TEMPORAL_HOST=${TEMPORAL_HOST}
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- workspace:${WORKSPACE_ROOT}
@@ -65,6 +66,7 @@ services:
- AIRBYTE_VERSION=${VERSION}
- AIRBYTE_ROLE=${AIRBYTE_ROLE:-}
- WORKSPACE_ROOT=${WORKSPACE_ROOT}
+ - TEMPORAL_HOST=${TEMPORAL_HOST}
ports:
- 8001:8001
volumes: