Skip to content

Commit

Permalink
Incorporated StreamCopier and StreamCopierFactory interface changes (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
dandorazio committed Sep 17, 2021
1 parent 0f4c8f5 commit 7c15d8f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,20 @@
import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder;
import com.azure.storage.blob.specialized.BlobOutputStream;
import com.azure.storage.common.StorageSharedKeyCredential;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.UUID;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
Expand Down Expand Up @@ -95,8 +99,10 @@ public AzureBlobStreamCopier(String stagingFolder,
}

@Override
public void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception {
csvPrinter.printRecord(id, jsonDataString, emittedAt);
public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
csvPrinter.printRecord(id,
Jsons.serialize(recordMessage.getData()),
Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;

public abstract class AzureBlobStreamCopierFactory implements StreamCopierFactory<AzureBlobConfig> {
Expand All @@ -43,16 +44,16 @@ public abstract class AzureBlobStreamCopierFactory implements StreamCopierFactor
public StreamCopier create(String configuredSchema,
AzureBlobConfig azureBlobConfig,
String stagingFolder,
DestinationSyncMode syncMode,
AirbyteStream stream,
ConfiguredAirbyteStream configuredStream,
ExtendedNameTransformer nameTransformer,
JdbcDatabase db,
SqlOperations sqlOperations) {
try {
var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream);
var streamName = pair.getName();
var schema = getSchema(stream, configuredSchema, nameTransformer);
var appendBlobClient = AzureBlobStreamCopier.getAppendBlobClient(azureBlobConfig, streamName);
AirbyteStream stream = configuredStream.getStream();
DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
String streamName = stream.getName();
AppendBlobClient appendBlobClient = AzureBlobStreamCopier.getAppendBlobClient(azureBlobConfig, streamName);

return create(stagingFolder, syncMode, schema, streamName, appendBlobClient, db, azureBlobConfig, nameTransformer, sqlOperations);
} catch (Exception e) {
Expand All @@ -74,12 +75,4 @@ public abstract StreamCopier create(String stagingFolder,
SqlOperations sqlOperations)
throws Exception;

private String getSchema(AirbyteStream stream, String configuredSchema, ExtendedNameTransformer nameTransformer) {
if (stream.getNamespace() != null) {
return nameTransformer.convertStreamName(stream.getNamespace());
} else {
return nameTransformer.convertStreamName(configuredSchema);
}
}

}

0 comments on commit 7c15d8f

Please sign in to comment.