- 
                Notifications
    You must be signed in to change notification settings 
- Fork 86
chore: Adding implmentation of RequestBody and RewindableContent #3363
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f656a9e
              a34fc29
              6dc7f67
              c9551de
              2778948
              28c8216
              504ce09
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -34,6 +34,8 @@ | |
| import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadResponse; | ||
| import com.google.cloud.storage.multipartupload.model.ListPartsRequest; | ||
| import com.google.cloud.storage.multipartupload.model.ListPartsResponse; | ||
| import com.google.cloud.storage.multipartupload.model.UploadPartRequest; | ||
| import com.google.cloud.storage.multipartupload.model.UploadPartResponse; | ||
| import com.google.common.base.StandardSystemProperty; | ||
| import com.google.common.collect.ImmutableMap; | ||
| import java.io.IOException; | ||
|  | @@ -112,6 +114,23 @@ AbortMultipartUploadResponse sendAbortMultipartUploadRequest( | |
| return httpRequest.execute().parseAs(AbortMultipartUploadResponse.class); | ||
| } | ||
|  | ||
| public UploadPartResponse sendUploadPartRequest( | ||
| URI uri, UploadPartRequest request, RequestBody requestBody) throws IOException { | ||
| String encodedBucket = urlEncode(request.bucket()); | ||
| String encodedKey = urlEncode(request.key()); | ||
| String resourcePath = "/" + encodedBucket + "/" + encodedKey; | ||
| String queryString = | ||
| "?partNumber=" + request.partNumber() + "&uploadId=" + urlEncode(request.uploadId()); | ||
| String uploadUri = uri.toString() + resourcePath + queryString; | ||
| HttpRequest httpRequest = | ||
| requestFactory.buildPutRequest(new GenericUrl(uploadUri), requestBody.getContent()); | ||
| httpRequest.getHeaders().putAll(headerProvider.getHeaders()); | ||
| addHeadersForUploadPart(requestBody, httpRequest.getHeaders()); | ||
| httpRequest.setParser(objectParser); | ||
| httpRequest.setThrowExceptionOnExecuteError(true); | ||
| return httpRequest.execute().parseAs(UploadPartResponse.class); | ||
| } | ||
|  | ||
| static MultipartUploadHttpRequestManager createFrom(HttpStorageOptions options) { | ||
| Storage storage = options.getStorageRpcV1().getStorage(); | ||
| ImmutableMap.Builder<String, String> stableHeaders = | ||
|  | @@ -133,6 +152,10 @@ static MultipartUploadHttpRequestManager createFrom(HttpStorageOptions options) | |
| options.getMergedHeaderProvider(FixedHeaderProvider.create(stableHeaders.build()))); | ||
| } | ||
|  | ||
| private void addHeadersForUploadPart(RequestBody requestBody, HttpHeaders headers) { | ||
| headers.put("x-goog-hash", "crc32c=" + requestBody.getContent().getCrc32c()); | ||
| } | ||
| 
      Comment on lines
    
      +155
     to 
      +157
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: one-line logic-less helper methods don't help Is there a plan to add more in here? | ||
|  | ||
| private void addHeadersForCreateMultipartUpload( | ||
| CreateMultipartUploadRequest request, HttpHeaders headers) { | ||
| // TODO(shreyassinha): add a PredefinedAcl::getXmlEntry with the corresponding value from | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /* | ||
| * Copyright 2023 Google LLC | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|  | ||
| package com.google.cloud.storage; | ||
|  | ||
| import com.google.api.core.BetaApi; | ||
| import com.google.api.core.InternalExtensionOnly; | ||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.nio.file.Path; | ||
|  | ||
| /** | ||
| * The data of a single {@code UploadPart} in a GCS XML MPU. | ||
| * | ||
| * <p>Instances of this class are thread-safe and immutable. | ||
| * | ||
| * @see <a | ||
| * href="https://cloud.google.com/storage/docs/multipart-uploads#upload_parts">https://cloud.google.com/storage/docs/multipart-uploads#upload_parts</a> | ||
| */ | ||
| @BetaApi | ||
| @InternalExtensionOnly | ||
| public final class RequestBody { | ||
|  | ||
| private final RewindableContent content; | ||
|  | ||
| private RequestBody(RewindableContent content) { | ||
| this.content = content; | ||
| } | ||
|  | ||
| RewindableContent getContent() { | ||
| return content; | ||
| } | ||
|  | ||
| /** Create a new empty RequestBody. */ | ||
| @BetaApi | ||
| public static RequestBody empty() { | ||
| return new RequestBody(RewindableContent.empty()); | ||
| } | ||
|  | ||
| /** Create a new RequestBody from the given {@link ByteBuffer}s. */ | ||
| @BetaApi | ||
| public static RequestBody of(ByteBuffer... buffers) { | ||
| return new RequestBody(RewindableContent.of(buffers)); | ||
| } | ||
|  | ||
| /** Create a new RequestBody from the given {@link ByteBuffer}s. */ | ||
| @BetaApi | ||
| public static RequestBody of(ByteBuffer[] srcs, int srcsOffset, int srcsLength) { | ||
| return new RequestBody(RewindableContent.of(srcs, srcsOffset, srcsLength)); | ||
| } | ||
|  | ||
| /** Create a new RequestBody from the given {@link Path}. */ | ||
| @BetaApi | ||
| public static RequestBody of(Path path) throws IOException { | ||
| return new RequestBody(RewindableContent.of(path)); | ||
| } | ||
| } | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -18,6 +18,9 @@ | |
|  | ||
| import com.google.api.client.http.AbstractHttpContent; | ||
| import com.google.api.client.http.HttpMediaType; | ||
| import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; | ||
| import com.google.cloud.storage.Hasher.GuavaHasher; | ||
| import com.google.cloud.storage.Hasher.NoOpHasher; | ||
| import com.google.common.base.Preconditions; | ||
| import com.google.common.io.ByteStreams; | ||
| import java.io.IOException; | ||
|  | @@ -55,6 +58,8 @@ public final boolean retrySupported() { | |
| return false; | ||
| } | ||
|  | ||
| abstract String getCrc32c(); | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: return Crc32cLengthKnown and let the consumer of this value own the logic of encoding the value for the respected use. For example, in grpc, crc32c values are  | ||
|  | ||
| static RewindableContent empty() { | ||
| return EmptyRewindableContent.INSTANCE; | ||
| } | ||
|  | @@ -111,6 +116,12 @@ protected void rewindTo(long offset) {} | |
|  | ||
| @Override | ||
| void flagDirty() {} | ||
|  | ||
| @Override | ||
| String getCrc32c() { | ||
| Crc32cLengthKnown cumulative = Crc32cValue.zero(); | ||
| return Utils.crc32cCodec.encode(cumulative.getValue()); | ||
| } | ||
| } | ||
|  | ||
| private static final class PathRewindableContent extends RewindableContent { | ||
|  | @@ -165,6 +176,36 @@ long writeTo(GatheringByteChannel gbc) throws IOException { | |
|  | ||
| @Override | ||
| void flagDirty() {} | ||
|  | ||
| @Override | ||
| String getCrc32c() { | ||
| GuavaHasher hasher; | ||
| { | ||
| Hasher defaultHasher = Hasher.defaultHasher(); | ||
| if (defaultHasher instanceof NoOpHasher) { | ||
| return null; | ||
| } else { | ||
| hasher = Hasher.enabled(); | ||
| } | ||
| } | ||
| Crc32cLengthKnown cumulative = Crc32cValue.zero(); | ||
|  | ||
| int bufferSize = 8192; // 8KiB buffer for reading chunks | ||
| ByteBuffer buffer = ByteBuffer.allocate(bufferSize); | ||
|  | ||
| try (SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ)) { | ||
| while (channel.read(buffer) != -1) { | ||
| buffer.flip(); | ||
| if (buffer.hasRemaining()) { | ||
| cumulative = cumulative.concat(hasher.hash(buffer::duplicate)); | ||
| } | ||
| buffer.clear(); | ||
| } | ||
| } catch (IOException e) { | ||
| throw new RuntimeException("Failed to read file for CRC32C calculation: " + path, e); | ||
| } | ||
| return Utils.crc32cCodec.encode(cumulative.getValue()); | ||
| } | ||
| } | ||
|  | ||
| private static final class ByteBufferContent extends RewindableContent { | ||
|  | @@ -260,5 +301,23 @@ void rewindTo(long offset) { | |
| void flagDirty() { | ||
| this.dirty = true; | ||
| } | ||
|  | ||
| @Override | ||
| String getCrc32c() { | ||
| GuavaHasher hasher; | ||
| { | ||
| Hasher defaultHasher = Hasher.defaultHasher(); | ||
| if (defaultHasher instanceof NoOpHasher) { | ||
| return null; | ||
| } else { | ||
| hasher = Hasher.enabled(); | ||
| } | ||
| } | ||
| Crc32cLengthKnown cumulative = Crc32cValue.zero(); | ||
| for (ByteBuffer buffer : buffers) { | ||
| cumulative = cumulative.concat(hasher.hash(buffer::duplicate)); | ||
| } | ||
| return Utils.crc32cCodec.encode(cumulative.getValue()); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to do dirty state tracking, and to actually rewind the content before a followup invocation. See
java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java
Lines 62 to 76 in 763be91
Additionally, this should pass the RewindableContent to
sendUploadPartRequestso that this method manages the lifecycle of it and integrates correctly with the retries.