Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public ResumeToken build() {
private long totalNumParts;
private long numPartsCompleted;
private String uploadId;
private String objectLastModified;

public ResumeToken(PutResumeTokenBuilder builder) {
this.nativeType = S3MetaRequestOptions.MetaRequestType.PUT_OBJECT.getNativeValue();
Expand Down Expand Up @@ -122,4 +123,17 @@ public String getUploadId() {

return uploadId;
}
/******
* Download Specific fields.
******/
/**
* @return Object last modified time
*/
public String getObjectLastModifiedString() {
if (getType() != S3MetaRequestOptions.MetaRequestType.GET_OBJECT) {
throw new IllegalArgumentException("ResumeToken - Object last modified time is only defined for Get Object Resume tokens");
}

return objectLastModified;
}
}
19 changes: 19 additions & 0 deletions src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;

public class S3MetaRequest extends CrtResource {

Expand Down Expand Up @@ -75,6 +76,20 @@ public ResumeToken pause() {
return s3MetaRequestPause(getNativeHandle());
}

public CompletableFuture<ResumeToken> pauseAsync() {
CompletableFuture<ResumeToken> future = new CompletableFuture<>();
if (isNull()) {
throw new IllegalStateException("S3MetaRequest has been closed.");
}
try {
s3MetaRequestPauseAsync(getNativeHandle(), future);
} catch (Exception e) {
future.completeExceptionally(e);
}

return future;
}

/**
* Increment the flow-control window, so that response data continues downloading.
* <p>
Expand Down Expand Up @@ -114,5 +129,9 @@ public void incrementReadWindow(long bytes) {

private static native ResumeToken s3MetaRequestPause(long s3MetaRequest);

private static native void s3MetaRequestPauseAsync(
long s3MetaRequest,
CompletableFuture<ResumeToken> future) throws CrtRuntimeException;

private static native void s3MetaRequestIncrementReadWindow(long s3MetaRequest, long bytes);
}
3 changes: 3 additions & 0 deletions src/native/java_class_ids.c
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,9 @@ static void s_cache_s3_meta_request_resume_token(JNIEnv *env) {
s3_meta_request_resume_token_properties.upload_id_field_id =
(*env)->GetFieldID(env, cls, "uploadId", "Ljava/lang/String;");
AWS_FATAL_ASSERT(s3_meta_request_resume_token_properties.upload_id_field_id);
s3_meta_request_resume_token_properties.object_last_modified_field_id =
(*env)->GetFieldID(env, cls, "objectLastModified", "Ljava/lang/String;");
AWS_FATAL_ASSERT(s3_meta_request_resume_token_properties.object_last_modified_field_id);
}

struct java_aws_mqtt5_connack_packet_properties mqtt5_connack_packet_properties;
Expand Down
1 change: 1 addition & 0 deletions src/native/java_class_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ struct java_aws_s3_meta_request_resume_token {
jfieldID total_num_parts_field_id;
jfieldID num_parts_completed_field_id;
jfieldID upload_id_field_id;
jfieldID object_last_modified_field_id;
};
extern struct java_aws_s3_meta_request_resume_token s3_meta_request_resume_token_properties;

Expand Down
170 changes: 137 additions & 33 deletions src/native/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1210,28 +1210,7 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRe
aws_s3_meta_request_cancel(meta_request);
}

JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRequestPause(
JNIEnv *env,
jclass jni_class,
jlong jni_s3_meta_request) {

(void)jni_class;
aws_cache_jni_ids(env);

struct aws_s3_meta_request *meta_request = (struct aws_s3_meta_request *)jni_s3_meta_request;
if (!meta_request) {
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
aws_jni_throw_illegal_argument_exception(env, "S3MetaRequest.s3MetaRequestPause: Invalid/null meta request");
return NULL;
}

struct aws_s3_meta_request_resume_token *resume_token = NULL;

if (aws_s3_meta_request_pause(meta_request, &resume_token)) {
aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPause: Failed to pause request");
return NULL;
}

static jobject s_java_resume_token_from_native(JNIEnv *env, struct aws_s3_meta_request_resume_token *resume_token) {
jobject resume_token_jni = NULL;
if (resume_token != NULL) {
resume_token_jni = (*env)->NewObject(
Expand All @@ -1240,15 +1219,10 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3Met
s3_meta_request_resume_token_properties.s3_meta_request_resume_token_constructor_method_id);
if ((*env)->ExceptionCheck(env) || resume_token_jni == NULL) {
aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPause: Failed to create ResumeToken.");
goto on_done;
goto done;
}

enum aws_s3_meta_request_type type = aws_s3_meta_request_resume_token_type(resume_token);
if (type != AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPause: Failed to convert resume token.");
goto on_done;
}

(*env)->SetIntField(env, resume_token_jni, s3_meta_request_resume_token_properties.native_type_field_id, type);
(*env)->SetLongField(
env,
Expand All @@ -1267,17 +1241,147 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3Met
aws_s3_meta_request_resume_token_num_parts_completed(resume_token));

struct aws_byte_cursor upload_id_cur = aws_s3_meta_request_resume_token_upload_id(resume_token);
jstring upload_id_jni = aws_jni_string_from_cursor(env, &upload_id_cur);
(*env)->SetObjectField(
env, resume_token_jni, s3_meta_request_resume_token_properties.upload_id_field_id, upload_id_jni);
if (upload_id_cur.len > 0) {
jstring upload_id_jni = aws_jni_string_from_cursor(env, &upload_id_cur);
(*env)->SetObjectField(
env, resume_token_jni, s3_meta_request_resume_token_properties.upload_id_field_id, upload_id_jni);

(*env)->DeleteLocalRef(env, upload_id_jni);
}
struct aws_byte_cursor object_last_modified_cur = aws_s3_meta_request_resume_object_last_modified(resume_token);
if (object_last_modified_cur.len > 0) {
jstring object_last_modified = aws_jni_string_from_cursor(env, &object_last_modified_cur);
(*env)->SetObjectField(
env,
resume_token_jni,
s3_meta_request_resume_token_properties.object_last_modified_field_id,
object_last_modified);

(*env)->DeleteLocalRef(env, object_last_modified);
}
printf("Resume token: %p\n", (void *)resume_token);
printf("Java resume token: %p\n", (void *)resume_token_jni);
printf("Resume token type: %d\n", type);
printf(
"Resume token part size: %llu\n",
(unsigned long long)aws_s3_meta_request_resume_token_part_size(resume_token));
printf(
"Resume token total num parts: %llu\n",
(unsigned long long)aws_s3_meta_request_resume_token_total_num_parts(resume_token));
printf(
"Resume token num parts completed: %llu\n",
(unsigned long long)aws_s3_meta_request_resume_token_num_parts_completed(resume_token));
}
done:
return resume_token_jni;
}

JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRequestPause(
JNIEnv *env,
jclass jni_class,
jlong jni_s3_meta_request) {

(void)jni_class;
aws_cache_jni_ids(env);

(*env)->DeleteLocalRef(env, upload_id_jni);
struct aws_s3_meta_request *meta_request = (struct aws_s3_meta_request *)jni_s3_meta_request;
if (!meta_request) {
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
aws_jni_throw_illegal_argument_exception(env, "S3MetaRequest.s3MetaRequestPause: Invalid/null meta request");
return NULL;
}

struct aws_s3_meta_request_resume_token *resume_token = NULL;

if (aws_s3_meta_request_pause(meta_request, &resume_token)) {
aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPause: Failed to pause request");
return NULL;
}

on_done:
jobject resume_token_jni = s_java_resume_token_from_native(env, resume_token);
aws_s3_meta_request_resume_token_release(resume_token);
return resume_token_jni;
}
struct s_pause_callback_data {
JavaVM *jvm;
jobject java_pause_future;
};

static void s_s3_meta_request_pause_complete(
struct aws_s3_meta_request *meta_request,
struct aws_s3_meta_request_resume_token *resume_token,
void *user_data) {
(void)meta_request;
struct s_pause_callback_data *callback_data = (struct s_pause_callback_data *)user_data;
/********** JNI ENV ACQUIRE **********/
JNIEnv *env = aws_jni_acquire_thread_env(callback_data->jvm);
JavaVM *jvm = callback_data->jvm;
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
return;
}

jobject resume_token_jni = s_java_resume_token_from_native(env, resume_token);
if (resume_token_jni == NULL) {
jobject exception;
if (!aws_jni_get_and_clear_exception(env, &exception)) {
/* No exception raised from Java, create our own */
exception = aws_jni_new_crt_exception_from_error_code(env, aws_last_error());
}

(*env)->CallBooleanMethod(
env,
callback_data->java_pause_future,
completable_future_properties.complete_exceptionally_method_id,
exception);

goto done;
}

(*env)->CallBooleanMethod(
env, callback_data->java_pause_future, completable_future_properties.complete_method_id, resume_token_jni);
(*env)->DeleteLocalRef(env, resume_token_jni);

done:
(*env)->DeleteGlobalRef(env, callback_data->java_pause_future);
aws_mem_release(aws_jni_get_allocator(), callback_data);
aws_jni_release_thread_env(jvm, env);
/********** JNI ENV RELEASE **********/
}

JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRequestPauseAsync(
JNIEnv *env,
jclass jni_class,
jlong jni_s3_meta_request,
jobject java_pause_future) {

(void)jni_class;
aws_cache_jni_ids(env);

struct aws_s3_meta_request *meta_request = (struct aws_s3_meta_request *)jni_s3_meta_request;
if (!meta_request) {
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
aws_jni_throw_illegal_argument_exception(
env, "S3MetaRequest.s3MetaRequestPauseAsync: Invalid/null meta request");
return;
}

struct aws_allocator *allocator = aws_jni_get_allocator();
struct s_pause_callback_data *callback_data = aws_mem_calloc(allocator, 1, sizeof(struct s_pause_callback_data));
jint jvmresult = (*env)->GetJavaVM(env, &callback_data->jvm);
AWS_FATAL_ASSERT(jvmresult == 0);

callback_data->java_pause_future = (*env)->NewGlobalRef(env, java_pause_future);
AWS_FATAL_ASSERT(callback_data->java_pause_future != NULL);

if (aws_s3_meta_request_pause_async(meta_request, s_s3_meta_request_pause_complete, callback_data)) {
printf("Failed to pause async, last error: %d\n", aws_last_error());
(*env)->DeleteGlobalRef(env, callback_data->java_pause_future);
aws_mem_release(allocator, callback_data);
aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPauseAsync: Failed to pause request");
return;
}
}

JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRequestIncrementReadWindow(
JNIEnv *env,
Expand Down
14 changes: 12 additions & 2 deletions src/test/java/software/amazon/awssdk/crt/test/S3ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -337,14 +337,24 @@ public void onFinished(S3FinishedResponseContext context) {
};

HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT) };
HttpRequest httpRequest = new HttpRequest("GET", PRE_EXIST_1MB_PATH, headers, null);
HttpRequest httpRequest = new HttpRequest("GET", "/pre-existing-2GB", headers, null);

S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions()
.withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest)
.withResponseHandler(responseHandler);

try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) {
Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get());
Thread.sleep(3000); // sleep 1 sec to pause
CompletableFuture<ResumeToken> pauseFuture = metaRequest.pauseAsync();
Assert.assertNotNull(pauseFuture);
ResumeToken token = pauseFuture.get();
// ResumeToken token = metaRequest.pause();
Assert.assertNotNull(token);
System.err.println("Resume token getObjectLastModifiedString from Java: " + token.getObjectLastModifiedString());
System.err.println("Resume token part size from Java: " + token.getPartSize());
System.err.println("Resume token getNumPartsCompleted from Java: " + token.getNumPartsCompleted());
System.err.println("Resume token getTotalNumParts from Java: " + token.getTotalNumParts());
// Assert.assertEquals(Integer.valueOf(14352), onFinishedFuture.get());
}
} catch (InterruptedException | ExecutionException ex) {
Assert.fail(ex.getMessage());
Expand Down
Loading