diff --git a/.env b/.env
index 4a0d08b2cfaa6..baf5e95e969e7 100644
--- a/.env
+++ b/.env
@@ -68,7 +68,6 @@ RESOURCE_MEMORY_REQUEST=
RESOURCE_MEMORY_LIMIT=
# Max attempts per sync and max retries per attempt
-MAX_RETRIES_PER_ATTEMPT=3
MAX_SYNC_JOB_ATTEMPTS=3
# Time in days to reach a timeout to cancel the synchronization
diff --git a/.env.dev b/.env.dev
index 5403fab586ea1..9753e741e8bd5 100644
--- a/.env.dev
+++ b/.env.dev
@@ -20,6 +20,5 @@ HACK_LOCAL_ROOT_PARENT=/tmp
WEBAPP_URL=http://localhost:8000/
API_URL=/api/v1/
INTERNAL_API_HOST=airbyte-server:8001
-MAX_RETRIES_PER_ATTEMPT=3
MAX_SYNC_JOB_ATTEMPTS=3
MAX_SYNC_TIMEOUT_DAYS=3
diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
index b60b5ffa247b6..3042fe922c9ff 100644
--- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
+++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
@@ -57,8 +57,6 @@ public interface Configs {
String getConfigDatabaseUrl();
- int getMaxRetriesPerAttempt();
-
int getMaxSyncJobAttempts();
int getMaxSyncTimeoutDays();
diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java
index c308675347ae6..bdcf445c8efac 100644
--- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java
+++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java
@@ -59,7 +59,6 @@ public class EnvConfigs implements Configs {
public static final String CONFIG_DATABASE_PASSWORD = "CONFIG_DATABASE_PASSWORD";
public static final String CONFIG_DATABASE_URL = "CONFIG_DATABASE_URL";
public static final String WEBAPP_URL = "WEBAPP_URL";
- public static final String MAX_RETRIES_PER_ATTEMPT = "MAX_RETRIES_PER_ATTEMPT";
public static final String MAX_SYNC_JOB_ATTEMPTS = "MAX_SYNC_JOB_ATTEMPTS";
public static final String MAX_SYNC_TIMEOUT_DAYS = "MAX_SYNC_TIMEOUT_DAYS";
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
@@ -147,11 +146,6 @@ public String getDatabaseUrl() {
return getEnsureEnv(DATABASE_URL);
}
- @Override
- public int getMaxRetriesPerAttempt() {
- return Integer.parseInt(getEnvOrDefault(MAX_RETRIES_PER_ATTEMPT, "3"));
- }
-
@Override
public int getMaxSyncJobAttempts() {
return Integer.parseInt(getEnvOrDefault(MAX_SYNC_JOB_ATTEMPTS, "3"));
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/RetryingTemporalAttemptExecution.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/RetryingTemporalAttemptExecution.java
deleted file mode 100644
index 59bf4f4efcd1c..0000000000000
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/RetryingTemporalAttemptExecution.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 Airbyte
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package io.airbyte.workers.temporal;
-
-import com.google.common.annotations.VisibleForTesting;
-import io.airbyte.commons.functional.CheckedSupplier;
-import io.airbyte.scheduler.models.JobRunConfig;
-import io.airbyte.workers.Worker;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiFunction;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/*
- * This class allows a worker to run multiple times. In addition to the functionality in {@link
- * TemporalAttemptExecution} it takes a predicate to determine if the output of a worker constitutes
- * a complete success or a partial one. It also takes a function that takes in the input of the
- * previous run of the worker and the output of the last worker in order to generate a new input for
- * that worker.
- */
-public class RetryingTemporalAttemptExecution implements Supplier> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(RetryingTemporalAttemptExecution.class);
-
- private final CheckedSupplier, Exception> workerSupplier;
- private final Supplier inputSupplier;
- private final CancellationHandler cancellationHandler;
-
- private final Predicate shouldAttemptAgainPredicate;
- private final BiFunction computeNextAttemptInputFunction;
- private final int maxRetriesCount;
- private final TemporalAttemptExecutionFactory temporalAttemptExecutionFactory;
- private final Path workspaceRoot;
- private final JobRunConfig jobRunConfig;
-
- public RetryingTemporalAttemptExecution(Path workspaceRoot,
- JobRunConfig jobRunConfig,
- CheckedSupplier, Exception> workerSupplier,
- Supplier initialInputSupplier,
- CancellationHandler cancellationHandler,
- Predicate shouldAttemptAgainPredicate,
- BiFunction computeNextAttemptInputFunction,
- int maxRetriesCount) {
- this(
- workspaceRoot,
- jobRunConfig,
- workerSupplier,
- initialInputSupplier,
- cancellationHandler,
- shouldAttemptAgainPredicate,
- computeNextAttemptInputFunction,
- maxRetriesCount,
- TemporalAttemptExecution::new);
- }
-
- @VisibleForTesting
- RetryingTemporalAttemptExecution(Path workspaceRoot,
- JobRunConfig jobRunConfig,
- CheckedSupplier, Exception> workerSupplier,
- Supplier initialInputSupplier,
- CancellationHandler cancellationHandler,
- Predicate shouldAttemptAgainPredicate,
- BiFunction computeNextAttemptInputFunction,
- int maxRetriesCount,
- TemporalAttemptExecutionFactory temporalAttemptExecutionFactory) {
- this.workspaceRoot = workspaceRoot;
- this.jobRunConfig = jobRunConfig;
- this.workerSupplier = workerSupplier;
- this.inputSupplier = initialInputSupplier;
- this.cancellationHandler = cancellationHandler;
- this.shouldAttemptAgainPredicate = shouldAttemptAgainPredicate;
- this.computeNextAttemptInputFunction = computeNextAttemptInputFunction;
- this.maxRetriesCount = maxRetriesCount;
- this.temporalAttemptExecutionFactory = temporalAttemptExecutionFactory;
- }
-
- @Override
- public List get() {
- INPUT input = inputSupplier.get();
- final AtomicReference lastOutput = new AtomicReference<>();
- List outputCollector = new ArrayList<>();
-
- for (int i = 0; true; i++) {
- if (i >= maxRetriesCount) {
- LOGGER.info("Max retries reached: {}", i);
- break;
- }
-
- final boolean hasLastOutput = lastOutput.get() != null;
- final boolean shouldAttemptAgain = !hasLastOutput || shouldAttemptAgainPredicate.test(lastOutput.get());
- LOGGER.info("Last output present: {}. Should attempt again: {}", lastOutput.get() != null, shouldAttemptAgain);
- if (hasLastOutput && !shouldAttemptAgain) {
- break;
- }
-
- LOGGER.info("Starting attempt: {} of {}", i, maxRetriesCount);
-
- Supplier resolvedInputSupplier = !hasLastOutput ? inputSupplier : () -> computeNextAttemptInputFunction.apply(input, lastOutput.get());
-
- final TemporalAttemptExecution temporalAttemptExecution = temporalAttemptExecutionFactory.create(
- workspaceRoot,
- jobRunConfig,
- workerSupplier,
- resolvedInputSupplier,
- cancellationHandler);
- lastOutput.set(temporalAttemptExecution.get());
- outputCollector.add(lastOutput.get());
- }
-
- return outputCollector;
- }
-
- // interface to make testing easier.
- @FunctionalInterface
- interface TemporalAttemptExecutionFactory {
-
- TemporalAttemptExecution create(Path workspaceRoot,
- JobRunConfig jobRunConfig,
- CheckedSupplier, Exception> workerSupplier,
- Supplier inputSupplier,
- CancellationHandler cancellationHandler);
-
- }
-
-}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java
index c0be7e608b837..a2456decbfb61 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java
@@ -27,13 +27,10 @@
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.commons.json.Jsons;
-import io.airbyte.commons.util.MoreLists;
import io.airbyte.config.AirbyteConfigValidator;
import io.airbyte.config.ConfigSchema;
-import io.airbyte.config.EnvConfigs;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.OperatorDbtInput;
-import io.airbyte.config.ReplicationAttemptSummary;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.StandardSyncInput;
@@ -41,7 +38,6 @@
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
-import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DbtTransformationRunner;
@@ -69,9 +65,6 @@
import io.temporal.workflow.WorkflowMethod;
import java.nio.file.Path;
import java.time.Duration;
-import java.util.List;
-import java.util.function.BiFunction;
-import java.util.function.Predicate;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -151,8 +144,6 @@ class ReplicationActivityImpl implements ReplicationActivity {
private static final Logger LOGGER = LoggerFactory.getLogger(ReplicationActivityImpl.class);
- private static final int MAX_RETRIES = new EnvConfigs().getMaxRetriesPerAttempt();
-
private final ProcessFactory processFactory;
private final Path workspaceRoot;
private final AirbyteConfigValidator validator;
@@ -179,60 +170,35 @@ public StandardSyncOutput replicate(JobRunConfig jobRunConfig,
return syncInput;
};
- final Predicate shouldAttemptAgain =
- output -> output.getReplicationAttemptSummary().getStatus() != ReplicationStatus.COMPLETED;
+ final TemporalAttemptExecution temporalAttempt = new TemporalAttemptExecution<>(
+ workspaceRoot,
+ jobRunConfig,
+ getWorkerFactory(sourceLauncherConfig, destinationLauncherConfig, jobRunConfig, syncInput),
+ inputSupplier,
+ new CancellationHandler.TemporalCancellationHandler());
- final BiFunction nextAttemptInput = (input, lastOutput) -> {
- final StandardSyncInput newInput = Jsons.clone(input);
- newInput.setState(lastOutput.getState());
- return newInput;
- };
+ final ReplicationOutput attemptOutput = temporalAttempt.get();
+ final StandardSyncOutput standardSyncOutput = reduceReplicationOutput(attemptOutput);
- final RetryingTemporalAttemptExecution temporalAttemptExecution =
- new RetryingTemporalAttemptExecution<>(
- workspaceRoot,
- jobRunConfig,
- getWorkerFactory(sourceLauncherConfig, destinationLauncherConfig, jobRunConfig, syncInput),
- inputSupplier,
- new CancellationHandler.TemporalCancellationHandler(),
- shouldAttemptAgain,
- nextAttemptInput,
- MAX_RETRIES);
-
- final List attemptOutputs = temporalAttemptExecution.get();
- final StandardSyncOutput standardSyncOutput = reduceReplicationOutputs(attemptOutputs);
-
- LOGGER.info("attempt summaries: {}", attemptOutputs);
LOGGER.info("sync summary: {}", standardSyncOutput);
return standardSyncOutput;
}
- // todo (cgardens) - this operation is lossy (we lose the ability to see the amount of data
- // replicated by each attempt). likely in the future, we will want to retain this info and surface
- // it.
- /**
- * aggregate each attempts output into a sync summary.
- */
- private static StandardSyncOutput reduceReplicationOutputs(List attemptOutputs) {
- final long totalBytesReplicated = attemptOutputs
- .stream()
- .map(ReplicationOutput::getReplicationAttemptSummary)
- .mapToLong(ReplicationAttemptSummary::getBytesSynced).sum();
- final long totalRecordsReplicated = attemptOutputs
- .stream()
- .map(ReplicationOutput::getReplicationAttemptSummary)
- .mapToLong(ReplicationAttemptSummary::getRecordsSynced).sum();
+ private static StandardSyncOutput reduceReplicationOutput(ReplicationOutput output) {
+ final long totalBytesReplicated = output.getReplicationAttemptSummary().getBytesSynced();
+ final long totalRecordsReplicated = output.getReplicationAttemptSummary().getRecordsSynced();
+
final StandardSyncSummary syncSummary = new StandardSyncSummary();
syncSummary.setBytesSynced(totalBytesReplicated);
syncSummary.setRecordsSynced(totalRecordsReplicated);
- syncSummary.setStartTime(attemptOutputs.get(0).getReplicationAttemptSummary().getStartTime());
- syncSummary.setEndTime(MoreLists.last(attemptOutputs).orElseThrow().getReplicationAttemptSummary().getEndTime());
- syncSummary.setStatus(MoreLists.last(attemptOutputs).orElseThrow().getReplicationAttemptSummary().getStatus());
+ syncSummary.setStartTime(output.getReplicationAttemptSummary().getStartTime());
+ syncSummary.setEndTime(output.getReplicationAttemptSummary().getEndTime());
+ syncSummary.setStatus(output.getReplicationAttemptSummary().getStatus());
final StandardSyncOutput standardSyncOutput = new StandardSyncOutput();
- standardSyncOutput.setState(MoreLists.last(attemptOutputs).orElseThrow().getState());
- standardSyncOutput.setOutputCatalog(MoreLists.last(attemptOutputs).orElseThrow().getOutputCatalog());
+ standardSyncOutput.setState(output.getState());
+ standardSyncOutput.setOutputCatalog(output.getOutputCatalog());
standardSyncOutput.setStandardSyncSummary(syncSummary);
return standardSyncOutput;
diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/RetryingTemporalAttemptExecutionTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/RetryingTemporalAttemptExecutionTest.java
deleted file mode 100644
index 7b031edcc421d..0000000000000
--- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/RetryingTemporalAttemptExecutionTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 Airbyte
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package io.airbyte.workers.temporal;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import io.airbyte.commons.functional.CheckedConsumer;
-import io.airbyte.commons.functional.CheckedSupplier;
-import io.airbyte.scheduler.models.JobRunConfig;
-import io.airbyte.workers.Worker;
-import io.temporal.internal.common.CheckedExceptionWrapper;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.List;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.stubbing.Answer;
-
-class RetryingTemporalAttemptExecutionTest {
-
- private static final int MAX_RETRIES = 3;
- private static final String INPUT = "the king";
- private static final String JOB_ID = "11";
- private static final int ATTEMPT_ID = 21;
- private static final JobRunConfig JOB_RUN_CONFIG = new JobRunConfig().withJobId(JOB_ID).withAttemptId((long) ATTEMPT_ID);
-
- private Path jobRoot;
-
- private CheckedSupplier, Exception> execution;
- private Supplier inputSupplier;
- private Consumer mdcSetter;
- private Predicate shouldAttemptAgain;
- private BiFunction nextInput;
-
- private RetryingTemporalAttemptExecution attemptExecution;
-
- @SuppressWarnings("unchecked")
- @BeforeEach
- void setup() throws IOException {
- final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "temporal_attempt_execution_test");
- jobRoot = workspaceRoot.resolve(JOB_ID).resolve(String.valueOf(ATTEMPT_ID));
-
- execution = mock(CheckedSupplier.class);
- inputSupplier = mock(Supplier.class);
- mdcSetter = mock(Consumer.class);
- shouldAttemptAgain = mock(Predicate.class);
- nextInput = mock(BiFunction.class);
- final CheckedConsumer jobRootDirCreator = Files::createDirectories;
- final Supplier workflowIdSupplier = () -> "workflow_id";
- final CancellationHandler cancellationHandler = mock(CancellationHandler.class);
- attemptExecution = new RetryingTemporalAttemptExecution<>(
- workspaceRoot,
- JOB_RUN_CONFIG,
- execution,
- inputSupplier,
- cancellationHandler,
- shouldAttemptAgain,
- nextInput,
- MAX_RETRIES,
- (workspaceRootArg, jobRunConfigArg, workerSupplierArg, initialInputSupplierArg, cancellationHandlerArg) -> new TemporalAttemptExecution<>(
- workspaceRootArg,
- jobRunConfigArg,
- execution,
- initialInputSupplierArg,
- mdcSetter,
- jobRootDirCreator,
- cancellationHandlerArg,
- workflowIdSupplier));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- void testSuccessfulSupplierRun() throws Exception {
- final String expected = "louis XVI";
- final Worker worker = mock(Worker.class);
- when(worker.run(eq(INPUT), any())).thenReturn(expected);
- when(shouldAttemptAgain.test(expected)).thenReturn(false);
- when(inputSupplier.get()).thenReturn(INPUT);
- when(execution.get()).thenAnswer((Answer>) invocation -> worker);
-
- final List actual = attemptExecution.get();
-
- assertEquals(List.of(expected), actual);
-
- verify(execution).get();
- verify(worker).run(eq(INPUT), any());
- verify(mdcSetter, atLeast(2)).accept(jobRoot);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- void testSuccessfulSupplierRunMultipleAttempts() throws Exception {
- final List expected = List.of("louis XVI", "louis XVII");
- final Worker worker = mock(Worker.class);
- when(worker.run(eq(INPUT), any())).thenReturn(expected.get(0));
- when(worker.run(eq(INPUT + "I"), any())).thenReturn(expected.get(1));
- when(shouldAttemptAgain.test(expected.get(0))).thenReturn(true);
- when(shouldAttemptAgain.test(expected.get(1))).thenReturn(false);
- when(inputSupplier.get()).thenReturn(INPUT);
- when(nextInput.apply(any(), any())).thenAnswer(a -> a.getArguments()[0] + "I");
-
- when(execution.get()).thenAnswer((Answer>) invocation -> worker);
-
- final List actual = attemptExecution.get();
-
- assertEquals(expected, actual);
-
- verify(execution, times(2)).get();
- verify(worker).run(eq(INPUT), any());
- verify(worker).run(eq(INPUT + "I"), any());
- verify(mdcSetter, atLeast(2)).accept(jobRoot);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- void testExceedsMaxAttempts() throws Exception {
- final List expected = List.of("louis XVI", "louis XVII", "louis XVIII");
- final Worker worker = mock(Worker.class);
- when(worker.run(eq(INPUT), any())).thenReturn(expected.get(0));
- when(worker.run(eq(INPUT + "I"), any())).thenReturn(expected.get(1));
- when(worker.run(eq(INPUT + "II"), any())).thenReturn(expected.get(2));
- when(shouldAttemptAgain.test(expected.get(0))).thenReturn(true);
- when(shouldAttemptAgain.test(expected.get(1))).thenReturn(true);
- when(shouldAttemptAgain.test(expected.get(2))).thenReturn(true);
- when(inputSupplier.get()).thenReturn(INPUT);
- when(nextInput.apply(any(), any())).thenAnswer(a -> a.getArguments()[0] + "I").thenAnswer(a -> a.getArguments()[0] + "II");
-
- when(execution.get()).thenAnswer((Answer>) invocation -> worker);
-
- final List actual = attemptExecution.get();
-
- assertEquals(expected, actual);
-
- verify(execution, times(3)).get();
- verify(worker).run(eq(INPUT), any());
- verify(worker).run(eq(INPUT + "I"), any());
- verify(worker).run(eq(INPUT + "II"), any());
- verify(mdcSetter, atLeast(2)).accept(jobRoot);
- }
-
- @Test
- void testThrowsCheckedException() throws Exception {
- when(execution.get()).thenThrow(new IOException());
-
- final CheckedExceptionWrapper actualException = assertThrows(CheckedExceptionWrapper.class, () -> attemptExecution.get());
- assertEquals(IOException.class, CheckedExceptionWrapper.unwrap(actualException).getClass());
-
- verify(execution).get();
- verify(mdcSetter).accept(jobRoot);
- }
-
- @Test
- void testThrowsUncheckedException() throws Exception {
- when(execution.get()).thenThrow(new IllegalArgumentException());
-
- assertThrows(IllegalArgumentException.class, () -> attemptExecution.get());
-
- verify(execution).get();
- verify(mdcSetter).accept(jobRoot);
- }
-
-}
diff --git a/docker-compose.yaml b/docker-compose.yaml
index f6ae494661a7f..b20598fa93c1a 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -62,7 +62,6 @@ services:
- RESOURCE_CPU_LIMIT=${RESOURCE_CPU_LIMIT}
- RESOURCE_MEMORY_REQUEST=${RESOURCE_MEMORY_REQUEST}
- RESOURCE_MEMORY_LIMIT=${RESOURCE_MEMORY_LIMIT}
- - MAX_RETRIES_PER_ATTEMPT=${MAX_RETRIES_PER_ATTEMPT}
- MAX_SYNC_JOB_ATTEMPTS=${MAX_SYNC_JOB_ATTEMPTS}
- MAX_SYNC_TIMEOUT_DAYS=${MAX_SYNC_TIMEOUT_DAYS}
volumes: