Skip to content

Commit

Permalink
remove isvalidrecord tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed May 27, 2023
1 parent 1e05a13 commit 8016643
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

package io.airbyte.integrations.destination_async;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.destination.buffered_stream_consumer.OnStartFunction;
Expand Down Expand Up @@ -42,13 +40,10 @@ public class AsyncStreamConsumer implements AirbyteMessageConsumer {
private final OnStartFunction onStart;
private final OnCloseFunction onClose;
private final ConfiguredAirbyteCatalog catalog;
private final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord;

private final BufferManager bufferManager;
private final BufferEnqueue bufferEnqueue;
private final FlushWorkers flushWorkers;
private final Set<StreamDescriptor> streamNames;
private final IgnoredRecordsTracker ignoredRecordsTracker;

private boolean hasStarted;
private boolean hasClosed;
Expand All @@ -58,7 +53,6 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnCloseFunction onClose,
final DestinationFlushFunction flusher,
final ConfiguredAirbyteCatalog catalog,
final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord,
final BufferManager bufferManager) {
hasStarted = false;
hasClosed = false;
Expand All @@ -67,12 +61,10 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
this.onStart = onStart;
this.onClose = onClose;
this.catalog = catalog;
this.isValidRecord = isValidRecord;
this.bufferManager = bufferManager;
this.bufferEnqueue = bufferManager.getBufferEnqueue();
this.flushWorkers = new FlushWorkers(this.bufferManager.getBufferDequeue(), flusher);
this.streamNames = StreamDescriptorUtils.fromConfiguredCatalog(catalog);
this.ignoredRecordsTracker = new IgnoredRecordsTracker();
bufferEnqueue = bufferManager.getBufferEnqueue();
flushWorkers = new FlushWorkers(this.bufferManager.getBufferDequeue(), flusher);
streamNames = StreamDescriptorUtils.fromConfiguredCatalog(catalog);
}

@Override
Expand Down Expand Up @@ -109,7 +101,6 @@ public void close() throws Exception {
// or we risk in-memory data.
flushWorkers.close();
bufferManager.close();
ignoredRecordsTracker.report();
onClose.call();
LOGGER.info("{} closed.", AsyncStreamConsumer.class);
}
Expand Down Expand Up @@ -148,21 +139,6 @@ private void validateRecord(final AirbyteMessage message, final StreamDescriptor
if (!streamNames.contains(streamDescriptor)) {
throwUnrecognizedStream(catalog, message);
}

trackerIsValidRecord(message, streamDescriptor);
}

private void trackerIsValidRecord(final AirbyteMessage message, final StreamDescriptor streamDescriptor) {
// todo (cgardens) - is valid should also move inside the tracker, but don't want to blow up more
// constructors right now.
try {

if (!isValidRecord.apply(message.getRecord().getData())) {
ignoredRecordsTracker.addRecord(streamDescriptor, message);
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

private static void throwUnrecognizedStream(final ConfiguredAirbyteCatalog catalog, final AirbyteMessage message) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public AirbyteMessageConsumer createAsync(final Consumer<AirbyteMessage> outputR
() -> GeneralStagingFunctions.onCloseFunction(database, stagingOperations, writeConfigs, purgeStagingData).accept(false),
flusher,
catalog,
stagingOperations::isValidData,
new BufferManager());
}

Expand Down

0 comments on commit 8016643

Please sign in to comment.