Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use spec when persisting source configs #6036

Merged
merged 7 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
only wait on the server in the scheduler, not the worker
  • Loading branch information
jrhizor committed Sep 14, 2021
commit 0bb9d7d0b12beda5f3f70f1a9126caabc11619f1
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.workers.WorkerApp;
import io.airbyte.workers.temporal.TemporalClient;
import java.io.IOException;
import java.nio.file.Path;
Expand Down Expand Up @@ -153,6 +152,25 @@ private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifi
}
}

public static void waitForServer(Configs configs) throws InterruptedException {
final AirbyteApiClient apiClient = new AirbyteApiClient(
new io.airbyte.api.client.invoker.ApiClient().setScheme("http")
.setHost(configs.getAirbyteApiHost())
.setPort(configs.getAirbyteApiPort())
.setBasePath("/api"));

boolean isHealthy = false;
while (!isHealthy) {
try {
HealthCheckRead healthCheck = apiClient.getHealthApi().getHealthCheck();
isHealthy = healthCheck.getDb();
} catch (ApiException e) {
LOGGER.info("Waiting for server to become available...");
Thread.sleep(2000);
}
}
}

public static void main(String[] args) throws IOException, InterruptedException {

final Configs configs = new EnvConfigs();
Expand All @@ -166,7 +184,7 @@ public static void main(String[] args) throws IOException, InterruptedException
LOGGER.info("temporalHost = " + temporalHost);

// Wait for the server to initialize the database and run migration
WorkerApp.waitForServer(configs);
waitForServer(configs);

LOGGER.info("Creating Job DB connection pool...");
final Database jobDatabase = new JobsDatabaseInstance(
Expand Down
24 changes: 0 additions & 24 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@

package io.airbyte.workers;

import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.ApiException;
import io.airbyte.api.client.model.HealthCheckRead;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.MaxWorkersConfig;
Expand Down Expand Up @@ -133,25 +130,6 @@ private static ProcessFactory getProcessBuilderFactory(Configs configs) throws I
}
}

public static void waitForServer(Configs configs) throws InterruptedException {
final AirbyteApiClient apiClient = new AirbyteApiClient(
new io.airbyte.api.client.invoker.ApiClient().setScheme("http")
.setHost(configs.getAirbyteApiHost())
.setPort(configs.getAirbyteApiPort())
.setBasePath("/api"));

boolean isHealthy = false;
while (!isHealthy) {
try {
HealthCheckRead healthCheck = apiClient.getHealthApi().getHealthCheck();
isHealthy = healthCheck.getDb();
} catch (ApiException e) {
LOGGER.info("Waiting for server to become available...");
Thread.sleep(2000);
}
}
}

private static final WorkerOptions getWorkerOptions(int max) {
return WorkerOptions.newBuilder()
.setMaxConcurrentActivityExecutionSize(max)
Expand All @@ -161,8 +139,6 @@ private static final WorkerOptions getWorkerOptions(int max) {
public static void main(String[] args) throws IOException, InterruptedException {
final Configs configs = new EnvConfigs();

waitForServer(configs);

LogClientSingleton.setWorkspaceMdc(LogClientSingleton.getSchedulerLogsRoot(configs));

final Path workspaceRoot = configs.getWorkspaceRoot();
Expand Down