Feature/jdbc log s3 upload #85

Open · wants to merge 8 commits into main

50 changes: 50 additions & 0 deletions README.md
@@ -30,6 +30,12 @@ Snowflake output plugin for Embulk loads records to Snowflake.
- **merge_rule**: list of column assignments for updating existing records used in merge mode, for example `"foo" = T."foo" + S."foo"` (`T` means target table and `S` means source table). (string array, default: always overwrites with new values)
- **batch_size**: size of a single batch insert (integer, default: 16777216)
- **match_by_column_name**: specify whether to load semi-structured data into columns in the target table that match corresponding columns represented in the data. ("case_sensitive", "case_insensitive", "none", default: "none")
- **upload_jdbc_log_to_s3**: enable automatic upload of JDBC driver logs to S3 when communication errors occur (boolean, default: false)
- **s3_bucket**: S3 bucket name for JDBC log upload (string, required when `upload_jdbc_log_to_s3` is true)
- **s3_prefix**: S3 key prefix for JDBC log upload (string, optional; if empty, files are uploaded to the bucket root)
- **s3_region**: AWS region of the S3 bucket (string, required when `upload_jdbc_log_to_s3` is true)
- **s3_access_key_id**: AWS access key ID for S3 access (string, optional; uses the default AWS credentials provider chain if not specified)
- **s3_secret_access_key**: AWS secret access key for S3 access (string, optional; uses the default AWS credentials provider chain if not specified)
Contributor:

ref: https://github.com/embulk/embulk-output-s3

(nits) s3_prefix could be s3_path_prefix like embulk-output-s3

- **default_timezone**: If input column type (embulk type) is timestamp, this plugin needs to format the timestamp into a SQL string. This default_timezone option is used to control the timezone. You can overwrite the timezone for each column using the column_options option. (string, default: `UTC`)
- **column_options**: advanced: a key-value pairs where key is a column name and value is options for the column.
- **type**: type of a column when this plugin creates new tables (e.g. `VARCHAR(255)`, `INTEGER NOT NULL UNIQUE`). This is used when this plugin creates intermediate tables (insert, truncate_insert and merge modes), when it creates the target table (insert_direct and replace modes), and when it creates a nonexistent target table automatically. (string, default: depends on input column type. `BIGINT` if input column type is long, `BOOLEAN` if boolean, `DOUBLE PRECISION` if double, `CLOB` if string, `TIMESTAMP` if timestamp)
@@ -62,6 +68,50 @@ Snowflake output plugin for Embulk loads records to Snowflake.
* Transactional: Yes.
* Resumable: No.

## JDBC Log Upload to S3

This plugin supports automatic upload of JDBC driver logs to S3 when communication errors occur. This feature is useful for debugging connection issues with Snowflake.

### Configuration Example

```yaml
out:
type: snowflake
host: your-account.snowflakecomputing.com
user: your_user
password: your_password
warehouse: your_warehouse
database: your_database
schema: your_schema
table: your_table
mode: insert

# JDBC log upload configuration
upload_jdbc_log_to_s3: true
s3_bucket: your-log-bucket
s3_prefix: snowflake-jdbc-logs # Optional: omit to upload to bucket root
s3_region: us-east-1

# Optional: Explicit AWS credentials (uses IAM role if not specified)
s3_access_key_id: YOUR_ACCESS_KEY_ID
s3_secret_access_key: YOUR_SECRET_ACCESS_KEY
```

### Authentication Methods

1. **IAM Role (Recommended)**: Leave `s3_access_key_id` and `s3_secret_access_key` unspecified. The plugin will use the default AWS credentials provider chain (IAM role, environment variables, etc.); a minimal sketch follows this list.

2. **Explicit Credentials**: Specify both `s3_access_key_id` and `s3_secret_access_key` for explicit authentication.
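
For reference, a minimal sketch of the IAM-role variant (bucket and region values are placeholders; the Snowflake connection settings are as in the example above):

```yaml
out:
  type: snowflake
  # ... Snowflake connection settings as in the example above ...
  upload_jdbc_log_to_s3: true
  s3_bucket: your-log-bucket   # placeholder
  s3_region: us-east-1         # placeholder
  # No s3_access_key_id / s3_secret_access_key:
  # the default AWS credentials provider chain (IAM role, env vars, ...) is used
```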

### Behavior

- JDBC logs are uploaded only when communication errors occur during Snowflake operations
- The plugin automatically picks the most recently modified `snowflake_jdbc*.log` file in the system temp directory
- **Automatic timestamping**: the upload timestamp is automatically added to the filename (format: `yyyyMMdd_HHmmss`)
  - Example: `snowflake_jdbc0.log.0` → `snowflake_jdbc0.log_20250630_021500.0`
- If the S3 upload fails, a warning is logged but the original error is still thrown
- If required S3 configuration is missing, a warning is logged and the log upload is skipped
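
With the example configuration above (bucket `your-log-bucket`, prefix `snowflake-jdbc-logs`), an uploaded log would land at an object key of the form:

```
s3://your-log-bucket/snowflake-jdbc-logs/snowflake_jdbc0.log_20250630_021500.0
```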

## Build

31 changes: 31 additions & 0 deletions build.gradle
@@ -52,6 +52,37 @@ dependencies {

compile "org.embulk:embulk-output-jdbc:0.10.2"
compile "net.snowflake:snowflake-jdbc:3.13.26"

compile platform("software.amazon.awssdk:bom:2.25.20")

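// Note: the exclusions below drop transitive dependencies (Jackson, Joda-Time, commons-logging, SLF4J)
// that the Embulk runtime already provides; presumably this avoids duplicate classes on the plugin classpath.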
compile("software.amazon.awssdk:s3") {
exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
exclude group: "joda-time", module: "joda-time"
exclude group: "commons-logging", module: "commons-logging"
exclude group: "org.slf4j", module: "slf4j-api"
}

compile("software.amazon.awssdk:regions") {
exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
exclude group: "joda-time", module: "joda-time"
exclude group: "commons-logging", module: "commons-logging"
exclude group: "org.slf4j", module: "slf4j-api"
}

compile("software.amazon.awssdk:auth") {
exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
exclude group: "joda-time", module: "joda-time"
exclude group: "commons-logging", module: "commons-logging"
exclude group: "org.slf4j", module: "slf4j-api"
}

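// commons-logging is excluded from the AWS SDK modules above; a single copy is pinned here,
// presumably for the SDK's Apache HTTP client, which logs through it.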
compile "commons-logging:commons-logging:1.2"
}
embulkPlugin {
mainClass = "org.embulk.output.SnowflakeOutputPlugin"
42 changes: 42 additions & 0 deletions gradle/dependency-locks/embulkPluginRuntime.lockfile
@@ -5,11 +5,53 @@ com.fasterxml.jackson.core:jackson-annotations:2.6.7
com.fasterxml.jackson.core:jackson-core:2.6.7
com.fasterxml.jackson.core:jackson-databind:2.6.7
com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7
commons-codec:commons-codec:1.15
commons-logging:commons-logging:1.2
io.netty:netty-buffer:4.1.108.Final
io.netty:netty-codec-http2:4.1.108.Final
io.netty:netty-codec-http:4.1.108.Final
io.netty:netty-codec:4.1.108.Final
io.netty:netty-common:4.1.108.Final
io.netty:netty-handler:4.1.108.Final
io.netty:netty-resolver:4.1.108.Final
io.netty:netty-transport-classes-epoll:4.1.108.Final
io.netty:netty-transport-native-unix-common:4.1.108.Final
io.netty:netty-transport:4.1.108.Final
javax.validation:validation-api:1.1.0.Final
net.snowflake:snowflake-jdbc:3.13.26
org.apache.httpcomponents:httpclient:4.5.13
org.apache.httpcomponents:httpcore:4.4.13
org.embulk:embulk-output-jdbc:0.10.2
org.embulk:embulk-util-config:0.3.0
org.embulk:embulk-util-json:0.1.1
org.embulk:embulk-util-retryhelper:0.8.2
org.embulk:embulk-util-rubytime:0.3.2
org.embulk:embulk-util-timestamp:0.2.1
org.reactivestreams:reactive-streams:1.0.4
software.amazon.awssdk:annotations:2.25.20
software.amazon.awssdk:apache-client:2.25.20
software.amazon.awssdk:arns:2.25.20
software.amazon.awssdk:auth:2.25.20
software.amazon.awssdk:aws-core:2.25.20
software.amazon.awssdk:aws-query-protocol:2.25.20
software.amazon.awssdk:aws-xml-protocol:2.25.20
software.amazon.awssdk:checksums-spi:2.25.20
software.amazon.awssdk:checksums:2.25.20
software.amazon.awssdk:crt-core:2.25.20
software.amazon.awssdk:endpoints-spi:2.25.20
software.amazon.awssdk:http-auth-aws:2.25.20
software.amazon.awssdk:http-auth-spi:2.25.20
software.amazon.awssdk:http-auth:2.25.20
software.amazon.awssdk:http-client-spi:2.25.20
software.amazon.awssdk:identity-spi:2.25.20
software.amazon.awssdk:json-utils:2.25.20
software.amazon.awssdk:metrics-spi:2.25.20
software.amazon.awssdk:netty-nio-client:2.25.20
software.amazon.awssdk:profiles:2.25.20
software.amazon.awssdk:protocol-core:2.25.20
software.amazon.awssdk:regions:2.25.20
software.amazon.awssdk:s3:2.25.20
software.amazon.awssdk:sdk-core:2.25.20
software.amazon.awssdk:third-party-jackson-core:2.25.20
software.amazon.awssdk:utils:2.25.20
software.amazon.eventstream:eventstream:1.0.1
75 changes: 75 additions & 0 deletions src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
@@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.sql.Types;
@@ -15,6 +16,7 @@
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskSource;
import org.embulk.output.jdbc.*;
import org.embulk.output.s3.JdbcLogUploader;
import org.embulk.output.snowflake.PrivateKeyReader;
import org.embulk.output.snowflake.SnowflakeCopyBatchInsert;
import org.embulk.output.snowflake.SnowflakeOutputConnection;
@@ -91,6 +93,30 @@ public interface SnowflakePluginTask extends PluginTask {
@ConfigDefault("\"none\"")
public MatchByColumnName getMatchByColumnName();

@Config("upload_jdbc_log_to_s3")
@ConfigDefault("false")
public boolean getUploadJdbcLogToS3();

@Config("s3_bucket")
@ConfigDefault("null")
public Optional<String> getS3Bucket();

@Config("s3_prefix")
@ConfigDefault("null")
public Optional<String> getS3Prefix();

@Config("s3_region")
@ConfigDefault("null")
public Optional<String> getS3Region();

@Config("s3_access_key_id")
@ConfigDefault("null")
public Optional<String> getS3AccessKeyId();

@Config("s3_secret_access_key")
@ConfigDefault("null")
public Optional<String> getS3SecretAccessKey();

public void setCopyIntoTableColumnNames(String[] columnNames);

public String[] getCopyIntoTableColumnNames();
@@ -139,6 +165,9 @@ public static MatchByColumnName fromString(String value) {
private static final int MASTER_TOKEN_INVALID_GS_CODE = 390115;
private static final int ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE = 390195;

private static final String ENCOUNTERED_COMMUNICATION_ERROR_MESSAGE =
"JDBC driver encountered communication error";

@Override
protected Class<? extends PluginTask> getTaskClass() {
return SnowflakePluginTask.class;
@@ -198,6 +227,10 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet
props.setProperty("CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX", "true");
props.setProperty("MULTI_STATEMENT_COUNT", "0");

if (t.getUploadJdbcLogToS3()) {
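// "tracing" = ALL enables verbose Snowflake JDBC driver logging, so that snowflake_jdbc*.log
// files are written to the temp directory for the upload path below.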
props.setProperty("tracing", "ALL");
}

props.putAll(t.getOptions());

logConnectionProperties(url, props);
@@ -217,11 +250,53 @@
try {
snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true);
snowflakeCon.runCreateStage(stageIdentifier);

configDiff = super.transaction(config, schema, taskCount, control);
if (t.getDeleteStage()) {
runDropStageWithRecovery(snowflakeCon, stageIdentifier, task);
}
} catch (Exception e) {
if (e instanceof SQLException) {
String message = e.getMessage();
if (message != null
&& message.contains(ENCOUNTERED_COMMUNICATION_ERROR_MESSAGE)
Comment on lines +259 to +262

Copilot AI (Jun 30, 2025):

[nitpick] Relying on message text can be brittle; consider checking the SQL error code or vendor-specific code instead of string matching to detect communication errors more reliably.

Suggested change:

- if (e instanceof SQLException) {
-   String message = e.getMessage();
-   if (message != null
-       && message.contains(ENCOUNTERED_COMMUNICATION_ERROR_MESSAGE)
+ if (e instanceof SnowflakeSQLException) {
+   SnowflakeSQLException snowflakeException = (SnowflakeSQLException) e;
+   if (snowflakeException.getErrorCode() == ENCOUNTERED_COMMUNICATION_ERROR_CODE

&& t.getUploadJdbcLogToS3() == true) {
Contributor:

t.getUploadJdbcLogToS3() == true can be t.getUploadJdbcLogToS3() (without == true)

final Optional<String> s3Bucket = t.getS3Bucket();
final Optional<String> s3Prefix = t.getS3Prefix();
final Optional<String> s3Region = t.getS3Region();
final Optional<String> s3AccessKeyId = t.getS3AccessKeyId();
final Optional<String> s3SecretAccessKey = t.getS3SecretAccessKey();
if (!s3Bucket.isPresent() || !s3Region.isPresent()) {
logger.warn("s3_bucket, and s3_region must be set when upload_jdbc_log_to_s3 is true");
} else if (s3AccessKeyId.isPresent() != s3SecretAccessKey.isPresent()) {
logger.warn(
"Both s3_access_key_id and s3_secret_access_key must be set together or omitted.");
} else {
try (JdbcLogUploader jdbcLogUploader =
new JdbcLogUploader(
s3Bucket.get(),
s3Prefix.orElse(""),
s3Region.get(),
s3AccessKeyId.orElse(null),
s3SecretAccessKey.orElse(null))) {
String tmpDir = System.getProperty("java.io.tmpdir", "/tmp");
File logDir = new File(tmpDir);
File[] logFiles = logDir.listFiles((dir, name) -> name.startsWith("snowflake_jdbc"));
if (logFiles != null && logFiles.length > 0) {
Optional<File> latest =
Arrays.stream(logFiles).max(Comparator.comparingLong(File::lastModified));
if (latest.isPresent()) {
jdbcLogUploader.uploadIfExists(latest.get());
}
} else {
logger.warn("No snowflake_jdbc*.log file found in {} for upload", tmpDir);
}
} catch (Exception uploadException) {
logger.warn("Failed to upload JDBC log to S3: {}", uploadException.getMessage());
}
}
}
}
if (t.getDeleteStage() && t.getDeleteStageOnError()) {
try {
runDropStageWithRecovery(snowflakeCon, stageIdentifier, task);
87 changes: 87 additions & 0 deletions src/main/java/org/embulk/output/s3/JdbcLogUploader.java
@@ -0,0 +1,87 @@
package org.embulk.output.s3;

import java.io.File;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class JdbcLogUploader implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(JdbcLogUploader.class);

private final S3Client s3Client;
private final String bucket;
private final String prefix;
private final Region region;

public JdbcLogUploader(
String bucket, String prefix, String region, String accessKeyId, String secretAccessKey) {
this.bucket = bucket;
this.prefix = prefix;
this.region = Region.of(region);

if (accessKeyId != null && secretAccessKey != null) {
// Use explicit credentials
AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
this.s3Client =
S3Client.builder()
.region(this.region)
.credentialsProvider(StaticCredentialsProvider.create(awsCreds))
.build();
} else {
// Use default credentials provider (IAM role, environment variables, etc.)
this.s3Client =
S3Client.builder()
.region(this.region)
.credentialsProvider(DefaultCredentialsProvider.create())
.build();
}
}

public void uploadIfExists(File file) {
if (!file.exists()) {
logger.warn("File not found: {}", file.getAbsolutePath());
return;
}

// Add timestamp to filename
String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));
String originalFileName = file.getName();
String fileNameWithTimestamp;

// Insert timestamp before file extension
int lastDotIndex = originalFileName.lastIndexOf('.');
if (lastDotIndex > 0) {
String nameWithoutExt = originalFileName.substring(0, lastDotIndex);
String extension = originalFileName.substring(lastDotIndex);
fileNameWithTimestamp = nameWithoutExt + "_" + timestamp + extension;
} else {
fileNameWithTimestamp = originalFileName + "_" + timestamp;
}

String key = prefix.isEmpty() ? fileNameWithTimestamp : prefix + "/" + fileNameWithTimestamp;
try {
PutObjectRequest putObjectRequest =
PutObjectRequest.builder().bucket(bucket).key(key).build();

s3Client.putObject(putObjectRequest, RequestBody.fromFile(file));
logger.info("Uploaded {}", file.getAbsolutePath());
} catch (Exception e) {
logger.error("Failed to upload {}", file.getAbsolutePath(), e);
}
}

@Override
public void close() {
if (s3Client != null) {
s3Client.close();
}
}
}