Skip to content

Commit

Permalink
grab bag of non-controversial clean up tasks (#26702)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored May 31, 2023
1 parent 567f839 commit 480a43b
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.QuoteMode;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CsvSerializedBuffer extends BaseSerializedBuffer {

private static final Logger LOGGER = LoggerFactory.getLogger(CsvSerializedBuffer.class);

public static final String CSV_GZ_SUFFIX = ".csv.gz";

private final CsvSheetGenerator csvSheetGenerator;
Expand All @@ -36,8 +40,8 @@ public CsvSerializedBuffer(final BufferStorage bufferStorage,
throws Exception {
super(bufferStorage);
this.csvSheetGenerator = csvSheetGenerator;
this.csvPrinter = null;
this.csvFormat = CSVFormat.DEFAULT;
csvPrinter = null;
csvFormat = CSVFormat.DEFAULT;
// we always want to compress csv files
withCompression(compression);
}
Expand All @@ -62,12 +66,22 @@ protected void writeRecord(final AirbyteRecordMessage record) throws IOException

@Override
protected void flushWriter() throws IOException {
csvPrinter.flush();
// in an async world, it is possible that flush writer gets called even if no records were accepted.
if (csvPrinter != null) {
csvPrinter.flush();
} else {
LOGGER.warn("Trying to flush but no printer is initialized.");
}
}

@Override
protected void closeWriter() throws IOException {
csvPrinter.close();
// in an async world, it is possible that close writer gets called even if no records were accepted.
if (csvPrinter != null) {
csvPrinter.close();
} else {
LOGGER.warn("Trying to close but no printer is initialized.");
}
}

public static BufferCreateFunction createFunction(final S3CsvFormatConfig config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public class RecordSizeEstimator {
* determined by {@code sampleBatchSize}.
*/
public RecordSizeEstimator(final int sampleBatchSize) {
this.streamRecordSizeEstimation = new HashMap<>();
this.streamSampleCountdown = new HashMap<>();
streamRecordSizeEstimation = new HashMap<>();
streamSampleCountdown = new HashMap<>();
this.sampleBatchSize = sampleBatchSize;
}

Expand Down Expand Up @@ -71,7 +71,7 @@ public long getEstimatedByteSize(final AirbyteRecordMessage record) {
}

@VisibleForTesting
static long getStringByteSize(final JsonNode data) {
public static long getStringByteSize(final JsonNode data) {
// assume UTF-8 encoding, and each char is 4 bytes long
return Jsons.serialize(data).length() * 4L;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void flush() throws IOException {
if (inputStream == null && !isClosed) {
flushWriter();
if (compressedBuffer != null) {
LOGGER.info("Wrapping up compression and write GZIP trailer data.");
LOGGER.debug("Wrapping up compression and write GZIP trailer data.");
compressedBuffer.flush();
compressedBuffer.close();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination_async;

import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;

/**
* Replicate the behavior of {@link org.apache.commons.io.FileUtils} to match the proclivities of
* Davin and Charles. Courteously written by ChatGPT.
*/
public class AirbyteFileUtils {

  private static final double ONE_KB = 1024;
  private static final double ONE_MB = ONE_KB * 1024;
  private static final double ONE_GB = ONE_MB * 1024;
  private static final double ONE_TB = ONE_GB * 1024;

  /** At most two fraction digits; trailing zeros are dropped (e.g. {@code 1 KB}, {@code 1.5 KB}). */
  private static final String SIZE_PATTERN = "#.##";

  /**
   * {@link DecimalFormat} is not safe for concurrent use, and this utility is static, so each
   * thread gets its own formatter. The symbols are pinned to {@link Locale#ROOT} so the decimal
   * separator is always {@code '.'} regardless of the JVM default locale.
   */
  private static final ThreadLocal<DecimalFormat> SIZE_FORMAT =
      ThreadLocal.withInitial(() -> new DecimalFormat(SIZE_PATTERN, DecimalFormatSymbols.getInstance(Locale.ROOT)));

  /**
   * Replicate the behavior of {@link org.apache.commons.io.FileUtils} but instead of rounding down to
   * the nearest whole number, it rounds to two decimal places.
   * <p>
   * Units are binary (1 KB = 1024 bytes). The unit is chosen from the raw byte count before
   * rounding. The output is locale-independent and the method may be called from multiple threads.
   *
   * @param sizeInBytes size in bytes
   * @return human-readable size, e.g. {@code "500 bytes"}, {@code "1.95 KB"}, {@code "2.93 MB"}
   */
  public static String byteCountToDisplaySize(final long sizeInBytes) {
    final DecimalFormat format = SIZE_FORMAT.get();

    if (sizeInBytes < ONE_KB) {
      return format.format(sizeInBytes) + " bytes";
    } else if (sizeInBytes < ONE_MB) {
      return format.format((double) sizeInBytes / ONE_KB) + " KB";
    } else if (sizeInBytes < ONE_GB) {
      return format.format((double) sizeInBytes / ONE_MB) + " MB";
    } else if (sizeInBytes < ONE_TB) {
      return format.format((double) sizeInBytes / ONE_GB) + " GB";
    } else {
      return format.format((double) sizeInBytes / ONE_TB) + " TB";
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

package io.airbyte.integrations.destination_async;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.destination.buffered_stream_consumer.OnStartFunction;
Expand Down Expand Up @@ -42,13 +40,10 @@ public class AsyncStreamConsumer implements AirbyteMessageConsumer {
private final OnStartFunction onStart;
private final OnCloseFunction onClose;
private final ConfiguredAirbyteCatalog catalog;
private final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord;

private final BufferManager bufferManager;
private final BufferEnqueue bufferEnqueue;
private final FlushWorkers flushWorkers;
private final Set<StreamDescriptor> streamNames;
private final IgnoredRecordsTracker ignoredRecordsTracker;

private boolean hasStarted;
private boolean hasClosed;
Expand All @@ -58,7 +53,6 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnCloseFunction onClose,
final DestinationFlushFunction flusher,
final ConfiguredAirbyteCatalog catalog,
final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord,
final BufferManager bufferManager) {
hasStarted = false;
hasClosed = false;
Expand All @@ -67,12 +61,10 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
this.onStart = onStart;
this.onClose = onClose;
this.catalog = catalog;
this.isValidRecord = isValidRecord;
this.bufferManager = bufferManager;
this.bufferEnqueue = bufferManager.getBufferEnqueue();
this.flushWorkers = new FlushWorkers(this.bufferManager.getBufferDequeue(), flusher);
this.streamNames = StreamDescriptorUtils.fromConfiguredCatalog(catalog);
this.ignoredRecordsTracker = new IgnoredRecordsTracker();
bufferEnqueue = bufferManager.getBufferEnqueue();
flushWorkers = new FlushWorkers(this.bufferManager.getBufferDequeue(), flusher);
streamNames = StreamDescriptorUtils.fromConfiguredCatalog(catalog);
}

@Override
Expand Down Expand Up @@ -109,7 +101,6 @@ public void close() throws Exception {
// or we risk in-memory data.
flushWorkers.close();
bufferManager.close();
ignoredRecordsTracker.report();
onClose.call();
LOGGER.info("{} closed.", AsyncStreamConsumer.class);
}
Expand Down Expand Up @@ -148,21 +139,6 @@ private void validateRecord(final AirbyteMessage message, final StreamDescriptor
if (!streamNames.contains(streamDescriptor)) {
throwUnrecognizedStream(catalog, message);
}

trackerIsValidRecord(message, streamDescriptor);
}

private void trackerIsValidRecord(final AirbyteMessage message, final StreamDescriptor streamDescriptor) {
// todo (cgardens) - is valid should also move inside the tracker, but don't want to blow up more
// constructors right now.
try {

if (!isValidRecord.apply(message.getRecord().getData())) {
ignoredRecordsTracker.addRecord(streamDescriptor, message);
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

private static void throwUnrecognizedStream(final ConfiguredAirbyteCatalog catalog, final AirbyteMessage message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;

/**
* Responsible for managing global memory across multiple queues in a thread-safe way.
* <p>
* This means memory allocation and deallocation for each queue can be dynamically adjusted
* This means memory allocation and de-allocation for each queue can be dynamically adjusted
* according to the overall available memory. Memory blocks are managed in chunks of
* {@link #BLOCK_SIZE_BYTES}, and the total amount of memory managed is configured at creation time.
* <p>
Expand Down Expand Up @@ -68,6 +69,10 @@ public synchronized long requestMemory() {
final var toAllocateBytes = Math.min(freeMem, BLOCK_SIZE_BYTES);
currentMemoryBytes.addAndGet(toAllocateBytes);

log.debug("Memory Requested: max: {}, allocated: {}, allocated in this request: {}",
FileUtils.byteCountToDisplaySize(maxMemoryBytes),
FileUtils.byteCountToDisplaySize(currentMemoryBytes.get()),
FileUtils.byteCountToDisplaySize(toAllocateBytes));
return toAllocateBytes;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination_async;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

public class AirbyteFileUtilsTest {

  /**
   * Checks one representative value per unit bucket (bytes, KB, MB, GB, TB) and that each is
   * rendered with at most two decimal places.
   */
  @Test
  void testByteCountToDisplaySize() {
    assertDisplaySize("500 bytes", 500L);
    assertDisplaySize("1.95 KB", 2000L);
    assertDisplaySize("2.93 MB", 3072000L);
    assertDisplaySize("2.67 GB", 2872000000L);
    assertDisplaySize("1.82 TB", 2000000000000L);
  }

  /** Asserts that {@code sizeInBytes} is rendered exactly as {@code expected}. */
  private static void assertDisplaySize(final String expected, final long sizeInBytes) {
    final String actual = AirbyteFileUtils.byteCountToDisplaySize(sizeInBytes);
    assertEquals(expected, actual);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public AirbyteMessageConsumer createAsync(final Consumer<AirbyteMessage> outputR
() -> GeneralStagingFunctions.onCloseFunction(database, stagingOperations, writeConfigs, purgeStagingData).accept(false),
flusher,
catalog,
stagingOperations::isValidData,
new BufferManager());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"streams": [
{
"stream": {
"name": "users",
"name": "users_10m",
"namespace": "PERF_TEST_HARNESS",
"json_schema": {
"type": "object",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"streams": [
{
"stream": {
"name": "users",
"name": "users_1m",
"namespace": "PERF_TEST_HARNESS",
"json_schema": {
"type": "object",
Expand Down

0 comments on commit 480a43b

Please sign in to comment.