Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grab bag of non-controversial clean up tasks #26702

Merged
merged 3 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
grab bag of non-controversial clean up tasks
  • Loading branch information
cgardens committed May 31, 2023
commit f80c459c2b4da7f03373cd7e486ab9eb2c1944f1
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.QuoteMode;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CsvSerializedBuffer extends BaseSerializedBuffer {

private static final Logger LOGGER = LoggerFactory.getLogger(CsvSerializedBuffer.class);

public static final String CSV_GZ_SUFFIX = ".csv.gz";

private final CsvSheetGenerator csvSheetGenerator;
Expand All @@ -36,8 +40,8 @@ public CsvSerializedBuffer(final BufferStorage bufferStorage,
throws Exception {
super(bufferStorage);
this.csvSheetGenerator = csvSheetGenerator;
this.csvPrinter = null;
this.csvFormat = CSVFormat.DEFAULT;
csvPrinter = null;
csvFormat = CSVFormat.DEFAULT;
// we always want to compress csv files
withCompression(compression);
}
Expand All @@ -62,12 +66,20 @@ protected void writeRecord(final AirbyteRecordMessage record) throws IOException

@Override
protected void flushWriter() throws IOException {
csvPrinter.flush();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a single thread it wasn't really possible (or likely) for this to get called without accept getting called first. In a multi-threaded world that can actually happen pretty frequently, so when it does we log and move on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

maybe leave as a comment?

if (csvPrinter != null) {
csvPrinter.flush();
} else {
LOGGER.warn("Trying to flush but no printer is initialized.");
}
}

@Override
protected void closeWriter() throws IOException {
csvPrinter.close();
if (csvPrinter != null) {
csvPrinter.close();
} else {
LOGGER.warn("Trying to close but no printer is initialized.");
}
}

public static BufferCreateFunction createFunction(final S3CsvFormatConfig config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public class RecordSizeEstimator {
* determined by {@code sampleBatchSize}.
*/
public RecordSizeEstimator(final int sampleBatchSize) {
this.streamRecordSizeEstimation = new HashMap<>();
this.streamSampleCountdown = new HashMap<>();
streamRecordSizeEstimation = new HashMap<>();
streamSampleCountdown = new HashMap<>();
this.sampleBatchSize = sampleBatchSize;
}

Expand Down Expand Up @@ -71,7 +71,7 @@ public long getEstimatedByteSize(final AirbyteRecordMessage record) {
}

@VisibleForTesting
static long getStringByteSize(final JsonNode data) {
public static long getStringByteSize(final JsonNode data) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we end up using this in some tests later on

// assume UTF-8 encoding, and each char is 4 bytes long
return Jsons.serialize(data).length() * 4L;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void flush() throws IOException {
if (inputStream == null && !isClosed) {
flushWriter();
if (compressedBuffer != null) {
LOGGER.info("Wrapping up compression and write GZIP trailer data.");
LOGGER.debug("Wrapping up compression and write GZIP trailer data.");
compressedBuffer.flush();
compressedBuffer.close();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination_async;

import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;

/**
 * Human-readable byte-size formatting in the spirit of {@link org.apache.commons.io.FileUtils},
 * except that values keep up to two decimal places instead of being rounded down to a whole number.
 * <p>
 * The class holds no shared mutable state, so its static methods can be called concurrently.
 */
public class AirbyteFileUtils {

  private static final double ONE_KB = 1024;
  private static final double ONE_MB = ONE_KB * 1024;
  private static final double ONE_GB = ONE_MB * 1024;
  private static final double ONE_TB = ONE_GB * 1024;

  /** At most two fraction digits; trailing zeros and a bare decimal point are omitted. */
  private static final String DISPLAY_PATTERN = "#.##";

  /**
   * Replicate the behavior of {@link org.apache.commons.io.FileUtils} but instead of rounding down to
   * the nearest whole number, it rounds to two decimal places.
   * <p>
   * Units are binary (1 KB = 1024 bytes). The decimal separator is always {@code '.'}, regardless of
   * the JVM default locale, so the output is stable across environments. Values below 1024
   * (including negative ones) are reported in bytes.
   *
   * @param sizeInBytes size in bytes
   * @return human-readable size, e.g. {@code "500 bytes"}, {@code "1.95 KB"}, {@code "2.93 MB"}
   */
  public static String byteCountToDisplaySize(final long sizeInBytes) {
    // DecimalFormat is not thread-safe, so a fresh instance is created per call rather than sharing
    // a static one. Locale.ROOT pins the symbols so the result does not vary with the default locale.
    final DecimalFormat df = new DecimalFormat(DISPLAY_PATTERN, DecimalFormatSymbols.getInstance(Locale.ROOT));

    if (sizeInBytes < ONE_KB) {
      return df.format(sizeInBytes) + " bytes";
    } else if (sizeInBytes < ONE_MB) {
      return df.format((double) sizeInBytes / ONE_KB) + " KB";
    } else if (sizeInBytes < ONE_GB) {
      return df.format((double) sizeInBytes / ONE_MB) + " MB";
    } else if (sizeInBytes < ONE_TB) {
      return df.format((double) sizeInBytes / ONE_GB) + " GB";
    } else {
      return df.format((double) sizeInBytes / ONE_TB) + " TB";
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;

/**
* Responsible for managing global memory across multiple queues in a thread-safe way.
* <p>
* This means memory allocation and deallocation for each queue can be dynamically adjusted
* This means memory allocation and de-allocation for each queue can be dynamically adjusted
* according to the overall available memory. Memory blocks are managed in chunks of
* {@link #BLOCK_SIZE_BYTES}, and the total amount of memory managed is configured at creation time.
* <p>
Expand Down Expand Up @@ -68,6 +69,10 @@ public synchronized long requestMemory() {
final var toAllocateBytes = Math.min(freeMem, BLOCK_SIZE_BYTES);
currentMemoryBytes.addAndGet(toAllocateBytes);

log.debug("Memory Requested: max: {}, allocated: {}, allocated in this request: {}",
FileUtils.byteCountToDisplaySize(maxMemoryBytes),
FileUtils.byteCountToDisplaySize(currentMemoryBytes.get()),
FileUtils.byteCountToDisplaySize(toAllocateBytes));
return toAllocateBytes;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination_async;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

public class AirbyteFileUtilsTest {

  /**
   * Formats one sample size per unit (bytes, KB, MB, GB, TB) and compares each result with the
   * expected display string.
   */
  @Test
  void testByteCountToDisplaySize() {
    // Parallel arrays: expectedDisplay[i] is the expected rendering of byteCounts[i].
    final long[] byteCounts = {500L, 2000L, 3072000L, 2872000000L, 2000000000000L};
    final String[] expectedDisplay = {"500 bytes", "1.95 KB", "2.93 MB", "2.67 GB", "1.82 TB"};

    for (int i = 0; i < byteCounts.length; i++) {
      assertEquals(expectedDisplay[i], AirbyteFileUtils.byteCountToDisplaySize(byteCounts[i]));
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"streams": [
{
"stream": {
"name": "users",
"name": "users_10m",
"namespace": "PERF_TEST_HARNESS",
"json_schema": {
"type": "object",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"streams": [
{
"stream": {
"name": "users",
"name": "users_1m",
"namespace": "PERF_TEST_HARNESS",
"json_schema": {
"type": "object",
Expand Down