From 480a43b9ab758e130832186116df78242b6086e2 Mon Sep 17 00:00:00 2001 From: Charles Date: Wed, 31 May 2023 08:53:27 -0700 Subject: [PATCH] grab bag of non-controversial clean up tasks (#26702) --- .../s3/csv/CsvSerializedBuffer.java | 22 ++++++++-- .../RecordSizeEstimator.java | 6 +-- .../record_buffer/BaseSerializedBuffer.java | 2 +- .../destination_async/AirbyteFileUtils.java | 43 +++++++++++++++++++ .../AsyncStreamConsumer.java | 30 ++----------- .../GlobalMemoryManager.java | 7 ++- .../IgnoredRecordsTracker.java | 39 ----------------- .../AirbyteFileUtilsTest.java | 23 ++++++++++ .../staging/StagingConsumerFactory.java | 1 - .../destination-snowflake/10m_catalog.json | 2 +- .../destination-snowflake/1m_catalog.json | 2 +- 11 files changed, 99 insertions(+), 78 deletions(-) create mode 100644 airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/AirbyteFileUtils.java delete mode 100644 airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/IgnoredRecordsTracker.java create mode 100644 airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination_async/AirbyteFileUtilsTest.java diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBuffer.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBuffer.java index 4ac646dbae4f..9cb2b2a4f719 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBuffer.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBuffer.java @@ -21,9 +21,13 @@ import org.apache.commons.csv.CSVPrinter; import org.apache.commons.csv.QuoteMode; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CsvSerializedBuffer 
extends BaseSerializedBuffer { + private static final Logger LOGGER = LoggerFactory.getLogger(CsvSerializedBuffer.class); + public static final String CSV_GZ_SUFFIX = ".csv.gz"; private final CsvSheetGenerator csvSheetGenerator; @@ -36,8 +40,8 @@ public CsvSerializedBuffer(final BufferStorage bufferStorage, throws Exception { super(bufferStorage); this.csvSheetGenerator = csvSheetGenerator; - this.csvPrinter = null; - this.csvFormat = CSVFormat.DEFAULT; + csvPrinter = null; + csvFormat = CSVFormat.DEFAULT; // we always want to compress csv files withCompression(compression); } @@ -62,12 +66,22 @@ protected void writeRecord(final AirbyteRecordMessage record) throws IOException @Override protected void flushWriter() throws IOException { - csvPrinter.flush(); + // in an async world, it is possible that flush writer gets called even if no records were accepted. + if (csvPrinter != null) { + csvPrinter.flush(); + } else { + LOGGER.warn("Trying to flush but no printer is initialized."); + } } @Override protected void closeWriter() throws IOException { - csvPrinter.close(); + // in an async world, it is possible that close writer gets called even if no records were accepted. 
+ if (csvPrinter != null) { + csvPrinter.close(); + } else { + LOGGER.warn("Trying to close but no printer is initialized."); + } } public static BufferCreateFunction createFunction(final S3CsvFormatConfig config, diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java index 9c5949a645f2..b27cb1860386 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordSizeEstimator.java @@ -34,8 +34,8 @@ public class RecordSizeEstimator { * determined by {@code sampleBatchSize}. */ public RecordSizeEstimator(final int sampleBatchSize) { - this.streamRecordSizeEstimation = new HashMap<>(); - this.streamSampleCountdown = new HashMap<>(); + streamRecordSizeEstimation = new HashMap<>(); + streamSampleCountdown = new HashMap<>(); this.sampleBatchSize = sampleBatchSize; } @@ -71,7 +71,7 @@ public long getEstimatedByteSize(final AirbyteRecordMessage record) { } @VisibleForTesting - static long getStringByteSize(final JsonNode data) { + public static long getStringByteSize(final JsonNode data) { // assume UTF-8 encoding, and each char is 4 bytes long return Jsons.serialize(data).length() * 4L; } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BaseSerializedBuffer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BaseSerializedBuffer.java index 9d6ce6acc976..73d16162aa3f 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BaseSerializedBuffer.java +++ 
b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BaseSerializedBuffer.java @@ -128,7 +128,7 @@ public void flush() throws IOException { if (inputStream == null && !isClosed) { flushWriter(); if (compressedBuffer != null) { - LOGGER.info("Wrapping up compression and write GZIP trailer data."); + LOGGER.debug("Wrapping up compression and write GZIP trailer data."); compressedBuffer.flush(); compressedBuffer.close(); } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/AirbyteFileUtils.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/AirbyteFileUtils.java new file mode 100644 index 000000000000..23c72f12c884 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/AirbyteFileUtils.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination_async; + +import java.text.DecimalFormat; + +/** + * Replicate the behavior of {@link org.apache.commons.io.FileUtils} to match the proclivities of + * Davin and Charles. Courteously written by ChatGPT. + */ +public class AirbyteFileUtils { + + private static final double ONE_KB = 1024; + private static final double ONE_MB = ONE_KB * 1024; + private static final double ONE_GB = ONE_MB * 1024; + private static final double ONE_TB = ONE_GB * 1024; + private static final DecimalFormat df = new DecimalFormat("#.##"); + + /** + * Replicate the behavior of {@link org.apache.commons.io.FileUtils} but instead of rounding down to + * the nearest whole number, it rounds to two decimal places. 
+ * + * @param sizeInBytes size in bytes + * @return human-readable size + */ + public static String byteCountToDisplaySize(final long sizeInBytes) { + + if (sizeInBytes < ONE_KB) { + return df.format(sizeInBytes) + " bytes"; + } else if (sizeInBytes < ONE_MB) { + return df.format((double) sizeInBytes / ONE_KB) + " KB"; + } else if (sizeInBytes < ONE_GB) { + return df.format((double) sizeInBytes / ONE_MB) + " MB"; + } else if (sizeInBytes < ONE_TB) { + return df.format((double) sizeInBytes / ONE_GB) + " GB"; + } else { + return df.format((double) sizeInBytes / ONE_TB) + " TB"; + } + } + +} diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/AsyncStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/AsyncStreamConsumer.java index fe6dd75fe05c..eb7ce28df7a5 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/AsyncStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/AsyncStreamConsumer.java @@ -4,9 +4,7 @@ package io.airbyte.integrations.destination_async; -import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; -import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.destination.buffered_stream_consumer.OnStartFunction; @@ -42,13 +40,10 @@ public class AsyncStreamConsumer implements AirbyteMessageConsumer { private final OnStartFunction onStart; private final OnCloseFunction onClose; private final ConfiguredAirbyteCatalog catalog; - private final CheckedFunction isValidRecord; - private final BufferManager bufferManager; private final BufferEnqueue bufferEnqueue; private final FlushWorkers flushWorkers; private final Set streamNames; - private final IgnoredRecordsTracker 
ignoredRecordsTracker; private boolean hasStarted; private boolean hasClosed; @@ -58,7 +53,6 @@ public AsyncStreamConsumer(final Consumer outputRecordCollector, final OnCloseFunction onClose, final DestinationFlushFunction flusher, final ConfiguredAirbyteCatalog catalog, - final CheckedFunction isValidRecord, final BufferManager bufferManager) { hasStarted = false; hasClosed = false; @@ -67,12 +61,10 @@ public AsyncStreamConsumer(final Consumer outputRecordCollector, this.onStart = onStart; this.onClose = onClose; this.catalog = catalog; - this.isValidRecord = isValidRecord; this.bufferManager = bufferManager; - this.bufferEnqueue = bufferManager.getBufferEnqueue(); - this.flushWorkers = new FlushWorkers(this.bufferManager.getBufferDequeue(), flusher); - this.streamNames = StreamDescriptorUtils.fromConfiguredCatalog(catalog); - this.ignoredRecordsTracker = new IgnoredRecordsTracker(); + bufferEnqueue = bufferManager.getBufferEnqueue(); + flushWorkers = new FlushWorkers(this.bufferManager.getBufferDequeue(), flusher); + streamNames = StreamDescriptorUtils.fromConfiguredCatalog(catalog); } @Override @@ -109,7 +101,6 @@ public void close() throws Exception { // or we risk in-memory data. flushWorkers.close(); bufferManager.close(); - ignoredRecordsTracker.report(); onClose.call(); LOGGER.info("{} closed.", AsyncStreamConsumer.class); } @@ -148,21 +139,6 @@ private void validateRecord(final AirbyteMessage message, final StreamDescriptor if (!streamNames.contains(streamDescriptor)) { throwUnrecognizedStream(catalog, message); } - - trackerIsValidRecord(message, streamDescriptor); - } - - private void trackerIsValidRecord(final AirbyteMessage message, final StreamDescriptor streamDescriptor) { - // todo (cgardens) - is valid should also move inside the tracker, but don't want to blow up more - // constructors right now. 
- try { - - if (!isValidRecord.apply(message.getRecord().getData())) { - ignoredRecordsTracker.addRecord(streamDescriptor, message); - } - } catch (final Exception e) { - throw new RuntimeException(e); - } } private static void throwUnrecognizedStream(final ConfiguredAirbyteCatalog catalog, final AirbyteMessage message) { diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/GlobalMemoryManager.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/GlobalMemoryManager.java index b9aa1a954200..726e1c7a4cc1 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/GlobalMemoryManager.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/GlobalMemoryManager.java @@ -6,11 +6,12 @@ import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; /** * Responsible for managing global memory across multiple queues in a thread-safe way. *

- * This means memory allocation and deallocation for each queue can be dynamically adjusted + * This means memory allocation and de-allocation for each queue can be dynamically adjusted * according to the overall available memory. Memory blocks are managed in chunks of * {@link #BLOCK_SIZE_BYTES}, and the total amount of memory managed is configured at creation time. *

@@ -68,6 +69,10 @@ public synchronized long requestMemory() { final var toAllocateBytes = Math.min(freeMem, BLOCK_SIZE_BYTES); currentMemoryBytes.addAndGet(toAllocateBytes); + log.debug("Memory Requested: max: {}, allocated: {}, allocated in this request: {}", + FileUtils.byteCountToDisplaySize(maxMemoryBytes), + FileUtils.byteCountToDisplaySize(currentMemoryBytes.get()), + FileUtils.byteCountToDisplaySize(toAllocateBytes)); return toAllocateBytes; } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/IgnoredRecordsTracker.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/IgnoredRecordsTracker.java deleted file mode 100644 index 1d282559751e..000000000000 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/IgnoredRecordsTracker.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination_async; - -import com.google.common.annotations.VisibleForTesting; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.HashMap; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class IgnoredRecordsTracker { - - private static final Logger LOGGER = LoggerFactory.getLogger(IgnoredRecordsTracker.class); - - private final Map streamToIgnoredRecordCount; - - public IgnoredRecordsTracker() { - this(new HashMap<>()); - } - - @VisibleForTesting - IgnoredRecordsTracker(final Map streamToIgnoredRecordCount) { - this.streamToIgnoredRecordCount = streamToIgnoredRecordCount; - } - - public void addRecord(final StreamDescriptor streamDescriptor, final AirbyteMessage recordMessage) { - streamToIgnoredRecordCount.put(streamDescriptor, streamToIgnoredRecordCount.getOrDefault(streamDescriptor, 0L) + 1L); - } - - public void report() { - 
streamToIgnoredRecordCount - .forEach((pair, count) -> LOGGER.warn("A total of {} record(s) of data from stream {} were invalid and were ignored.", count, pair)); - } - -} diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination_async/AirbyteFileUtilsTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination_async/AirbyteFileUtilsTest.java new file mode 100644 index 000000000000..286b4e5d8ffa --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination_async/AirbyteFileUtilsTest.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination_async; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +public class AirbyteFileUtilsTest { + + @Test + void testByteCountToDisplaySize() { + + assertEquals("500 bytes", AirbyteFileUtils.byteCountToDisplaySize(500L)); + assertEquals("1.95 KB", AirbyteFileUtils.byteCountToDisplaySize(2000L)); + assertEquals("2.93 MB", AirbyteFileUtils.byteCountToDisplaySize(3072000L)); + assertEquals("2.67 GB", AirbyteFileUtils.byteCountToDisplaySize(2872000000L)); + assertEquals("1.82 TB", AirbyteFileUtils.byteCountToDisplaySize(2000000000000L)); + } + +} diff --git a/airbyte-integrations/bases/bases-destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java b/airbyte-integrations/bases/bases-destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java index c564be81475d..0cb428c763d6 100644 --- a/airbyte-integrations/bases/bases-destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java +++ b/airbyte-integrations/bases/bases-destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java @@ -96,7 +96,6 @@ public 
AirbyteMessageConsumer createAsync(final Consumer outputR () -> GeneralStagingFunctions.onCloseFunction(database, stagingOperations, writeConfigs, purgeStagingData).accept(false), flusher, catalog, - stagingOperations::isValidData, new BufferManager()); } diff --git a/airbyte-integrations/connectors-performance/destination-harness/src/main/resources/catalogs/destination-snowflake/10m_catalog.json b/airbyte-integrations/connectors-performance/destination-harness/src/main/resources/catalogs/destination-snowflake/10m_catalog.json index 85da81dd80f2..975ac744a30d 100644 --- a/airbyte-integrations/connectors-performance/destination-harness/src/main/resources/catalogs/destination-snowflake/10m_catalog.json +++ b/airbyte-integrations/connectors-performance/destination-harness/src/main/resources/catalogs/destination-snowflake/10m_catalog.json @@ -2,7 +2,7 @@ "streams": [ { "stream": { - "name": "users", + "name": "users_10m", "namespace": "PERF_TEST_HARNESS", "json_schema": { "type": "object", diff --git a/airbyte-integrations/connectors-performance/destination-harness/src/main/resources/catalogs/destination-snowflake/1m_catalog.json b/airbyte-integrations/connectors-performance/destination-harness/src/main/resources/catalogs/destination-snowflake/1m_catalog.json index 85da81dd80f2..9937af8983d7 100644 --- a/airbyte-integrations/connectors-performance/destination-harness/src/main/resources/catalogs/destination-snowflake/1m_catalog.json +++ b/airbyte-integrations/connectors-performance/destination-harness/src/main/resources/catalogs/destination-snowflake/1m_catalog.json @@ -2,7 +2,7 @@ "streams": [ { "stream": { - "name": "users", + "name": "users_1m", "namespace": "PERF_TEST_HARNESS", "json_schema": { "type": "object",