Skip to content

Commit

Permalink
Revert placeholder PR (airbytehq#5545)
Browse files Browse the repository at this point in the history
This reverts commit 05a5503.
  • Loading branch information
tuliren authored Aug 20, 2021
1 parent 05a5503 commit 968ef37
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 104 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,3 @@ resources/examples/airflow/logs/*

# Cloud Demo
!airbyte-webapp/src/packages/cloud/data

6 changes: 0 additions & 6 deletions airbyte-integrations/connectors/destination-gcs/readme.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
* SOFTWARE.
*/

package io.airbyte.integrations.destination.s3.avro;
package io.airbyte.integrations.destination.gcs.avro;

import alex.mojaki.s3upload.MultiPartOutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,49 +28,33 @@

public class GcsConfig {

private final String projectId;
private final String bucketName;
private final String bucketPath;
private final String accessKeyId;
private final String secretAccessKey;
private final String region;
private final String credentialsJson;

public GcsConfig(String bucketName, String bucketPath, String accessKeyId, String secretAccessKey, String region) {
public GcsConfig(String projectId, String bucketName, String credentialsJson) {
this.projectId = projectId;
this.bucketName = bucketName;
this.bucketPath = bucketPath;
this.accessKeyId = accessKeyId;
this.secretAccessKey = secretAccessKey;
this.region = region;
this.credentialsJson = credentialsJson;
}

public String getBucketName() {
return bucketName;
public String getProjectId() {
return projectId;
}

public String getbucketPath() {
return bucketPath;
}

public String getAccessKeyId() {
return accessKeyId;
}

public String getSecretAccessKey() {
return secretAccessKey;
public String getBucketName() {
return bucketName;
}

public String getRegion() {
return region;
public String getCredentialsJson() {
return credentialsJson;
}

public static GcsConfig getGcsConfig(JsonNode config) {

public static GcsConfig getGcsConfig(JsonNode config) {
return new GcsConfig(
config.get("gcs_bucket_name").asText(),
config.get("gcs_bucket_path").asText(),
config.get("access_key_id").asText(),
config.get("secret_access_key").asText(),
config.get("gcs_bucket_region").asText()
);
config.get("loading_method").get("project_id").asText(),
config.get("loading_method").get("bucket_name").asText(),
config.get("loading_method").get("credentials_json").asText());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.client.builder.AwsClientBuilder;


public abstract class GcsStreamCopier implements StreamCopier {

private static final Logger LOGGER = LoggerFactory.getLogger(GcsStreamCopier.class);
Expand Down Expand Up @@ -188,37 +180,24 @@ public static void attemptWriteToPersistence(GcsConfig gcsConfig) throws IOExcep
attemptWriteAndDeleteGcsObject(gcsConfig, outputTableName);
}

public static void attemptGcsWriteAndDelete(GcsConfig gcsConfig) throws IOException {
final String outputTableName = "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
attemptWriteAndDeleteGcsObject(gcsConfig, outputTableName);
}

private static void attemptWriteAndDeleteGcsObject(GcsConfig gcsConfig, String outputTableName) throws IOException {
var storage = getStorageClient(gcsConfig);
// var blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName);
// var blobInfo = BlobInfo.newBuilder(blobId).build();

// storage.create(blobInfo, "".getBytes());
// storage.delete(blobId);
var gcsBucket = gcsConfig.getBucketName();
var storage = getStorageClient(gcsConfig);
var blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName);
var blobInfo = BlobInfo.newBuilder(blobId).build();

storage.putObject(gcsBucket, outputTableName, "check-content");
storage.deleteObject(gcsBucket, outputTableName);
storage.create(blobInfo, "".getBytes());
storage.delete(blobId);
}

public static AmazonS3 getStorageClient(GcsConfig gcsConfig) throws IOException {
var region = gcsConfig.getRegion();
var accessKeyId = gcsConfig.getAccessKeyId();
var secretAccessKey = gcsConfig.getSecretAccessKey();

var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
return AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
"https://storage.googleapis.com", region))
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build();
}
public static Storage getStorageClient(GcsConfig gcsConfig) throws IOException {
InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes());
GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream);
return StorageOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(gcsConfig.getProjectId())
.build()
.getService();
}

public abstract void copyGcsCsvFileIntoTable(JdbcDatabase database,
String gcsFileLocation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@
import java.io.ByteArrayInputStream;
import java.io.InputStream;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.client.builder.AwsClientBuilder;

public abstract class GcsStreamCopierFactory implements StreamCopierFactory<GcsConfig> {

/**
Expand All @@ -63,28 +56,15 @@ public StreamCopier create(String configuredSchema,
var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream);
var schema = getSchema(stream, configuredSchema, nameTransformer);

var region = gcsConfig.getRegion();
var accessKeyId = gcsConfig.getAccessKeyId();
var secretAccessKey = gcsConfig.getSecretAccessKey();

// InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes());
// GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream);
// Storage storageClient = StorageOptions.newBuilder()
// .setCredentials(credentials)
// .setProjectId(gcsConfig.getProjectId())
// .build()
// .getService();

var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
"https://storage.googleapis.com", region))
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build();
InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes());
GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream);
Storage storageClient = StorageOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(gcsConfig.getProjectId())
.build()
.getService();

return create(stagingFolder, syncMode, schema, pair.getName(), //storageClient
s3Client, db, gcsConfig, nameTransformer, sqlOperations);
return create(stagingFolder, syncMode, schema, pair.getName(), storageClient, db, gcsConfig, nameTransformer, sqlOperations);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -97,7 +77,7 @@ public abstract StreamCopier create(String stagingFolder,
DestinationSyncMode syncMode,
String schema,
String streamName,
AmazonS3 s3Client,
Storage storageClient,
JdbcDatabase db,
GcsConfig gcsConfig,
ExtendedNameTransformer nameTransformer,
Expand Down
1 change: 0 additions & 1 deletion gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ case "`uname`" in
esac

CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
JAVA_HOME=/Library/Java/JavaVirtualMachines/adoptopenjdk-14.jdk/Contents/Home


# Determine the Java command to use to start the JVM.
Expand Down

0 comments on commit 968ef37

Please sign in to comment.