Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadResponse;
import com.google.cloud.storage.multipartupload.model.ListPartsRequest;
import com.google.cloud.storage.multipartupload.model.ListPartsResponse;
import com.google.cloud.storage.multipartupload.model.UploadPartRequest;
import com.google.cloud.storage.multipartupload.model.UploadPartResponse;
import java.io.IOException;
import java.net.URI;

Expand Down Expand Up @@ -71,6 +73,16 @@ public abstract CreateMultipartUploadResponse createMultipartUpload(
public abstract AbortMultipartUploadResponse abortMultipartUpload(
AbortMultipartUploadRequest request);

/**
* Uploads a part in a multipart upload.
*
* @param request The request object containing the details for uploading the part.
* @param requestBody The content of the part to upload.
* @return An {@link UploadPartResponse} object containing the ETag of the uploaded part.
*/
@BetaApi
public abstract UploadPartResponse uploadPart(UploadPartRequest request, RequestBody requestBody);

/**
* Creates a new instance of {@link MultipartUploadClient}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadResponse;
import com.google.cloud.storage.multipartupload.model.ListPartsRequest;
import com.google.cloud.storage.multipartupload.model.ListPartsResponse;
import com.google.cloud.storage.multipartupload.model.UploadPartRequest;
import com.google.cloud.storage.multipartupload.model.UploadPartResponse;
import java.io.IOException;
import java.net.URI;

Expand Down Expand Up @@ -75,4 +77,12 @@ public AbortMultipartUploadResponse abortMultipartUpload(AbortMultipartUploadReq
() -> httpRequestManager.sendAbortMultipartUploadRequest(uri, request),
Decoder.identity());
}

@Override
public UploadPartResponse uploadPart(UploadPartRequest request, RequestBody requestBody) {
return retrier.run(
retryAlgorithmManager.idempotent(),
() -> httpRequestManager.sendUploadPartRequest(uri, request, requestBody),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to do dirty state tracking, and to actually rewind the content before a followup invocation. See

AtomicBoolean dirty = new AtomicBoolean(false);
return retrier.run(
() -> {
if (dirty.getAndSet(true)) {
ResumableOperationResult<@Nullable StorageObject> query = query();
long persistedSize = query.getPersistedSize();
if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) {
return query;
} else {
task.rewindTo(persistedSize);
}
}
return task.call();
},
Decoder.identity());
for how this is done for json resumable uploads.

Additionally, this should pass the RewindableContent to sendUploadPartRequest so that this method manages the lifecycle of it and integrates correctly with the retries.

Decoder.identity());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadResponse;
import com.google.cloud.storage.multipartupload.model.ListPartsRequest;
import com.google.cloud.storage.multipartupload.model.ListPartsResponse;
import com.google.cloud.storage.multipartupload.model.UploadPartRequest;
import com.google.cloud.storage.multipartupload.model.UploadPartResponse;
import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
Expand Down Expand Up @@ -112,6 +114,23 @@ AbortMultipartUploadResponse sendAbortMultipartUploadRequest(
return httpRequest.execute().parseAs(AbortMultipartUploadResponse.class);
}

public UploadPartResponse sendUploadPartRequest(
URI uri, UploadPartRequest request, RequestBody requestBody) throws IOException {
String encodedBucket = urlEncode(request.bucket());
String encodedKey = urlEncode(request.key());
String resourcePath = "/" + encodedBucket + "/" + encodedKey;
String queryString =
"?partNumber=" + request.partNumber() + "&uploadId=" + urlEncode(request.uploadId());
String uploadUri = uri.toString() + resourcePath + queryString;
HttpRequest httpRequest =
requestFactory.buildPutRequest(new GenericUrl(uploadUri), requestBody.getContent());
httpRequest.getHeaders().putAll(headerProvider.getHeaders());
addHeadersForUploadPart(requestBody, httpRequest.getHeaders());
httpRequest.setParser(objectParser);
httpRequest.setThrowExceptionOnExecuteError(true);
return httpRequest.execute().parseAs(UploadPartResponse.class);
}

static MultipartUploadHttpRequestManager createFrom(HttpStorageOptions options) {
Storage storage = options.getStorageRpcV1().getStorage();
ImmutableMap.Builder<String, String> stableHeaders =
Expand All @@ -133,6 +152,10 @@ static MultipartUploadHttpRequestManager createFrom(HttpStorageOptions options)
options.getMergedHeaderProvider(FixedHeaderProvider.create(stableHeaders.build())));
}

private void addHeadersForUploadPart(RequestBody requestBody, HttpHeaders headers) {
headers.put("x-goog-hash", "crc32c=" + requestBody.getContent().getCrc32c());
}
Comment on lines +155 to +157
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: one-line logic-less helper methods don't help

Is there a plan to add more in here?


private void addHeadersForCreateMultipartUpload(
CreateMultipartUploadRequest request, HttpHeaders headers) {
// TODO(shreyassinha): add a PredefinedAcl::getXmlEntry with the corresponding value from
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;

/**
* The data of a single {@code UploadPart} in a GCS XML MPU.
*
* <p>Instances of this class are thread-safe and immutable.
*
* @see <a
* href="https://cloud.google.com/storage/docs/multipart-uploads#upload_parts">https://cloud.google.com/storage/docs/multipart-uploads#upload_parts</a>
*/
@BetaApi
@InternalExtensionOnly
public final class RequestBody {

private final RewindableContent content;

private RequestBody(RewindableContent content) {
this.content = content;
}

RewindableContent getContent() {
return content;
}

/** Create a new empty RequestBody. */
@BetaApi
public static RequestBody empty() {
return new RequestBody(RewindableContent.empty());
}

/** Create a new RequestBody from the given {@link ByteBuffer}s. */
@BetaApi
public static RequestBody of(ByteBuffer... buffers) {
return new RequestBody(RewindableContent.of(buffers));
}

/** Create a new RequestBody from the given {@link ByteBuffer}s. */
@BetaApi
public static RequestBody of(ByteBuffer[] srcs, int srcsOffset, int srcsLength) {
return new RequestBody(RewindableContent.of(srcs, srcsOffset, srcsLength));
}

/** Create a new RequestBody from the given {@link Path}. */
@BetaApi
public static RequestBody of(Path path) throws IOException {
return new RequestBody(RewindableContent.of(path));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import com.google.api.client.http.AbstractHttpContent;
import com.google.api.client.http.HttpMediaType;
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
import com.google.cloud.storage.Hasher.GuavaHasher;
import com.google.cloud.storage.Hasher.NoOpHasher;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import java.io.IOException;
Expand Down Expand Up @@ -55,6 +58,8 @@ public final boolean retrySupported() {
return false;
}

abstract String getCrc32c();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: return Crc32cLengthKnown and let the consumer of this value own the logic of encoding the value for the respected use.

For example, in grpc, crc32c values are ints not base64 encoded 4-byte big-endian representation of an unsigned 32-bit integer.


static RewindableContent empty() {
return EmptyRewindableContent.INSTANCE;
}
Expand Down Expand Up @@ -111,6 +116,12 @@ protected void rewindTo(long offset) {}

@Override
void flagDirty() {}

@Override
String getCrc32c() {
Crc32cLengthKnown cumulative = Crc32cValue.zero();
return Utils.crc32cCodec.encode(cumulative.getValue());
}
}

private static final class PathRewindableContent extends RewindableContent {
Expand Down Expand Up @@ -165,6 +176,36 @@ long writeTo(GatheringByteChannel gbc) throws IOException {

@Override
void flagDirty() {}

@Override
String getCrc32c() {
GuavaHasher hasher;
{
Hasher defaultHasher = Hasher.defaultHasher();
if (defaultHasher instanceof NoOpHasher) {
return null;
} else {
hasher = Hasher.enabled();
}
}
Crc32cLengthKnown cumulative = Crc32cValue.zero();

int bufferSize = 8192; // 8KiB buffer for reading chunks
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);

try (SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ)) {
while (channel.read(buffer) != -1) {
buffer.flip();
if (buffer.hasRemaining()) {
cumulative = cumulative.concat(hasher.hash(buffer::duplicate));
}
buffer.clear();
}
} catch (IOException e) {
throw new RuntimeException("Failed to read file for CRC32C calculation: " + path, e);
}
return Utils.crc32cCodec.encode(cumulative.getValue());
}
}

private static final class ByteBufferContent extends RewindableContent {
Expand Down Expand Up @@ -260,5 +301,23 @@ void rewindTo(long offset) {
void flagDirty() {
this.dirty = true;
}

@Override
String getCrc32c() {
GuavaHasher hasher;
{
Hasher defaultHasher = Hasher.defaultHasher();
if (defaultHasher instanceof NoOpHasher) {
return null;
} else {
hasher = Hasher.enabled();
}
}
Crc32cLengthKnown cumulative = Crc32cValue.zero();
for (ByteBuffer buffer : buffers) {
cumulative = cumulative.concat(hasher.hash(buffer::duplicate));
}
return Utils.crc32cCodec.encode(cumulative.getValue());
}
}
}
Loading
Loading