Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Google Cloud Storage output plugin for [Embulk](https://github.com/embulk/embulk
- **json_keyfile**: Full path of the JSON key file (string, required when auth_method is json_key)
- **application_name**: Application name, anything you like (string, optional, default value is "embulk-output-gcs")
- **max_connection_retry**: Number of connection retries to GCS (number, default value is 10)
- **delete_in_advance**: Delete the files in the bucket that match the path prefix in advance, before any output is written (boolean, default value is false)

## Example

Expand Down
138 changes: 138 additions & 0 deletions src/main/java/org/embulk/output/gcs/GcsOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

package org.embulk.output.gcs;

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
Expand All @@ -29,14 +33,23 @@
import org.embulk.util.config.ConfigMapperFactory;
import org.embulk.util.config.TaskMapper;
import org.embulk.util.config.units.LocalFile;
import org.embulk.util.retryhelper.RetryExecutor;
import org.embulk.util.retryhelper.RetryGiveupException;
import org.embulk.util.retryhelper.Retryable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.invoke.MethodHandles;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class GcsOutputPlugin implements FileOutputPlugin
{
// Logger for this class; the class is resolved through MethodHandles.lookup() rather than a class literal.
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Factory built with the default modules; CONFIG_MAPPER below is created from it.
public static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder()
.addDefaultModules().build();
// Shared mapper created from CONFIG_MAPPER_FACTORY (public so that other classes can reuse it).
public static final ConfigMapper CONFIG_MAPPER = CONFIG_MAPPER_FACTORY.createConfigMapper();
Expand Down Expand Up @@ -71,6 +84,10 @@ else if (task.getAuthMethod().getString().equals("private_key")) {
}
}

if (task.getDeleteInAdvance()) {
deleteFiles(task);
}

return resume(task.toTaskSource(), taskCount, control);
}

Expand Down Expand Up @@ -120,4 +137,125 @@ public Storage createClient(final PluginTask task)
throw new RuntimeException(ex);
}
}

/**
 * Deletes every blob returned by listing the task's bucket with the task's path prefix.
 *
 * <p>Both the listing and each individual delete go through the retrying helpers, using the
 * task's {@code max_connection_retry} value as the retry limit. When the listing is empty the
 * method only logs that fact and returns.
 *
 * @param task plugin task supplying the bucket, path prefix, retry limit and client settings
 * @throws ConfigException wrapping any {@link IOException} raised by the list or delete helpers
 */
public void deleteFiles(PluginTask task)
{
    logger.info("Start delete files operation");
    final Storage storage = createClient(task);
    try {
        final List<BlobId> targets = listObjectsWithRetry(
                storage, task.getBucket(), task.getPathPrefix(), task.getMaxConnectionRetry());
        if (!targets.isEmpty()) {
            for (final BlobId target : targets) {
                deleteObjectWithRetry(storage, target, task.getMaxConnectionRetry());
                // Logged after the delete call returns, one line per blob.
                logger.info("delete file: {}/{}", target.getBucket(), target.getName());
            }
            return;
        }
        logger.info("no files were found");
    }
    catch (IOException e) {
        throw new ConfigException(e);
    }
}

/**
 * Lists the ids of the blobs in {@code bucket} whose names start with {@code prefix},
 * retrying the request when it fails.
 *
 * <p>The request is retried up to {@code maxConnectionRetry} times, starting with a 500 ms wait
 * that is capped at 30 seconds. Every exception is treated as retryable.
 *
 * @param client GCS client used to issue the list request
 * @param bucket name of the bucket to list
 * @param prefix object name prefix passed to the listing
 * @param maxConnectionRetry retry limit handed to the retry executor
 * @return ids of all blobs produced by the listing (all pages); empty when nothing matched
 * @throws IOException declared for callers; an {@link InterruptedIOException} is thrown when the
 *         thread is interrupted while the retry executor is running
 */
private List<BlobId> listObjectsWithRetry(Storage client, String bucket, String prefix, int maxConnectionRetry) throws IOException
{
    try {
        return RetryExecutor.builder()
                .withRetryLimit(maxConnectionRetry)
                .withInitialRetryWaitMillis(500)
                .withMaxRetryWaitMillis(30 * 1000)
                .build()
                .runInterruptible(new Retryable<List<BlobId>>() {
                    @Override
                    public List<BlobId> call() throws IOException
                    {
                        // https://cloud.google.com/storage/docs/samples/storage-list-files-with-prefix#storage_list_files_with_prefix-java
                        // NOTE(review): with currentDirectory() the listing may also contain
                        // directory placeholder entries; confirm whether those should be skipped.
                        Page<Blob> list = client.list(bucket, Storage.BlobListOption.prefix(prefix), Storage.BlobListOption.currentDirectory());
                        List<BlobId> blobIds = new ArrayList<>();
                        // iterateAll() walks every page, so it runs inside the retried call.
                        list.iterateAll().forEach(x -> blobIds.add(x.getBlobId()));
                        return blobIds;
                    }

                    @Override
                    public boolean isRetryableException(Exception exception)
                    {
                        return true;
                    }

                    @Override
                    public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException
                    {
                        String message = String.format("GCS list request failed. Retrying %d/%d after %d seconds. Message: %s: %s",
                                retryCount, retryLimit, retryWait / 1000, exception.getClass(), exception.getMessage());
                        // Attach the stack trace only on every third retry to keep the log readable.
                        if (retryCount % 3 == 0) {
                            logger.warn(message, exception);
                        }
                        else {
                            logger.warn(message);
                        }
                    }

                    @Override
                    public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException
                    {
                        // Intentionally empty: the give-up is handled by the catch block below.
                    }
                });
    }
    catch (RetryGiveupException ex) {
        throw Throwables.propagate(ex.getCause());
    }
    catch (InterruptedException ex) {
        // Restore the interrupt flag so code further up the stack can still observe the
        // interruption, and keep the original exception as the cause for diagnostics.
        Thread.currentThread().interrupt();
        InterruptedIOException interrupted = new InterruptedIOException("Interrupted while listing GCS objects");
        interrupted.initCause(ex);
        throw interrupted;
    }
}

/**
 * Deletes the blob identified by {@code blobId}, retrying the request when it fails.
 *
 * <p>The request is retried up to {@code maxConnectionRetry} times, starting with a 500 ms wait
 * that is capped at 30 seconds. Every exception is treated as retryable. The boolean result of
 * {@code client.delete} is not inspected.
 *
 * @param client GCS client used to issue the delete request
 * @param blobId id of the blob to delete
 * @param maxConnectionRetry retry limit handed to the retry executor
 * @return always {@code null}
 * @throws IOException declared for callers; an {@link InterruptedIOException} is thrown when the
 *         thread is interrupted while the retry executor is running
 */
private Void deleteObjectWithRetry(Storage client, BlobId blobId, int maxConnectionRetry) throws IOException
{
    try {
        return RetryExecutor.builder()
                .withRetryLimit(maxConnectionRetry)
                .withInitialRetryWaitMillis(500)
                .withMaxRetryWaitMillis(30 * 1000)
                .build()
                .runInterruptible(new Retryable<Void>() {
                    @Override
                    public Void call() throws IOException
                    {
                        client.delete(blobId);
                        return null;
                    }

                    @Override
                    public boolean isRetryableException(Exception exception)
                    {
                        return true;
                    }

                    @Override
                    public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException
                    {
                        String message = String.format("GCS delete request failed. Retrying %d/%d after %d seconds. Message: %s: %s",
                                retryCount, retryLimit, retryWait / 1000, exception.getClass(), exception.getMessage());
                        // Attach the stack trace only on every third retry to keep the log readable.
                        if (retryCount % 3 == 0) {
                            logger.warn(message, exception);
                        }
                        else {
                            logger.warn(message);
                        }
                    }

                    @Override
                    public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException
                    {
                        // Intentionally empty: the give-up is handled by the catch block below.
                    }
                });
    }
    catch (RetryGiveupException ex) {
        throw Throwables.propagate(ex.getCause());
    }
    catch (InterruptedException ex) {
        // Restore the interrupt flag so code further up the stack can still observe the
        // interruption, and keep the original exception as the cause for diagnostics.
        Thread.currentThread().interrupt();
        InterruptedIOException interrupted = new InterruptedIOException("Interrupted while deleting a GCS object");
        interrupted.initCause(ex);
        throw interrupted;
    }
}
}
4 changes: 4 additions & 0 deletions src/main/java/org/embulk/output/gcs/PluginTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,8 @@ public interface PluginTask extends Task
// Bound to the "key_pass" config option; the declared default is the string "notasecret".
@Config("key_pass")
@ConfigDefault("\"notasecret\"")
String getKeyPass();

// Bound to the "delete_in_advance" config option; the declared default is false.
@Config("delete_in_advance")
@ConfigDefault("false")
boolean getDeleteInAdvance();
}
35 changes: 35 additions & 0 deletions src/test/java/org/embulk/output/gcs/TestGcsOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -48,6 +49,7 @@
import static org.embulk.output.gcs.GcsOutputPlugin.CONFIG_MAPPER;
import static org.embulk.output.gcs.GcsOutputPlugin.CONFIG_MAPPER_FACTORY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeNotNull;
Expand Down Expand Up @@ -138,6 +140,7 @@ public void checkDefaultValues()

PluginTask task = CONFIG_MAPPER.map(config, PluginTask.class);
assertEquals("private_key", task.getAuthMethod().toString());
assertFalse(task.getDeleteInAdvance());
}

// p12_keyfile is null when auth_method is private_key
Expand Down Expand Up @@ -302,6 +305,26 @@ public void testGenerateRemotePath() throws Exception
assertEquals("sample.000.01.csv", fileOutput.generateRemotePath("......///sample", task.getSequenceFormat(), 0, 1, ".csv"));
}

// Uploads two empty objects under the configured path prefix, runs deleteFiles,
// and verifies that both objects are gone afterwards.
@Test
public void testDeleteFilesInAdvanceSuccessfully() throws Exception
{
    PluginTask task = CONFIG_MAPPER.map(config(), PluginTask.class);
    Storage client = plugin.createClient(task);

    String bucket = task.getBucket();
    String firstPath = task.getPathPrefix() + ".001.csv";
    String secondPath = task.getPathPrefix() + ".002.csv";

    // Even if a file exists, it will be a success.
    uploadEmptyFile(client, bucket, firstPath);
    uploadEmptyFile(client, bucket, secondPath);

    assertTrue(isFileExist(client, bucket, firstPath));
    assertTrue(isFileExist(client, bucket, secondPath));

    plugin.deleteFiles(task);

    assertFalse(isFileExist(client, bucket, firstPath));
    assertFalse(isFileExist(client, bucket, secondPath));
}

public ConfigSource config()
{
byte[] keyBytes = Base64.getDecoder().decode(GCP_P12_KEYFILE.get());
Expand Down Expand Up @@ -409,4 +432,16 @@ private byte[] convertInputStreamToByte(InputStream is) throws IOException
}
return bo.toByteArray();
}

// Creates a zero-byte object at gcsBucket/gcsPath using the given client.
private static void uploadEmptyFile(Storage client, String gcsBucket, String gcsPath) throws IOException
{
    final byte[] noContent = new byte[0];
    client.create(BlobInfo.newBuilder(BlobId.of(gcsBucket, gcsPath)).build(), noContent);
}

// Returns true when client.get yields a non-null blob for gcsBucket/gcsPath.
private static boolean isFileExist(Storage client, String gcsBucket, String gcsPath)
{
    final Blob found = client.get(BlobId.of(gcsBucket, gcsPath));
    return found != null;
}
}