Skip to content

Commit

Permalink
Clean up Cancel Exception Handling. (#3891)
Browse files Browse the repository at this point in the history
While working on Kube cancellation, I realised our exception handling for the cancel operation is noisy and confusing. It was quite scary to me as I tried to debug why the Kube cancel was not happening. I decided to clean up the existing behaviour first, before nailing down Kube cancel behaviour.

This PR tries to clean this up. Unnecessary exceptions have been removed. Exceptions that happen on cancel have been converted into log lines that state this exception is typically thrown on cancel.

In particular,
- Convert the IOException in LineGobbler to a log line, since a race occurs between closing the pool and closing the process. An exception is thrown here whenever the gobbler tries to read the stream after the underlying process has exited.
- Stop wrapping and rethrowing the exception caught in the CancellationHandler. That exception is simply how Temporal signals cancellation, so it is now converted to a log line.

See the PR for more detailed notes.
  • Loading branch information
davinchia authored Jun 5, 2021
1 parent 1ad283b commit 70a48f5
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import io.airbyte.commons.concurrency.VoidCallable;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand All @@ -40,24 +41,39 @@ public class LineGobbler implements VoidCallable {
private final static Logger LOGGER = LoggerFactory.getLogger(LineGobbler.class);

/**
 * Convenience overload of {@link #gobble(InputStream, Consumer, String)} that uses
 * {@code "generic"} as the caller label.
 *
 * @param is stream whose lines should be consumed
 * @param consumer receives each line read from {@code is}
 */
public static void gobble(final InputStream is, final Consumer<String> consumer) {
gobble(is, consumer, "generic");
}

/**
 * Starts reading {@code is} line by line on a dedicated single-thread executor, handing every line
 * to {@code consumer}.
 *
 * <p>The calling thread's MDC context map is copied and passed to the gobbler. The gobbler shuts the
 * executor down itself once it stops reading, so callers do not manage the executor.
 *
 * @param is stream whose lines should be consumed
 * @param consumer receives each line read from {@code is}
 * @param caller label identifying the owner of the stream; it prefixes the gobbler's log messages
 */
public static void gobble(final InputStream is, final Consumer<String> consumer, final String caller) {
  final ExecutorService executor = Executors.newSingleThreadExecutor();
  final Map<String, String> mdc = MDC.getCopyOfContextMap();
  // Submit exactly one gobbler, and make it the caller-aware one. Submitting an additional
  // four-argument ("generic") gobbler for the same stream would start a second reader of the
  // stream and drop the caller label from its log messages.
  executor.submit(new LineGobbler(is, consumer, executor, mdc, caller));
}

private final BufferedReader is;
private final Consumer<String> consumer;
private final ExecutorService executor;
private final Map<String, String> mdc;
private final String caller;

/**
 * Creates a gobbler with the default {@code "generic"} caller label by delegating to the
 * five-argument constructor.
 *
 * @param is stream to read lines from
 * @param consumer receives each line read from {@code is}
 * @param executor executor service associated with this gobbler
 * @param mdc MDC context map to keep with this gobbler
 */
LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc) {
this(is, consumer, executor, mdc, "generic");
}

/**
 * Creates a gobbler for the given stream.
 *
 * @param is stream to read lines from; it is wrapped via {@code IOs.newBufferedReader}
 * @param consumer receives each line read from {@code is}
 * @param executor executor service stored with this gobbler
 * @param mdc MDC context map stored with this gobbler
 * @param caller label identifying the owner of the stream; used in the gobbler's log messages
 */
LineGobbler(final InputStream is,
            final Consumer<String> consumer,
            final ExecutorService executor,
            final Map<String, String> mdc,
            final String caller) {
  // Plain collaborators first; the reader wrapping the raw stream is created last.
  this.caller = caller;
  this.mdc = mdc;
  this.executor = executor;
  this.consumer = consumer;
  this.is = IOs.newBufferedReader(is);
}

@Override
Expand All @@ -68,8 +84,10 @@ public void voidCall() {
while ((line = is.readLine()) != null) {
consumer.accept(line);
}
} catch (IOException i) {
LOGGER.warn("{} gobbler IOException: {}. Typically happens when cancelling a job.", caller, i.getMessage());
} catch (Exception e) {
LOGGER.error("Error when reading stream", e);
LOGGER.error("{} gobbler error when reading stream", caller, e);
} finally {
executor.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand All @@ -60,6 +61,7 @@ public class DefaultReplicationWorker implements ReplicationWorker {
private final MessageTracker<AirbyteMessage> sourceMessageTracker;
private final MessageTracker<AirbyteMessage> destinationMessageTracker;

private final ExecutorService executors;
private final AtomicBoolean cancelled;
private final AtomicBoolean hasFailed;

Expand All @@ -77,6 +79,7 @@ public DefaultReplicationWorker(final String jobId,
this.destination = destination;
this.sourceMessageTracker = sourceMessageTracker;
this.destinationMessageTracker = destinationMessageTracker;
this.executors = Executors.newFixedThreadPool(2);

this.cancelled = new AtomicBoolean(false);
this.hasFailed = new AtomicBoolean(false);
Expand Down Expand Up @@ -111,7 +114,6 @@ public ReplicationOutput run(StandardSyncInput syncInput, Path jobRoot) throws W
s -> String.format("%s - %s", s.getSyncMode(), s.getDestinationSyncMode()))));
final StandardTapConfig sourceConfig = WorkerUtils.syncToTapConfig(syncInput);

final ExecutorService executorService = Executors.newFixedThreadPool(2);
final Map<String, String> mdc = MDC.getCopyOfContextMap();

// note: resources are closed in the opposite order in which they are declared. thus source will be
Expand All @@ -120,13 +122,13 @@ public ReplicationOutput run(StandardSyncInput syncInput, Path jobRoot) throws W
destination.start(destinationConfig, jobRoot);
source.start(sourceConfig, jobRoot);

final Future<?> destinationOutputThreadFuture = executorService.submit(getDestinationOutputRunnable(
final Future<?> destinationOutputThreadFuture = executors.submit(getDestinationOutputRunnable(
destination,
cancelled,
destinationMessageTracker,
mdc));

final Future<?> replicationThreadFuture = executorService.submit(getReplicationRunnable(
final Future<?> replicationThreadFuture = executors.submit(getReplicationRunnable(
source,
destination,
cancelled,
Expand All @@ -145,7 +147,7 @@ public ReplicationOutput run(StandardSyncInput syncInput, Path jobRoot) throws W
hasFailed.set(true);
LOGGER.error("Sync worker failed.", e);
} finally {
executorService.shutdownNow();
executors.shutdownNow();
}

final ReplicationStatus outputStatus;
Expand Down Expand Up @@ -215,7 +217,13 @@ private static Runnable getReplicationRunnable(Source<AirbyteMessage> source,
}
destination.notifyEndOfStream();
} catch (Exception e) {
throw new RuntimeException(e);
if (!cancelled.get()) {
// Although this thread is closed first, it races with the source's closure and can attempt one
// final read after the source is closed before it's terminated.
// This read will fail and throw an exception. Because of this, throw exceptions only if the worker
// was not cancelled.
throw new RuntimeException(e);
}
}
};
}
Expand All @@ -236,29 +244,42 @@ private static Runnable getDestinationOutputRunnable(Destination<AirbyteMessage>
}
}
} catch (Exception e) {
throw new RuntimeException(e);
if (!cancelled.get()) {
// Although this thread is closed first, it races with the destination's closure and can attempt one
// final read after the destination is closed before it's terminated.
// This read will fail and throw an exception. Because of this, throw exceptions only if the worker
// was not cancelled.
throw new RuntimeException(e);
}
}
};
}

@Override
public void cancel() {
// Resources are closed in the opposite order they are declared.
LOGGER.info("Cancelling replication worker...");
cancelled.set(true);

LOGGER.info("Cancelling source...");
try {
source.cancel();
} catch (Exception e) {
executors.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
cancelled.set(true);

LOGGER.info("Cancelling destination...");
try {
destination.cancel();
} catch (Exception e) {
e.printStackTrace();
LOGGER.info("Error cancelling destination: ", e);
}

LOGGER.info("Cancelling source...");
try {
source.cancel();
} catch (Exception e) {
LOGGER.info("Error cancelling source: ", e);
}

}

}
4 changes: 4 additions & 0 deletions airbyte-workers/src/main/java/io/airbyte/workers/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public interface Worker<InputType, OutputType> {
*/
OutputType run(InputType inputType, Path jobRoot) throws WorkerException;

/**
* Cancels in-progress workers. Although all workers support cancel, in reality only the
* asynchronous {@link DefaultReplicationWorker}'s cancel is used.
*/
void cancel();

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void start(StandardTargetConfig destinationConfig, Path jobRoot) throws I
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME);
// stdout logs are logged elsewhere since stdout also contains data
LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error);
LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error, "airbyte-destination");

writer = new BufferedWriter(new OutputStreamWriter(destinationProcess.getOutputStream(), Charsets.UTF_8));

Expand All @@ -111,7 +111,7 @@ public void notifyEndOfStream() throws IOException {
}

@Override
public void close() throws WorkerException, IOException {
public void close() throws IOException {
if (destinationProcess == null) {
return;
}
Expand All @@ -123,7 +123,9 @@ public void close() throws WorkerException, IOException {
LOGGER.debug("Closing destination process");
WorkerUtils.gentleClose(destinationProcess, 10, TimeUnit.HOURS);
if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) {
throw new WorkerException("destination process wasn't successful");
LOGGER.warn(
"Destination process might not have shut down correctly. destination process alive: {}, destination process exit value: {}. This warning is normal if the job was cancelled.",
destinationProcess.isAlive(), destinationProcess.exitValue());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.IntegrationLauncher;
import java.nio.file.Path;
Expand Down Expand Up @@ -90,7 +89,7 @@ public void start(StandardTapConfig input, Path jobRoot) throws Exception {
WorkerConstants.SOURCE_CATALOG_JSON_FILENAME,
input.getState() == null ? null : WorkerConstants.INPUT_STATE_JSON_FILENAME);
// stdout logs are logged elsewhere since stdout also contains data
LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error);
LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error, "airbyte-source");

messageIterator = streamFactory.create(IOs.newBufferedReader(sourceProcess.getInputStream()))
.peek(message -> heartbeatMonitor.beat())
Expand Down Expand Up @@ -127,7 +126,9 @@ public void close() throws Exception {
FORCED_SHUTDOWN_DURATION);

if (sourceProcess.isAlive() || sourceProcess.exitValue() != 0) {
throw new WorkerException("Source process wasn't successful");
LOGGER.warn(
"Source process might not have shut down correctly. source process alive: {}, source process exit value: {}. This warning is normal if the job was cancelled.",
sourceProcess.isAlive(), sourceProcess.exitValue());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.client.ActivityCompletionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface CancellationHandler {

void checkAndHandleCancellation(Runnable onCancellationCallback) throws WorkerException;
void checkAndHandleCancellation(Runnable onCancellationCallback);

class TemporalCancellationHandler implements CancellationHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(TemporalCancellationHandler.class);

final ActivityExecutionContext context;

public TemporalCancellationHandler() {
Expand All @@ -54,7 +58,7 @@ public TemporalCancellationHandler() {
* @throws WorkerException
*/
@Override
public void checkAndHandleCancellation(Runnable onCancellationCallback) throws WorkerException {
public void checkAndHandleCancellation(Runnable onCancellationCallback) {
try {
// Heartbeat is somewhat misleading here. What it does is check the current Temporal activity's
// context and
Expand All @@ -64,7 +68,7 @@ public void checkAndHandleCancellation(Runnable onCancellationCallback) throws W
context.heartbeat(null);
} catch (ActivityCompletionException e) {
onCancellationCallback.run();
throw new WorkerException("Worker cleaned up after exception", e);
LOGGER.warn("Job either timeout-ed or was cancelled.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.airbyte.config.EnvConfigs;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.Worker;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.WorkerUtils;
import io.temporal.activity.Activity;
import java.io.IOException;
Expand All @@ -42,6 +41,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
Expand Down Expand Up @@ -153,24 +153,46 @@ private Thread getWorkerThread(Worker<INPUT, OUTPUT> worker, CompletableFuture<O
});
}

/**
* Cancel is implemented in a slightly convoluted manner due to Temporal's semantics. Cancel
* requests are routed to the Temporal Scheduler via the cancelJob function in
* SchedulerHandler.java. This manifests as a {@link io.temporal.client.ActivityCompletionException}
* when the {@link CancellationHandler} heartbeats to the Temporal Scheduler.
*
* The callback defined in this function is executed after the above exception is caught, and
* defines the clean up operations executed as part of cancel.
*
* See {@link CancellationHandler} for more info.
*/
private Runnable getCancellationChecker(Worker<INPUT, OUTPUT> worker, Thread workerThread, CompletableFuture<OUTPUT> outputFuture) {
var cancelled = new AtomicBoolean(false);
return () -> {
try {
mdcSetter.accept(jobRoot, jobId);

final Runnable onCancellationCallback = () -> {
if (cancelled.get()) {
// Since this is a separate thread, race condition between the executor service shutting down and
// this thread's next invocation can happen. This
// check guarantees cancel operations are only executed once.
return;
}

LOGGER.info("Running sync worker cancellation...");
cancelled.set(true);
worker.cancel();

LOGGER.info("Interrupting worker thread...");
workerThread.interrupt();

LOGGER.info("Cancelling completable future...");
// This throws a CancellationException as part of the cancelling and is the exception seen in logs
// when cancelling the job.
outputFuture.cancel(false);
};

cancellationHandler.checkAndHandleCancellation(onCancellationCallback);
} catch (WorkerException e) {
} catch (Exception e) {
LOGGER.error("Cancellation checker exception", e);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,4 @@ public void testCloseNotifiesLifecycle() throws Exception {
verify(outputStream).close();
}

@Test
public void testProcessFailLifecycle() throws Exception {
final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher);
destination.start(DESTINATION_CONFIG, jobRoot);

when(process.isAlive()).thenReturn(false);
when(process.exitValue()).thenReturn(1);
Assertions.assertThrows(WorkerException.class, destination::close);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,4 @@ public void testSuccessfulLifecycle() throws Exception {
verify(process).exitValue();
}

@Test
public void testProcessFail() throws Exception {
final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);
tap.start(SOURCE_CONFIG, jobRoot);

when(process.exitValue()).thenReturn(1);

Assertions.assertThrows(WorkerException.class, tap::close);
}

}

0 comments on commit 70a48f5

Please sign in to comment.