From b85b4fca6bb6dbf2f4ee3b5d22564d0108be9e8a Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 5 Aug 2024 20:08:59 -0400 Subject: [PATCH] feat: allow specifying an expected object size for resumable operations. Update resumable upload failure detection to be more specific about classifying a request as SCENARIO_5 Fixes #2511 --- .../storage/BidiBlobWriteSessionConfig.java | 2 +- .../DefaultBlobWriteSessionConfig.java | 2 +- ...apicBidiUnbufferedWritableByteChannel.java | 16 +- ...edChunkedResumableWritableByteChannel.java | 18 +- .../storage/GapicUploadSessionBuilder.java | 12 +- .../google/cloud/storage/GrpcStorageImpl.java | 16 +- .../cloud/storage/GrpcStorageOptions.java | 3 +- .../JournalingBlobWriteSessionConfig.java | 2 +- .../storage/JsonResumableSessionPutTask.java | 3 +- ...lelCompositeUploadWritableByteChannel.java | 4 +- .../com/google/cloud/storage/Storage.java | 17 ++ .../com/google/cloud/storage/UnifiedOpts.java | 34 +++ .../cloud/storage/spi/v1/HttpStorageRpc.java | 4 + .../cloud/storage/spi/v1/StorageRpc.java | 3 +- .../GapicUploadSessionBuilderSyntaxTest.java | 9 +- ...fferedWritableByteChannelPropertyTest.java | 4 +- .../ITUnbufferedResumableUploadTest.java | 4 +- .../storage/it/ITResumableUploadTest.java | 229 ++++++++++++++++++ .../it/runner/registry/BackendResources.java | 1 + 19 files changed, 350 insertions(+), 33 deletions(-) create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITResumableUploadTest.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java index 7a80fed498..e8ba2f3c61 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java @@ -114,7 +114,7 @@ public WritableByteChannelSession writeSession( BidiWriteObjectRequest req = grpc.getBidiWriteObjectRequest(info, opts); ApiFuture startResumableWrite = - grpc.startResumableWrite(grpcCallContext, req); + grpc.startResumableWrite(grpcCallContext, req, opts); return ResumableMedia.gapic() .write() .bidiByteChannel(grpc.storageClient.bidiWriteObjectCallable()) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java index 5a7ef18198..9ab421bd56 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java @@ -156,7 +156,7 @@ public WritableByteChannelSession writeSession( WriteObjectRequest req = grpc.getWriteObjectRequest(info, opts); ApiFuture startResumableWrite = - grpc.startResumableWrite(grpcCallContext, req); + grpc.startResumableWrite(grpcCallContext, req, opts); return ResumableMedia.gapic() .write() .byteChannel( diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java index 15278616d9..c3abe7b066 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java @@ -24,6 +24,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.BidiStreamingCallable; +import com.google.api.gax.rpc.ErrorDetails; import com.google.api.gax.rpc.OutOfRangeException; import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; import com.google.cloud.storage.Conversions.Decoder; @@ -345,10 +346,17 @@ public void onNext(BidiWriteObjectResponse value) { public void onError(Throwable t) { if (t instanceof OutOfRangeException) { OutOfRangeException oore = (OutOfRangeException) t; - clientDetectedError( - ResumableSessionFailureScenario.SCENARIO_5.toStorageException( - ImmutableList.of(lastWrittenRequest), null, context, oore)); - } else if (t instanceof ApiException) { + ErrorDetails ed = oore.getErrorDetails(); + if (!(ed != null + && ed.getErrorInfo() != null + && ed.getErrorInfo().getReason().equals("GRPC_MISMATCHED_UPLOAD_SIZE"))) { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_5.toStorageException( + ImmutableList.of(lastWrittenRequest), null, context, oore)); + return; + } + } + if (t instanceof ApiException) { // use StorageExceptions logic to translate from ApiException to our status codes ensuring // things fall in line with our retry handlers. // This is suboptimal, as it will initialize a second exception, however this is the diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java index 6acac1e617..e7697ed7b5 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java @@ -24,6 +24,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.ClientStreamingCallable; +import com.google.api.gax.rpc.ErrorDetails; import com.google.api.gax.rpc.OutOfRangeException; import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; import com.google.cloud.storage.Conversions.Decoder; @@ -267,11 +268,18 @@ public void onError(Throwable t) { if (t instanceof OutOfRangeException) { OutOfRangeException oore = (OutOfRangeException) t; open = false; - StorageException storageException = - ResumableSessionFailureScenario.SCENARIO_5.toStorageException( - segments, null, context, oore); - invocationHandle.setException(storageException); - } else if (t instanceof ApiException) { + ErrorDetails ed = oore.getErrorDetails(); + if (!(ed != null + && ed.getErrorInfo() != null + && ed.getErrorInfo().getReason().equals("GRPC_MISMATCHED_UPLOAD_SIZE"))) { + StorageException storageException = + ResumableSessionFailureScenario.SCENARIO_5.toStorageException( + segments, null, context, oore); + invocationHandle.setException(storageException); + return; + } + } + if (t instanceof ApiException) { // use StorageExceptions logic to translate from ApiException to our status codes ensuring // things fall in line with our retry handlers. // This is suboptimal, as it will initialize a second exception, however this is the diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java index 2d1daa5444..3ae9b8bc23 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java @@ -21,6 +21,8 @@ import com.google.api.gax.rpc.BidiStreamingCallable; import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; +import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.common.util.concurrent.MoreExecutors; import com.google.storage.v2.BidiWriteObjectRequest; import com.google.storage.v2.BidiWriteObjectResponse; @@ -50,7 +52,8 @@ GapicBidiWritableByteChannelSessionBuilder bidiByteChannel( ApiFuture resumableWrite( UnaryCallable callable, - WriteObjectRequest writeObjectRequest) { + WriteObjectRequest writeObjectRequest, + Opts opts) { StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder(); if (writeObjectRequest.hasWriteObjectSpec()) { b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec()); @@ -61,7 +64,7 @@ ApiFuture resumableWrite( if (writeObjectRequest.hasObjectChecksums()) { b.setObjectChecksums(writeObjectRequest.getObjectChecksums()); } - StartResumableWriteRequest req = b.build(); + StartResumableWriteRequest req = opts.startResumableWriteRequest().apply(b).build(); Function f = uploadId -> writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build(); @@ -80,7 +83,8 @@ ApiFuture resumableWrite( ApiFuture bidiResumableWrite( UnaryCallable x, - BidiWriteObjectRequest writeObjectRequest) { + BidiWriteObjectRequest writeObjectRequest, + Opts opts) { StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder(); if (writeObjectRequest.hasWriteObjectSpec()) { b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec()); @@ -91,7 +95,7 @@ ApiFuture bidiResumableWrite( if (writeObjectRequest.hasObjectChecksums()) { b.setObjectChecksums(writeObjectRequest.getObjectChecksums()); } - StartResumableWriteRequest req = b.build(); + StartResumableWriteRequest req = opts.startResumableWriteRequest().apply(b).build(); Function f = uploadId -> writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 0d0904ca1c..022a3cc1e6 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -320,7 +320,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts o ClientStreamingCallable write = storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext); - ApiFuture start = startResumableWrite(grpcCallContext, req); + ApiFuture start = startResumableWrite(grpcCallContext, req, opts); ApiFuture session2 = ApiFutures.transform( start, @@ -365,7 +365,7 @@ public Blob createFrom( opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); - ApiFuture start = startResumableWrite(grpcCallContext, req); + ApiFuture start = startResumableWrite(grpcCallContext, req, opts); BufferedWritableByteChannelSession session = ResumableMedia.gapic() @@ -790,7 +790,7 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options // in JSON, the starting of the resumable session happens before the invocation of write can // happen. Emulate the same thing here. // 1. create the future - ApiFuture startResumableWrite = startResumableWrite(grpcCallContext, req); + ApiFuture startResumableWrite = startResumableWrite(grpcCallContext, req, opts); // 2. await the result of the future ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite); // 3. wrap the result in another future container before constructing the BlobWriteChannel @@ -1919,7 +1919,7 @@ private UnbufferedReadableByteChannelSession unbufferedReadSession( @VisibleForTesting ApiFuture startResumableWrite( - GrpcCallContext grpcCallContext, WriteObjectRequest req) { + GrpcCallContext grpcCallContext, WriteObjectRequest req, Opts opts) { Set codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req)); GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); return ResumableMedia.gapic() @@ -1928,11 +1928,12 @@ ApiFuture startResumableWrite( storageClient .startResumableWriteCallable() .withDefaultCallContext(merge.withRetryableCodes(codes)), - req); + req, + opts); } ApiFuture startResumableWrite( - GrpcCallContext grpcCallContext, BidiWriteObjectRequest req) { + GrpcCallContext grpcCallContext, BidiWriteObjectRequest req, Opts opts) { Set codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req)); GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); return ResumableMedia.gapic() @@ -1941,7 +1942,8 @@ ApiFuture startResumableWrite( storageClient .startResumableWriteCallable() .withDefaultCallContext(merge.withRetryableCodes(codes)), - req); + req, + opts); } private SourceObject sourceObjectEncode(SourceBlob from) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index 64376cc107..152b43d44f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -279,7 +279,8 @@ private Tuple> resolveSettingsAndOpts() throw InstantiatingGrpcChannelProvider.newBuilder() .setEndpoint(endpoint) .setAllowNonDefaultServiceAccount(true) - .setAttemptDirectPath(attemptDirectPath); + .setAttemptDirectPath(attemptDirectPath) + .setMaxInboundMetadataSize(50 * 1024); if (!NoopGrpcInterceptorProvider.INSTANCE.equals(grpcInterceptorProvider)) { channelProviderBuilder.setInterceptorProvider(grpcInterceptorProvider); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java index 7d0e46ce5b..499b9dce9e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java @@ -190,7 +190,7 @@ public WritableByteChannelSession writeSession( opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); ApiFuture f = grpcStorage.startResumableWrite( - grpcCallContext, grpcStorage.getWriteObjectRequest(info, opts)); + grpcCallContext, grpcStorage.getWriteObjectRequest(info, opts), opts); ApiFuture> start = ApiFutures.transform(f, WriteCtx::new, MoreExecutors.directExecutor()); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java index 9d3afa5a83..149c7ad9af 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java @@ -205,7 +205,8 @@ public void rewindTo(long offset) { && contentLength != null && contentLength > 0) { String errorMessage = cause.getContent().toLowerCase(Locale.US); - if (errorMessage.contains("content-range")) { + if (errorMessage.contains("content-range") + && !errorMessage.contains("earlier")) { // TODO: exclude "earlier request" StorageException se = ResumableSessionFailureScenario.SCENARIO_5.toStorageException( uploadId, response, cause, cause::getContent); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java index 58e9bad03d..db807ce6ce 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java @@ -43,6 +43,7 @@ import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; +import com.google.cloud.storage.UnifiedOpts.ResumableUploadExpectedObjectSize; import com.google.cloud.storage.UnifiedOpts.SourceGenerationMatch; import com.google.cloud.storage.UnifiedOpts.SourceGenerationNotMatch; import com.google.cloud.storage.UnifiedOpts.SourceMetagenerationMatch; @@ -102,7 +103,8 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab || o instanceof SourceMetagenerationMatch || o instanceof SourceMetagenerationNotMatch || o instanceof Crc32cMatch - || o instanceof Md5Match; + || o instanceof Md5Match + || o instanceof ResumableUploadExpectedObjectSize; TO_EXCLUDE_FROM_PARTS = tmp.negate(); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java index fca1d1294d..0ffeda2a14 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java @@ -1273,6 +1273,23 @@ public static BlobWriteOption detectContentType() { return new BlobWriteOption(UnifiedOpts.detectContentType()); } + /** + * Set a precondition on the number of bytes that GCS should expect for a resumable upload. See + * the docs for X-Upload-Content-Length + * for more detail. + * + *

If the method invoked with this option does not perform a resumable upload, this option + * will be ignored. + * + * @since 2.42.0 + */ + @BetaApi + @TransportCompatibility({Transport.HTTP, Transport.GRPC}) + public static BlobWriteOption expectedObjectSize(long objectContentSize) { + return new BlobWriteOption(UnifiedOpts.resumableUploadExpectedObjectSize(objectContentSize)); + } + /** * Deduplicate any options which are the same parameter. The value which comes last in {@code * os} will be the value included in the return. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java index 16c76ef926..19137fecd5 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java @@ -55,6 +55,7 @@ import com.google.storage.v2.ReadObjectRequest; import com.google.storage.v2.RestoreObjectRequest; import com.google.storage.v2.RewriteObjectRequest; +import com.google.storage.v2.StartResumableWriteRequest; import com.google.storage.v2.UpdateBucketRequest; import com.google.storage.v2.UpdateHmacKeyRequest; import com.google.storage.v2.UpdateObjectRequest; @@ -196,6 +197,10 @@ default Mapper composeObject() { default Mapper rewriteObject() { return Mapper.identity(); } + + default Mapper startResumableWrite() { + return Mapper.identity(); + } } /** Base interface for those Opts which are applicable to Bucket List operations */ @@ -487,6 +492,12 @@ static Projection projection(@NonNull String projection) { return new Projection(projection); } + static ResumableUploadExpectedObjectSize resumableUploadExpectedObjectSize( + long expectedObjectSize) { + checkArgument(expectedObjectSize >= 0, "expectedObjectSize >= 0 (%s >= 0)", expectedObjectSize); + return new ResumableUploadExpectedObjectSize(expectedObjectSize); + } + static SoftDeleted softDeleted(boolean softDeleted) { return new SoftDeleted(softDeleted); } @@ -1832,6 +1843,25 @@ public Mapper updateObject() { } } + static final class ResumableUploadExpectedObjectSize extends RpcOptVal<@NonNull Long> + implements ObjectTargetOpt { + private static final long serialVersionUID = 3640126281492196357L; + + private ResumableUploadExpectedObjectSize(@NonNull Long val) { + super(StorageRpc.Option.X_UPLOAD_CONTENT_LENGTH, val); + } + + @Override + public Mapper startResumableWrite() { + return b -> { + if (val > 0) { + b.getWriteObjectSpecBuilder().setObjectSize(val); + } + return b; + }; + } + } + static final class ShowDeletedKeys extends RpcOptVal<@NonNull Boolean> implements HmacKeyListOpt { private static final long serialVersionUID = -6604176744362903487L; @@ -2441,6 +2471,10 @@ Mapper bidiWriteObjectRequest() { return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::bidiWriteObject); } + Mapper startResumableWriteRequest() { + return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::startResumableWrite); + } + Mapper getObjectsRequest() { return fuseMappers(ObjectSourceOpt.class, ObjectSourceOpt::getObject); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java index 89a0986730..5341051a25 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java @@ -1090,6 +1090,10 @@ public String open(StorageObject object, Map options) { requestFactory.buildPostRequest(url, new JsonHttpContent(jsonFactory, object)); HttpHeaders requestHeaders = httpRequest.getHeaders(); requestHeaders.set("X-Upload-Content-Type", detectContentType(object, options)); + Long xUploadContentLength = Option.X_UPLOAD_CONTENT_LENGTH.getLong(options); + if (xUploadContentLength != null) { + requestHeaders.set("X-Upload-Content-Length", xUploadContentLength); + } setEncryptionHeaders(requestHeaders, "x-goog-encryption-", options); HttpResponse response = httpRequest.execute(); if (response.getStatusCode() != 200) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java index d4e0abbff8..78747d42d6 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java @@ -77,7 +77,8 @@ enum Option { SOFT_DELETED("softDeleted"), COPY_SOURCE_ACL("copySourceAcl"), GENERATION("generation"), - INCLUDE_FOLDERS_AS_PREFIXES("includeFoldersAsPrefixes"); + INCLUDE_FOLDERS_AS_PREFIXES("includeFoldersAsPrefixes"), + X_UPLOAD_CONTENT_LENGTH("x-upload-content-length"); private final String value; diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUploadSessionBuilderSyntaxTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUploadSessionBuilderSyntaxTest.java index a8afe4738f..e35278b5b1 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUploadSessionBuilderSyntaxTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUploadSessionBuilderSyntaxTest.java @@ -23,6 +23,7 @@ import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.storage.v2.StartResumableWriteRequest; import com.google.storage.v2.StartResumableWriteResponse; import com.google.storage.v2.WriteObjectRequest; @@ -94,7 +95,7 @@ public void syntax_directBuffered_fluent() { @Test public void syntax_resumableUnbuffered_fluent() { ApiFuture startAsync = - ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req); + ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty()); UnbufferedWritableByteChannelSession session = ResumableMedia.gapic() .write() @@ -110,7 +111,7 @@ public void syntax_resumableUnbuffered_fluent() { @Test public void syntax_resumableBuffered_fluent() { ApiFuture startAsync = - ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req); + ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty()); BufferedWritableByteChannelSession session = ResumableMedia.gapic() .write() @@ -150,7 +151,7 @@ public void syntax_directBuffered_incremental() { @Test public void syntax_resumableUnbuffered_incremental() { ApiFuture startAsync = - ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req); + ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty()); GapicWritableByteChannelSessionBuilder b1 = ResumableMedia.gapic() .write() @@ -164,7 +165,7 @@ public void syntax_resumableUnbuffered_incremental() { @Test public void syntax_resumableBuffered_incremental() { ApiFuture startAsync = - ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req); + ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty()); GapicWritableByteChannelSessionBuilder b1 = ResumableMedia.gapic() .write() diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java index 240ae1d5b5..5055c1e217 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java @@ -310,7 +310,9 @@ void testUploads(@ForAll("scenario") Scenario s) throws Exception { ApiFuture f = storage.startResumableWrite( - GrpcCallContext.createDefault(), storage.getWriteObjectRequest(info, Opts.empty())); + GrpcCallContext.createDefault(), + storage.getWriteObjectRequest(info, Opts.empty()), + Opts.empty()); ResumableWrite resumableWrite = ApiExceptions.callAndTranslateApiException(f); UploadCtx uploadCtx = diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITUnbufferedResumableUploadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITUnbufferedResumableUploadTest.java index 34b600d74b..8dce209059 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITUnbufferedResumableUploadTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITUnbufferedResumableUploadTest.java @@ -161,7 +161,9 @@ public void grpc() throws Exception { ResumableMedia.gapic() .write() .resumableWrite( - storageClient.startResumableWriteCallable().withDefaultCallContext(merge), request); + storageClient.startResumableWriteCallable().withDefaultCallContext(merge), + request, + opts); UnbufferedWritableByteChannelSession session = ResumableMedia.gapic() diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITResumableUploadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITResumableUploadTest.java new file mode 100644 index 0000000000..cb0524f73b --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITResumableUploadTest.java @@ -0,0 +1,229 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.it; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BlobWriteSession; +import com.google.cloud.storage.BlobWriteSessionConfigs; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.DataGenerator; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobWriteOption; +import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageOptions; +import com.google.cloud.storage.TmpFile; +import com.google.cloud.storage.TransportCompatibility.Transport; +import com.google.cloud.storage.it.runner.StorageITRunner; +import com.google.cloud.storage.it.runner.annotations.Backend; +import com.google.cloud.storage.it.runner.annotations.CrossRun; +import com.google.cloud.storage.it.runner.annotations.Inject; +import com.google.cloud.storage.it.runner.registry.Generator; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; + +@RunWith(StorageITRunner.class) +@CrossRun( + backends = {Backend.PROD}, + transports = {Transport.HTTP, Transport.GRPC}) +public final class ITResumableUploadTest { + @Rule public final TemporaryFolder temp = new TemporaryFolder(); + + @Inject public Storage storage; + @Inject public BucketInfo bucket; + @Inject public Generator generator; + + @Test + public void expectedUploadSize_chunked_doesMatch() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + doTestDoesMatch(storage); + } + + @Test + public void expectedUploadSize_chunked_doesNotMatch() throws IOException { + doTestDoesNotMatch(storage); + } + + @Test + @CrossRun.Exclude(transports = Transport.HTTP) + public void expectedUploadSize_bidi_doesMatch() throws Exception { + StorageOptions options = + storage + .getOptions() + .toBuilder() + .setBlobWriteSessionConfig(BlobWriteSessionConfigs.bidiWrite()) + .build(); + try (Storage storage = options.getService()) { + doTestDoesMatch(storage); + } + } + + @Test + @CrossRun.Exclude(transports = Transport.HTTP) + public void expectedUploadSize_bidi_doesNotMatch() throws Exception { + StorageOptions options = + storage + .getOptions() + .toBuilder() + .setBlobWriteSessionConfig(BlobWriteSessionConfigs.bidiWrite()) + .build(); + try (Storage storage = options.getService()) { + doTestDoesNotMatch(storage); + } + } + + @Test + public void expectedUploadSize_ignored_pcu() throws Exception { + StorageOptions options = + storage + .getOptions() + .toBuilder() + .setBlobWriteSessionConfig(BlobWriteSessionConfigs.parallelCompositeUpload()) + .build(); + try (Storage storage = options.getService()) { + int objectContentSize = 10; + byte[] bytes = DataGenerator.base64Characters().genBytes(objectContentSize); + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + BlobWriteSession session = + storage.blobWriteSession(info, BlobWriteOption.expectedObjectSize(objectContentSize + 1)); + try (WritableByteChannel open = session.open()) { + open.write(ByteBuffer.wrap(bytes)); + } + + BlobInfo gen1 = session.getResult().get(5, TimeUnit.SECONDS); + assertThat(gen1.getSize()).isEqualTo(objectContentSize); + } + } + + @Test + public void expectedUploadSize_createFrom_inputStream_doesMatch() throws Exception { + StorageOptions options = + storage + .getOptions() + .toBuilder() + .setBlobWriteSessionConfig(BlobWriteSessionConfigs.parallelCompositeUpload()) + .build(); + try (Storage storage = options.getService()) { + int objectContentSize = 10; + byte[] bytes = DataGenerator.base64Characters().genBytes(objectContentSize); + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + BlobInfo gen1 = + storage.createFrom( + info, + new ByteArrayInputStream(bytes), + BlobWriteOption.expectedObjectSize(objectContentSize)); + assertThat(gen1.getSize()).isEqualTo(objectContentSize); + } + } + + @Test + public void expectedUploadSize_createFrom_inputStream_doesNotMatch() throws Exception { + StorageOptions options = + storage + .getOptions() + .toBuilder() + .setBlobWriteSessionConfig(BlobWriteSessionConfigs.parallelCompositeUpload()) + .build(); + try (Storage storage = options.getService()) { + int objectContentSize = 10; + byte[] bytes = DataGenerator.base64Characters().genBytes(objectContentSize); + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + StorageException se = + assertThrows( + StorageException.class, + () -> + storage.createFrom( + info, + new ByteArrayInputStream(bytes), + BlobWriteOption.expectedObjectSize(objectContentSize + 1))); + + assertThat(se.getCode()).isEqualTo(400); + } + } + + @Test + public void expectedUploadSize_createFrom_path_doesMatch() throws IOException { + int objectContentSize = 10; + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + try (TmpFile tmpFile = + DataGenerator.base64Characters().tempFile(temp.getRoot().toPath(), objectContentSize)) { + BlobInfo gen1 = + storage.createFrom( + info, tmpFile.getPath(), BlobWriteOption.expectedObjectSize(objectContentSize)); + assertThat(gen1.getSize()).isEqualTo(objectContentSize); + } + } + + @Test + public void expectedUploadSize_createFrom_path_doesNotMatch() throws IOException { + int objectContentSize = 10; + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + try (TmpFile tmpFile = + DataGenerator.base64Characters().tempFile(temp.getRoot().toPath(), objectContentSize)) { + StorageException se = + assertThrows( + StorageException.class, + () -> + storage.createFrom( + info, + tmpFile.getPath(), + BlobWriteOption.expectedObjectSize(objectContentSize + 1))); + + assertThat(se.getCode()).isEqualTo(400); + } + } + + private void doTestDoesMatch(Storage storage) + throws IOException, InterruptedException, ExecutionException, TimeoutException { + int objectContentSize = 10; + byte[] bytes = DataGenerator.base64Characters().genBytes(objectContentSize); + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + BlobWriteSession session = + storage.blobWriteSession(info, BlobWriteOption.expectedObjectSize(objectContentSize)); + try (WritableByteChannel open = session.open()) { + open.write(ByteBuffer.wrap(bytes)); + } + + BlobInfo gen1 = session.getResult().get(5, TimeUnit.SECONDS); + assertThat(gen1.getSize()).isEqualTo(objectContentSize); + } + + private void doTestDoesNotMatch(Storage storage) throws IOException { + int objectContentSize = 10; + byte[] bytes = DataGenerator.base64Characters().genBytes(objectContentSize); + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + BlobWriteSession session = + storage.blobWriteSession(info, BlobWriteOption.expectedObjectSize(objectContentSize + 1)); + + WritableByteChannel open = session.open(); + open.write(ByteBuffer.wrap(bytes)); + StorageException se = assertThrows(StorageException.class, open::close); + + assertThat(se.getCode()).isEqualTo(400); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BackendResources.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BackendResources.java index 4ed6887fe5..9739abcc3f 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BackendResources.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BackendResources.java @@ -117,6 +117,7 @@ static BackendResources of(Backend backend) { optionsBuilder .setGrpcInterceptorProvider( GrpcPlainRequestLoggingInterceptor.getInterceptorProvider()) + .setEnableGrpcClientMetrics(false) .build(); return new StorageInstance(built, protectedBucketNames); });