
S3TransferManager results in error when running in AWS but works remotely #644

@rickyeng127

Description


Describe the bug

We are attempting to use the S3TransferManager to stream data to an S3 bucket. The code works when run locally, connecting to AWS remotely; however, it fails when run from within AWS.

Expected Behavior

We expect to see the file created in the S3 bucket.

Current Behavior

The code generates this trace and error:

[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [event-loop] - id=0x7fdbe42c5a80: detected more scheduled tasks with the next occurring at 0, using timeout of 0.
[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [event-loop] - id=0x7fdbe42c5a80: waiting for a maximum of 0 ms
[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [event-loop] - id=0x7fdbe42c5a80: wake up with 0 events to process.
[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [event-loop] - id=0x7fdbe42c5a80: running scheduled tasks.
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [task-scheduler] - id=0x7fdc0419e908: Running aws_exponential_backoff_retry_task task with <Running> status
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [exp-backoff-strategy] - id=0x7fdbe4509160: Vending retry_token 0x7fdc0419e8a0
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [standard-retry-strategy] - id=0x7fdbe4508fd0: token acquired callback invoked with error Success. with token 0x7fdbfc00c8f0 and nested token 0x7fdc0419e8a0
[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [standard-retry-strategy] - id=0x7fdbe4508fd0: invoking on_retry_token_acquired callback
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [connection-manager] - id=0x7fdbe45a2950: Acquire connection
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [connection-manager] - id=0x7fdbe45a2950: snapshot - state=1, idle_connection_count=0, pending_acquire_count=1, pending_settings_count=0, pending_connect_count=1, vended_connection_count=0, open_connection_count=0, ref_count=1
[INFO] [2023-07-05T18:11:01Z] [00007fdbc6242700] [connection-manager] - id=0x7fdbe45a2950: Requesting 1 new connections from http
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [http-connection] - https_proxy environment found
[ERROR] [2023-07-05T18:11:01Z] [00007fdbc6242700] [http-connection] - Could not parse found proxy URI.
[ERROR] [2023-07-05T18:11:01Z] [00007fdbc6242700] [connection-manager] - id=0x7fdbe45a2950: http connection creation failed with error code 36(An input string was passed to a parser and the string was incorrectly formatted.)
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [connection-manager] - id=0x7fdbe45a2950: Failing excess connection acquisition with error code 36
[WARN] [2023-07-05T18:11:01Z] [00007fdbc6242700] [connection-manager] - id=0x7fdbe45a2950: Failed to complete connection acquisition with error_code 36(An input string was passed to a parser and the string was incorrectly formatted.)
[ERROR] [2023-07-05T18:11:01Z] [00007fdbc6242700] [S3Endpoint] - id=0x7fdbe4592e80: Could not acquire connection due to error code 36 (An input string was passed to a parser and the string was incorrectly formatted.)
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [S3Client] - id=0x7fdbe4401fa0 Client scheduling retry of request 0x7fdc0419e130 for meta request 0x7fdbe45779f0 with token 0x7fdbfc00c8f0 with error code 36 (An input string was passed to a parser and the string was incorrectly formatted.).
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [standard-retry-strategy] - token_id=0x7fdbfc00c8f0: reducing retry capacity by 10 from 500 and scheduling retry.
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [exp-backoff-strategy] - id=0x7fdbe4509160: Attempting retry on token 0x7fdc0419e8a0 with error type 0
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [exp-backoff-strategy] - id=0x7fdbe4509160: Computed backoff value of 8310907ns on token 0x7fdc0419e8a0
[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [event-loop] - id=0x7fdbe42c5a80: scheduling task 0x7fdc0419e908 in-thread for timestamp 977992596685
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [task-scheduler] - id=0x7fdc0419e908: Scheduling aws_exponential_backoff_retry_task task for future execution at time 977992596685
[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [standard-retry-strategy] - id=0x7fdbe4508fd0: on_retry_token_acquired callback completed
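The relevant lines are the two [http-connection] entries: the CRT HTTP client finds an https_proxy variable in the process environment but cannot parse its value as a proxy URI (error code 36, "An input string was passed to a parser and the string was incorrectly formatted."), after which every connection acquisition fails and is retried. Since the variable is presumably only set in the AWS environment, this would also explain why the same code works locally. Below is a standalone diagnostic sketch; java.net.URI is not the CRT's parser, but a null host or missing scheme is a strong hint that the value will not parse as a proxy URI.

import java.net.URI;

// Diagnostic sketch: print the proxy variables the CRT consults and how they parse.
public class ProxyEnvCheck {
  public static void main(String[] args) {
    for (String name : new String[] {"https_proxy", "HTTPS_PROXY", "http_proxy", "HTTP_PROXY"}) {
      String value = System.getenv(name);
      if (value == null) continue;
      System.out.println(name + "=" + value);
      try {
        URI uri = URI.create(value.trim());
        // For a well-formed value such as http://proxy.example.com:3128 (placeholder),
        // scheme and host are both non-null.
        System.out.println("  scheme=" + uri.getScheme() + " host=" + uri.getHost() + " port=" + uri.getPort());
      } catch (IllegalArgumentException e) {
        System.out.println("  -> not a parseable URI: " + e.getMessage());
      }
    }
  }
}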

Reproduction Steps

public class AWSTransferManager {
  private static final Logger log = LogManager.getLogger(AWSTransferManager.class.getName());

  static final String BUCKET="MYBUCKET";
  static final String FILE ="flux.bin";
  static final String testString = "This is some test data that will get sent to S3 over and over and " +
    "over again and is about 128 chars long, no seriously... really";
  static final int numLinesToSend=1_000;
  static final byte[] testBytes = testString.getBytes(StandardCharsets.UTF_8);

  public static void main (String[] args) throws IOException, InterruptedException {
    doIt();
  }

  public static void doIt() throws InterruptedException {

    log.info("http_proxy: " + System.getProperty("http_proxy")); // returns null
    log.info("https_proxy: " + System.getProperty("https_proxy")); // returns null
    log.info("http.proxyHost: " + System.getProperty("http.proxyHost")); // returns null
    log.info("https.proxyHost: " + System.getProperty("https.proxyHost")); // returns null

    String logFile = System.getProperty("user.dir")+"/s3.log";
    Utils.initCrtLogging(logFile);
    //Use default Creds:
    AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
    //or Select an Env Profile
    //AwsCredentialsProvider credentialsProvider = ProfileCredentialsProvider.create(PROFILE);
    //Or use SSO
    //AwsCredentialsProvider credentialsProvider = Utils.getSsoCredentials(PROFILE);

    S3AsyncClient s3AsyncClient = Utils.getS3AsyncClient(credentialsProvider);
    S3TransferManager transferManager = Utils.getTransferManager(s3AsyncClient);

    //Create a feed we can 'stream' messages to:
    ConsumerFeed<byte[]> consumerFeed = new ConsumerFeed<>();
    Flux<ByteBuffer> flux = consumerFeed.flux
      .map(ByteBuffer::wrap)
      .doOnComplete(() -> log.info("Done"));

    UploadRequest.Builder req = UploadRequest.builder()
      .requestBody(AsyncRequestBody.fromPublisher(flux))
      .addTransferListener(new S3TransferListener(FILE))
      //Fix for Error we get without contentLength:
      //https://github.com/awslabs/aws-c-s3/pull/285/files#diff-5ee13dd0ee005b828bde6d60ff89582070983e3cb7e5d92389b629a9850c27f8
      .putObjectRequest(b -> b.bucket(BUCKET).key(FILE)); //.contentLength(maxSize));

    Upload upload = transferManager.upload(req.build());
    // Asynchronous: wait until the upload has subscribed and the sink is available.
    consumerFeed.subscription.await();

    //Send some stuff:
    Consumer<byte[]> dataFeed = consumerFeed.getDataFeed();
    for (int i=0; i < numLinesToSend; i++) {
      dataFeed.accept(testBytes);
    }
    long completionSentTime= System.currentTimeMillis();
    consumerFeed.complete();

    CompletedUpload completedUpload = upload.completionFuture().join();
    double responseTime = (System.currentTimeMillis() - completionSentTime)/1000D;
    log.info("CompleteUploadResponse: {}\n\tCloseDuration: {}",
      completedUpload.response(), String.format("%.3f seconds", responseTime));

    // Validation:
    Thread.sleep(2000L);
    log.info("Getting File");
    Path filePath = Paths.get(System.getProperty("user.dir")+"/"+FILE);

    try {
      if (Files.exists(filePath)) Files.delete(filePath);
      GetObjectResponse response = s3AsyncClient.getObject(o -> o.bucket(BUCKET).key(FILE), AsyncResponseTransformer.toFile(filePath)).join();
      log.info("Resulting File Size: {}", Files.size(filePath));
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}

public class ConsumerFeed<T> {
  Consumer<T> dataFeed;
  FluxSink<T> sink;
  final CountDownLatch subscription = new CountDownLatch(1);
  // Runs when the Flux is subscribed (i.e., when the TransferManager starts
  // consuming the upload body); only then does the sink exist.
  Flux<T> flux = Flux.create(sink -> {
    System.out.println("Create");
    this.setSink(sink);
    setDataFeed(sink::next);
    subscription.countDown();
  });
  void complete() {
    getSink().complete();
  }

  public Consumer<T> getDataFeed() {
    return dataFeed;
  }

  public void setDataFeed(Consumer<T> dataFeed) {
    this.dataFeed = dataFeed;
  }

  public FluxSink<T> getSink() {
    return sink;
  }

  public void setSink(FluxSink<T> sink) {
    this.sink = sink;
  }
}
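For reference, a minimal usage sketch of ConsumerFeed on its own (a hypothetical ConsumerFeedDemo, for illustration): the Flux.create callback only runs once something subscribes, so the latch must be awaited before feeding data.

public class ConsumerFeedDemo {
  public static void main(String[] args) throws InterruptedException {
    ConsumerFeed<String> feed = new ConsumerFeed<>();
    // Subscribing runs the Flux.create callback, which sets the sink and releases the latch.
    feed.flux.subscribe(System.out::println);
    feed.subscription.await();          // do not feed before the sink exists
    feed.getDataFeed().accept("hello"); // pushed through the sink to the subscriber
    feed.complete();                    // completes the Flux
  }
}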

public class S3TransferListener implements TransferListener {

  private static final Logger log = LogManager.getLogger(S3TransferListener.class.getName());

  public S3TransferListener(String resource) {
    this.resource = resource;
  }

  final String resource;
  long startTime;
  private int step=0;
  @Override
  public void transferInitiated (Context.TransferInitiated context) {
    log.info("Transfer initiated: {}, {}", resource, context.progressSnapshot().ratioTransferred());
    startTime = System.currentTimeMillis();
    status(context.progressSnapshot().transferredBytes());
  }

  private void status(long l) {
    if (l > step * 1_000_000) {
      log.info("Bytes transferred {}", l);
      step++;
    }
  }

  @Override
  public void bytesTransferred (Context.BytesTransferred context) {
    status(context.progressSnapshot().transferredBytes());
  }

  @Override
  public void transferComplete (Context.TransferComplete context) {
    // Guard the sub-second case so throughput is never a division by zero.
    double seconds = Math.max(1L, System.currentTimeMillis() - startTime) / 1000D;
    double bytes = (double)context.progressSnapshot().transferredBytes();
    double megabytes = bytes / 1_048_576;
    double throughput = megabytes / seconds;
    log.info("Transfer complete for resource: {}\n\t Bytes: {}\n\t MBs: {}\n\tThroughput: {} MB/s",
      resource, String.format("%10f", bytes), String.format("%.3f", megabytes),
      String.format("%.2f", throughput));
  }

  @Override
  public void transferFailed (Context.TransferFailed context) {
    log.error("Transfer failed on resource "+resource, context.exception());
  }
}


public class Utils {

  private static final Logger log = LogManager.getLogger(Utils.class.getName());


  static Path getPath (String resource) {
    Path path = Paths.get(resource);
    if (!path.getParent().toFile().exists() && !path.getParent().toFile().mkdirs()) {
      throw new RuntimeException("Failed to create path location: " + path);
    }
    return path;
  }

  static void initCrtLogging(String path) {
    // The S3 CRT client delegates to native AWS libraries (https://github.com/awslabs/aws-c-s3),
    // so detailed diagnostics are only available by enabling CRT logging here.
    /*Path logFilePath = getPath(path);
    if (Files.exists(logFilePath)) {
      try {
        Files.delete(logFilePath);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }*/
    Log.initLoggingToStdout(Log.LogLevel.Trace);
    log.info("CRT Logging initialized: "+path);
  }

  static S3TransferManager getTransferManager(S3AsyncClient s3AsyncClient) {
    return S3TransferManager.builder()
      .s3Client(s3AsyncClient)
      .build();
  }

  public static S3AsyncClient getS3AsyncClient(AwsCredentialsProvider credentialsProvider) {
    S3AsyncClient s3AsyncClient =
      S3AsyncClient.crtBuilder()
        //.credentialsProvider(credentialsProvider)
        .region(Region.US_EAST_1)
        .targetThroughputInGbps(20.0)
        .minimumPartSizeInBytes(1000000L)
        .build();
    return s3AsyncClient;
  }
}
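
If the malformed https_proxy value cannot be corrected in the AWS environment, one possible workaround is to hand the CRT an explicit proxy configuration so it never has to parse the environment variable. This is a sketch, assuming the S3CrtHttpConfiguration and S3CrtProxyConfiguration builders available in recent 2.20.x SDK releases; proxy.example.com:3128 is a placeholder, not a value from this setup.

// Sketch only: explicit proxy configuration for the CRT client.
S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder()
  .region(Region.US_EAST_1)
  .targetThroughputInGbps(20.0)
  .minimumPartSizeInBytes(1000000L)
  .httpConfiguration(S3CrtHttpConfiguration.builder()
    .proxyConfiguration(S3CrtProxyConfiguration.builder()
      .host("proxy.example.com") // placeholder
      .port(3128)                // placeholder
      .scheme("http")
      .build())
    .build())
  .build();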

Possible Solution

No response

Additional Information/Context

Using AWS SDK version 2.20.94

aws-crt-java version used

0.22.2

Java version used

1.8

Operating System and version

Windows 10 Enterprise

Metadata

Labels

help-wanted: We are asking the community to submit a PR to resolve this issue.
p3: This is a minor priority issue.
response-requested: Waiting on additional info and feedback. Will move to 'closing-soon' in 7 days.
