diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java index 60e979f01c46..5a2403a6d981 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java @@ -4,11 +4,17 @@ package io.airbyte.integrations.base; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import java.util.Optional; import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; public interface Destination extends Integration { @@ -29,8 +35,130 @@ AirbyteMessageConsumer getConsumer(JsonNode config, Consumer outputRecordCollector) throws Exception; + /** + * Default implementation allows us to not have to touch existing destinations while avoiding a lot + * of conditional statements in {@link IntegrationRunner}. + * + * @param config config + * @param catalog catalog + * @param outputRecordCollector outputRecordCollector + * @return AirbyteMessageConsumer wrapped in SerializedAirbyteMessageConsumer to maintain legacy + * behavior. 
+ * @throws Exception exception + */ + default SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) + throws Exception { + return new ShimToSerializedAirbyteMessageConsumer(getConsumer(config, catalog, outputRecordCollector)); + } + static void defaultOutputRecordCollector(final AirbyteMessage message) { System.out.println(Jsons.serialize(message)); } + /** + * Backwards-compatibility wrapper for an AirbyteMessageConsumer. Strips the sizeInBytes argument + * away from the .accept call. + */ + @Slf4j + class ShimToSerializedAirbyteMessageConsumer implements SerializedAirbyteMessageConsumer { + + private final AirbyteMessageConsumer consumer; + + public ShimToSerializedAirbyteMessageConsumer(final AirbyteMessageConsumer consumer) { + this.consumer = consumer; + } + + @Override + public void start() throws Exception { + consumer.start(); + } + + /** + * Consumes an {@link AirbyteMessage} for processing. + *

+ * If the provided JSON string is invalid AND represents a {@link AirbyteMessage.Type#STATE} + * message, processing is halted. Otherwise, the invalid message is logged and execution continues. + * + * @param inputString JSON representation of an {@link AirbyteMessage}. + * @throws Exception if an invalid state message is provided or the consumer is unable to accept the + * provided message. + */ + @Override + public void accept(final String inputString, final Integer sizeInBytes) throws Exception { + consumeMessage(consumer, inputString); + } + + @Override + public void close() throws Exception { + consumer.close(); + } + + /** + * Consumes an {@link AirbyteMessage} for processing. + *

+ * If the provided JSON string is invalid AND represents a {@link AirbyteMessage.Type#STATE} + * message, processing is halted. Otherwise, the invalid message is logged and execution continues. + * + * @param consumer An {@link AirbyteMessageConsumer} that can handle the provided message. + * @param inputString JSON representation of an {@link AirbyteMessage}. + * @throws Exception if an invalid state message is provided or the consumer is unable to accept the + * provided message. + */ + @VisibleForTesting + static void consumeMessage(final AirbyteMessageConsumer consumer, final String inputString) throws Exception { + + final Optional messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class); + if (messageOptional.isPresent()) { + consumer.accept(messageOptional.get()); + } else { + if (isStateMessage(inputString)) { + throw new IllegalStateException("Invalid state message: " + inputString); + } else { + log.error("Received invalid message: " + inputString); + } + } + } + + /** + * Tests whether the provided JSON string represents a state message. + * + * @param input a JSON string that represents an {@link AirbyteMessage}. + * @return {@code true} if the message is a state message, {@code false} otherwise. + */ + @SuppressWarnings("OptionalIsPresent") + private static boolean isStateMessage(final String input) { + final Optional deserialized = Jsons.tryDeserialize(input, AirbyteTypeMessage.class); + if (deserialized.isPresent()) { + return deserialized.get().getType() == Type.STATE; + } else { + return false; + } + } + + /** + * Custom class for parsing a JSON message to determine the type of the represented + * {@link AirbyteMessage}. Do the bare minimum deserialisation by reading only the type field. 
+ */ + private static class AirbyteTypeMessage { + + @JsonProperty("type") + @JsonPropertyDescription("Message type") + private AirbyteMessage.Type type; + + @JsonProperty("type") + public AirbyteMessage.Type getType() { + return type; + } + + @JsonProperty("type") + public void setType(final AirbyteMessage.Type type) { + this.type = type; + } + + } + + } + } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 3f58d694a0fc..3fba41ef0f54 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -4,8 +4,6 @@ package io.airbyte.integrations.base; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonPropertyDescription; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -22,17 +20,17 @@ import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.validation.json.JsonSchemaValidator; +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.List; import java.util.Optional; -import java.util.Scanner; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.stream.Collectors; import org.apache.commons.lang3.ThreadUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.slf4j.Logger; @@ -76,7 +74,7 @@ public IntegrationRunner(final Source 
source) { this.cliParser = cliParser; this.outputRecordCollector = outputRecordCollector; // integration iface covers the commands that are the same for both source and destination. - this.integration = source != null ? source : destination; + integration = source != null ? source : destination; this.source = source; this.destination = destination; validator = new JsonSchemaValidator(); @@ -147,9 +145,20 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { final JsonNode config = parseConfig(parsed.getConfigPath()); validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE"); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); - try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) { - runConsumer(consumer); - } + + final Procedure consumeWriteStreamCallable = () -> { + try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) { + consumeWriteStream(consumer); + } + }; + + watchForOrphanThreads( + consumeWriteStreamCallable, + () -> System.exit(FORCED_EXIT_CODE), + INTERRUPT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES, + EXIT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES); } default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); } @@ -197,31 +206,51 @@ private void produceMessages(final AutoCloseableIterator message } @VisibleForTesting - static void consumeWriteStream(final AirbyteMessageConsumer consumer) throws Exception { - // use a Scanner that only processes new line characters to strictly abide with the - // https://jsonlines.org/ standard - final Scanner input = new Scanner(System.in, StandardCharsets.UTF_8).useDelimiter("[\r\n]+"); - consumer.start(); - while (input.hasNext()) { - consumeMessage(consumer, input.next()); + static void consumeWriteStream(final SerializedAirbyteMessageConsumer 
consumer) throws Exception { + try (final BufferedInputStream bis = new BufferedInputStream(System.in); + final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + consumeWriteStream(consumer, bis, baos); } } - private static void runConsumer(final AirbyteMessageConsumer consumer) throws Exception { - watchForOrphanThreads( - () -> consumeWriteStream(consumer), - () -> System.exit(FORCED_EXIT_CODE), - INTERRUPT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES, - EXIT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES); + @VisibleForTesting + static void consumeWriteStream(final SerializedAirbyteMessageConsumer consumer, + final BufferedInputStream bis, + final ByteArrayOutputStream baos) + throws Exception { + consumer.start(); + + final byte[] buffer = new byte[8192]; // 8K buffer + int bytesRead; + boolean lastWasNewLine = false; + + while ((bytesRead = bis.read(buffer)) != -1) { + for (int i = 0; i < bytesRead; i++) { + final byte b = buffer[i]; + if (b == '\n' || b == '\r') { + if (!lastWasNewLine && baos.size() > 0) { + consumer.accept(baos.toString(StandardCharsets.UTF_8), baos.size()); + baos.reset(); + } + lastWasNewLine = true; + } else { + baos.write(b); + lastWasNewLine = false; + } + } + } + + // Handle last line if there's one + if (baos.size() > 0) { + consumer.accept(baos.toString(StandardCharsets.UTF_8), baos.size()); + } } /** * This method calls a runMethod and make sure that it won't produce orphan non-daemon active * threads once it is done. Active non-daemon threads blocks JVM from exiting when the main thread * is done, whereas daemon ones don't. - * + *

* If any active non-daemon threads would be left as orphans, this method will schedule some * interrupt/exit hooks after giving it some time delay to close up properly. It is generally * preferred to have a proper closing sequence from children threads instead of interrupting or @@ -244,7 +273,7 @@ static void watchForOrphanThreads(final Procedure runMethod, .stream() // daemon threads don't block the JVM if the main `currentThread` exits, so they are not problematic .filter(runningThread -> !runningThread.getName().equals(currentThread.getName()) && !runningThread.isDaemon()) - .collect(Collectors.toList()); + .toList(); if (!runningThreads.isEmpty()) { LOGGER.warn(""" The main thread is exiting while children non-daemon threads from a connector are still active. @@ -275,32 +304,6 @@ static void watchForOrphanThreads(final Procedure runMethod, } } - /** - * Consumes an {@link AirbyteMessage} for processing. - * - * If the provided JSON string is invalid AND represents a {@link AirbyteMessage.Type#STATE} - * message, processing is halted. Otherwise, the invalid message is logged and execution continues. - * - * @param consumer An {@link AirbyteMessageConsumer} that can handle the provided message. - * @param inputString JSON representation of an {@link AirbyteMessage}. - * @throws Exception if an invalid state message is provided or the consumer is unable to accept the - * provided message. 
- */ - @VisibleForTesting - static void consumeMessage(final AirbyteMessageConsumer consumer, final String inputString) throws Exception { - - final Optional messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class); - if (messageOptional.isPresent()) { - consumer.accept(messageOptional.get()); - } else { - if (isStateMessage(inputString)) { - throw new IllegalStateException("Invalid state message: " + inputString); - } else { - LOGGER.error("Received invalid message: " + inputString); - } - } - } - private static String dumpThread(final Thread thread) { return String.format("%s (%s)\n Thread stacktrace: %s", thread.getName(), thread.getState(), Strings.join(List.of(thread.getStackTrace()), "\n at ")); @@ -336,41 +339,4 @@ static String parseConnectorVersion(final String connectorImage) { return tokens[tokens.length - 1]; } - /** - * Tests whether the provided JSON string represents a state message. - * - * @param input a JSON string that represents an {@link AirbyteMessage}. - * @return {@code true} if the message is a state message, {@code false} otherwise. - */ - private static boolean isStateMessage(final String input) { - final Optional deserialized = Jsons.tryDeserialize(input, AirbyteTypeMessage.class); - if (deserialized.isPresent()) { - return deserialized.get().getType() == Type.STATE; - } else { - return false; - } - } - - /** - * Custom class that can be used to parse a JSON message to determine the type of the represented - * {@link AirbyteMessage}. 
- */ - private static class AirbyteTypeMessage { - - @JsonProperty("type") - @JsonPropertyDescription("Message type") - private AirbyteMessage.Type type; - - @JsonProperty("type") - public AirbyteMessage.Type getType() { - return type; - } - - @JsonProperty("type") - public void setType(final AirbyteMessage.Type type) { - this.type = type; - } - - } - } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SerializedAirbyteMessageConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SerializedAirbyteMessageConsumer.java new file mode 100644 index 000000000000..dba2770335f1 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SerializedAirbyteMessageConsumer.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.base; + +import io.airbyte.commons.functional.CheckedBiConsumer; +import io.airbyte.protocol.models.v0.AirbyteMessage; + +/** + * Interface for the destination's consumption of incoming messages as strings. This interface is + * backwards compatible with {@link AirbyteMessageConsumer}. + *

+ * Consumption happens via the accept method, which commonly handles parsing, validation, batching + * and writing of the transformed data to the final destination, i.e. the technical system the data + * is being written to. + *

+ * Lifecycle: + *

+ */ +public interface SerializedAirbyteMessageConsumer extends CheckedBiConsumer, AutoCloseable { + + /** + * Initialize anything needed for the consumer. Must be called before accept. + * + * @throws Exception exception + */ + void start() throws Exception; + + /** + * Consumes all {@link AirbyteMessage}s + * + * @param message {@link AirbyteMessage} as a string + * @param sizeInBytes size of that string in bytes + * @throws Exception exception + */ + @Override + void accept(String message, Integer sizeInBytes) throws Exception; + + /** + * Executes at the end of consumption of all incoming streamed data regardless of success or failure + * + * @throws Exception exception + */ + @Override + void close() throws Exception; + +} diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerBackwardsCompatbilityTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerBackwardsCompatbilityTest.java new file mode 100644 index 000000000000..7626a7df576a --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerBackwardsCompatbilityTest.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.base; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Scanner; +import org.junit.jupiter.api.Test; + +public class IntegrationRunnerBackwardsCompatbilityTest { + + @Test + void testByteArrayInputStreamVersusScanner() throws Exception { + final String[] testInputs = new String[] { + "This is line 1\nThis is line 2\nThis is line 3", + "This is line 1\n\nThis is line 2\n\n\nThis is line 3", + "This is line 1\rThis is line 2\nThis is line 3\r\nThis is line 4", + "This is line 1 with emoji 😊\nThis is line 2 with Greek characters: Α, Β, Χ\nThis is line 3 with Cyrillic characters: Д, Ж, З", + "This is a very long line that contains a lot of characters...", + "This is line 1 with an escaped newline \\n character\nThis is line 2 with another escaped newline \\n character", + "This is line 1\n\n", + "\nThis is line 2", + "\n" + }; + + for (final String testInput : testInputs) { + // get new output + final InputStream stream1 = new ByteArrayInputStream(testInput.getBytes(StandardCharsets.UTF_8)); + final MockConsumer consumer2 = new MockConsumer(); + try (final BufferedInputStream bis = new BufferedInputStream(stream1); + final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + IntegrationRunner.consumeWriteStream(consumer2, bis, baos); + } + final List newOutput = consumer2.getOutput(); + + // get old output + final List oldOutput = new ArrayList<>(); + final InputStream stream2 = new ByteArrayInputStream(testInput.getBytes(StandardCharsets.UTF_8)); + final Scanner scanner = new Scanner(stream2, StandardCharsets.UTF_8).useDelimiter("[\r\n]+"); + while (scanner.hasNext()) { + oldOutput.add(scanner.next()); + } + + assertEquals(oldOutput, newOutput); + } + 
} + + private static class MockConsumer implements SerializedAirbyteMessageConsumer { + + private final List output = new ArrayList<>(); + + @Override + public void start() { + + } + + @Override + public void accept(final String message, final Integer sizeInBytes) { + output.add(message); + } + + @Override + public void close() { + + } + + public List getOutput() { + return new ArrayList<>(output); + } + + } + +} diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index 866bf8e07aa1..fbfbae6d13f6 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -27,6 +27,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.commons.util.MoreIterators; +import io.airbyte.integrations.base.Destination.ShimToSerializedAirbyteMessageConsumer; import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; @@ -90,6 +91,7 @@ class IntegrationRunnerTest { private Path configPath; private Path configuredCatalogPath; private Path statePath; + private Path configDir; @SuppressWarnings("unchecked") @BeforeEach @@ -98,7 +100,7 @@ void setup() throws IOException { stdoutConsumer = Mockito.mock(Consumer.class); destination = mock(Destination.class); source = mock(Source.class); - final Path configDir = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "test"); + configDir = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "test"); configPath = IOs.writeFile(configDir, CONFIG_FILE_NAME, CONFIG_STRING); configuredCatalogPath = 
IOs.writeFile(configDir, CONFIGURED_CATALOG_FILE_NAME, Jsons.serialize(CONFIGURED_CATALOG)); @@ -220,6 +222,7 @@ void testRead() throws Exception { final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class); new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS); + // noinspection resource verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE); verify(stdoutConsumer).accept(message1); verify(stdoutConsumer).accept(message2); @@ -243,6 +246,7 @@ void testReadException() throws Exception { final Throwable throwable = catchThrowable(() -> new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS)); assertThat(throwable).isInstanceOf(ConfigErrorException.class); + // noinspection resource verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE); } @@ -291,9 +295,9 @@ void testCheckRuntimeException() throws Exception { @Test void testWrite() throws Exception { final IntegrationConfig intConfig = IntegrationConfig.write(configPath, configuredCatalogPath); - final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class); + final SerializedAirbyteMessageConsumer consumerMock = mock(SerializedAirbyteMessageConsumer.class); when(cliParser.parse(ARGS)).thenReturn(intConfig); - when(destination.getConsumer(CONFIG, CONFIGURED_CATALOG, stdoutConsumer)).thenReturn(airbyteMessageConsumerMock); + when(destination.getSerializedMessageConsumer(CONFIG, CONFIGURED_CATALOG, stdoutConsumer)).thenReturn(consumerMock); final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class); when(destination.spec()).thenReturn(expectedConnSpec); @@ -304,7 +308,7 @@ void testWrite() throws Exception { final IntegrationRunner runner = spy(new IntegrationRunner(cliParser, stdoutConsumer, destination, null, jsonSchemaValidator)); runner.run(ARGS); - verify(destination).getConsumer(CONFIG, CONFIGURED_CATALOG, stdoutConsumer); + 
verify(destination).getSerializedMessageConsumer(CONFIG, CONFIGURED_CATALOG, stdoutConsumer); verify(jsonSchemaValidator).validate(any(), any()); } @@ -330,12 +334,13 @@ void testDestinationConsumerLifecycleSuccess() throws Exception { + Jsons.serialize(message2) + "\n" + Jsons.serialize(stateMessage)).getBytes(StandardCharsets.UTF_8))); - try (final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class)) { + try (final SerializedAirbyteMessageConsumer airbyteMessageConsumerMock = mock(SerializedAirbyteMessageConsumer.class)) { IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock); final InOrder inOrder = inOrder(airbyteMessageConsumerMock); - inOrder.verify(airbyteMessageConsumerMock).accept(message1); - inOrder.verify(airbyteMessageConsumerMock).accept(message2); - inOrder.verify(airbyteMessageConsumerMock).accept(stateMessage); + inOrder.verify(airbyteMessageConsumerMock).accept(Jsons.serialize(message1), Jsons.serialize(message1).getBytes(StandardCharsets.UTF_8).length); + inOrder.verify(airbyteMessageConsumerMock).accept(Jsons.serialize(message2), Jsons.serialize(message2).getBytes(StandardCharsets.UTF_8).length); + inOrder.verify(airbyteMessageConsumerMock).accept(Jsons.serialize(stateMessage), + Jsons.serialize(stateMessage).getBytes(StandardCharsets.UTF_8).length); } } @@ -355,11 +360,12 @@ void testDestinationConsumerLifecycleFailure() throws Exception { .withEmittedAt(EMITTED_AT)); System.setIn(new ByteArrayInputStream((Jsons.serialize(message1) + "\n" + Jsons.serialize(message2)).getBytes(StandardCharsets.UTF_8))); - try (final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class)) { - doThrow(new IOException("error")).when(airbyteMessageConsumerMock).accept(message1); + try (final SerializedAirbyteMessageConsumer airbyteMessageConsumerMock = mock(SerializedAirbyteMessageConsumer.class)) { + doThrow(new 
IOException("error")).when(airbyteMessageConsumerMock).accept(Jsons.serialize(message1), + Jsons.serialize(message1).getBytes(StandardCharsets.UTF_8).length); assertThrows(IOException.class, () -> IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock)); final InOrder inOrder = inOrder(airbyteMessageConsumerMock); - inOrder.verify(airbyteMessageConsumerMock).accept(message1); + inOrder.verify(airbyteMessageConsumerMock).accept(Jsons.serialize(message1), Jsons.serialize(message1).getBytes(StandardCharsets.UTF_8).length); inOrder.verifyNoMoreInteractions(); } } @@ -409,7 +415,7 @@ void testNoInterruptOrphanThreadFailure() { } final List runningThreads = ThreadUtils.getAllThreads().stream() .filter(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(testName)) - .collect(Collectors.toList()); + .toList(); // a thread that refuses to be interrupted should remain assertEquals(1, runningThreads.size()); assertEquals(1, caughtExceptions.size()); @@ -462,7 +468,7 @@ void testConsumptionOfInvalidStateMessage() { Assertions.assertThrows(IllegalStateException.class, () -> { try (final AirbyteMessageConsumer consumer = mock(AirbyteMessageConsumer.class)) { - IntegrationRunner.consumeMessage(consumer, invalidStateMessage); + ShimToSerializedAirbyteMessageConsumer.consumeMessage(consumer, invalidStateMessage); } }); } @@ -482,7 +488,7 @@ void testConsumptionOfInvalidNonStateMessage() { Assertions.assertDoesNotThrow(() -> { try (final AirbyteMessageConsumer consumer = mock(AirbyteMessageConsumer.class)) { - IntegrationRunner.consumeMessage(consumer, invalidNonStateMessage); + ShimToSerializedAirbyteMessageConsumer.consumeMessage(consumer, invalidNonStateMessage); verify(consumer, times(0)).accept(any(AirbyteMessage.class)); } }); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java 
b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java index d987f5f28709..7b3b78023d78 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java @@ -4,13 +4,21 @@ package io.airbyte.integrations.destination.snowflake; +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.base.SerializedAirbyteMessageConsumer; import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class SnowflakeDestination extends SwitchingDestination { public static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1); + private final String airbyteEnvironment; enum DestinationType { COPY_S3, @@ -21,6 +29,27 @@ enum DestinationType { public SnowflakeDestination(final String airbyteEnvironment) { super(DestinationType.class, SnowflakeDestinationResolver::getTypeFromConfig, SnowflakeDestinationResolver.getTypeToDestination(airbyteEnvironment)); + this.airbyteEnvironment = airbyteEnvironment; + } + + @Override + public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) + throws Exception { + log.info("destination class: {}", getClass()); + // detect if running on internal staging for snowflake, if so run consumer2. 
+ final boolean useAsyncSnowflake = false; + // final boolean useAsyncSnowflake = config.has("loading_method") + // && config.get("loading_method").has("method") + // && config.get("loading_method").get("method").asText().equals("Internal Staging"); + + if (useAsyncSnowflake) { + return new SnowflakeInternalStagingDestination(airbyteEnvironment).getSerializedMessageConsumer(config, catalog, outputRecordCollector); + } else { + return new ShimToSerializedAirbyteMessageConsumer(getConsumer(config, catalog, outputRecordCollector)); + } + } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java index cafb4c463b16..13654b3fefc6 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java @@ -12,6 +12,7 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.SerializedAirbyteMessageConsumer; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; import io.airbyte.integrations.destination.record_buffer.FileBuffer; @@ -31,7 +32,7 @@ public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class); - private String airbyteEnvironment; + private final String airbyteEnvironment; public 
SnowflakeInternalStagingDestination(final String airbyteEnvironment) { this(new SnowflakeSQLNameTransformer(), airbyteEnvironment); @@ -124,4 +125,12 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, true); } + @Override + public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) { + // coming soon! the async snowflake destination. + return null; + } + }