Skip to content

Commit 277143e

Browse files
committed
Use stronger write-once semantics for Azure repository (#30437)
There's no need for an extra blobExists() call when writing a blob to the Azure service. Azure provides an option (with stronger consistency guarantees) on the upload method that guarantees that the blob that's uploaded does not already exist. This saves one network roundtrip. Relates to #19749
1 parent 9be3ebd commit 277143e

File tree

6 files changed

+36
-9
lines changed

6 files changed

+36
-9
lines changed

plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageTestServer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,12 @@ private static PathTrie<RequestHandler> defaultHandlers(final String endpoint, f
164164
if (destContainer == null) {
165165
return newContainerNotFoundError(requestId);
166166
}
167-
destContainer.objects.put(destBlobName, body);
167+
168+
byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, body);
169+
if (existingBytes != null) {
170+
return newBlobAlreadyExistsError(requestId);
171+
}
172+
168173
return new Response(RestStatus.CREATED, emptyMap(), "text/plain", EMPTY_BYTE);
169174
})
170175
);
@@ -363,6 +368,10 @@ private static Response newBlobNotFoundError(final long requestId) {
363368
return newError(requestId, RestStatus.NOT_FOUND, "BlobNotFound", "The specified blob does not exist");
364369
}
365370

371+
private static Response newBlobAlreadyExistsError(final long requestId) {
372+
return newError(requestId, RestStatus.CONFLICT, "BlobAlreadyExists", "The specified blob already exists");
373+
}
374+
366375
private static Response newInternalError(final long requestId) {
367376
return newError(requestId, RestStatus.INTERNAL_SERVER_ERROR, "InternalError", "The server encountered an internal error");
368377
}

plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobContainer.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,8 @@ public InputStream readBlob(String blobName) throws IOException {
8888

8989
@Override
9090
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
91-
if (blobExists(blobName)) {
92-
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
93-
}
9491
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);
92+
9593
try {
9694
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
9795
} catch (URISyntaxException|StorageException e) {

plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobStore.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.io.IOException;
3535
import java.io.InputStream;
3636
import java.net.URISyntaxException;
37+
import java.nio.file.FileAlreadyExistsException;
3738
import java.util.Locale;
3839
import java.util.Map;
3940

@@ -115,7 +116,8 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix
115116
return this.client.listBlobsByPrefix(this.clientName, this.locMode, container, keyPath, prefix);
116117
}
117118

118-
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException {
119+
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException,
120+
FileAlreadyExistsException {
119121
this.client.writeBlob(this.clientName, this.locMode, container, blobName, inputStream, blobSize);
120122
}
121123
}

plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/storage/AzureStorageService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.io.IOException;
3434
import java.io.InputStream;
3535
import java.net.URISyntaxException;
36+
import java.nio.file.FileAlreadyExistsException;
3637
import java.util.Map;
3738

3839
/**
@@ -79,7 +80,7 @@ Map<String,BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, St
7980
throws URISyntaxException, StorageException;
8081

8182
void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize) throws
82-
URISyntaxException, StorageException;
83+
URISyntaxException, StorageException, FileAlreadyExistsException;
8384

8485
static InputStream giveSocketPermissionsToStream(InputStream stream) {
8586
return new InputStream() {

plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceImpl.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
package org.elasticsearch.cloud.azure.storage;
2121

22+
import com.microsoft.azure.storage.AccessCondition;
2223
import com.microsoft.azure.storage.CloudStorageAccount;
2324
import com.microsoft.azure.storage.LocationMode;
2425
import com.microsoft.azure.storage.OperationContext;
2526
import com.microsoft.azure.storage.RetryExponentialRetry;
2627
import com.microsoft.azure.storage.RetryPolicy;
28+
import com.microsoft.azure.storage.StorageErrorCodeStrings;
2729
import com.microsoft.azure.storage.StorageException;
2830
import com.microsoft.azure.storage.blob.BlobInputStream;
2931
import com.microsoft.azure.storage.blob.BlobListingDetails;
@@ -46,8 +48,10 @@
4648
import org.elasticsearch.repositories.RepositoryException;
4749

4850
import java.io.InputStream;
51+
import java.net.HttpURLConnection;
4952
import java.net.URI;
5053
import java.net.URISyntaxException;
54+
import java.nio.file.FileAlreadyExistsException;
5155
import java.util.EnumSet;
5256
import java.util.HashMap;
5357
import java.util.Map;
@@ -327,12 +331,21 @@ enumBlobListingDetails, null, generateOperationContext(account))) {
327331

328332
@Override
329333
public void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize)
330-
throws URISyntaxException, StorageException {
334+
throws URISyntaxException, StorageException, FileAlreadyExistsException {
331335
logger.trace("writeBlob({}, stream, {})", blobName, blobSize);
332336
CloudBlobClient client = this.getSelectedClient(account, mode);
333337
CloudBlobContainer blobContainer = client.getContainerReference(container);
334338
CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
335-
SocketAccess.doPrivilegedVoidException(() -> blob.upload(inputStream, blobSize, null, null, generateOperationContext(account)));
339+
try {
340+
SocketAccess.doPrivilegedVoidException(() -> blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(),
341+
null, generateOperationContext(account)));
342+
} catch (StorageException se) {
343+
if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
344+
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
345+
throw new FileAlreadyExistsException(blobName, null, se.getMessage());
346+
}
347+
throw se;
348+
}
336349
logger.trace("writeBlob({}, stream, {}) - done", blobName, blobSize);
337350
}
338351
}

plugins/repository-azure/src/test/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceMock.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.io.InputStream;
3535
import java.net.SocketPermission;
3636
import java.net.URISyntaxException;
37+
import java.nio.file.FileAlreadyExistsException;
3738
import java.nio.file.NoSuchFileException;
3839
import java.security.AccessController;
3940
import java.util.Locale;
@@ -106,7 +107,10 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, LocationMode
106107

107108
@Override
108109
public void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize)
109-
throws URISyntaxException, StorageException {
110+
throws URISyntaxException, StorageException, FileAlreadyExistsException {
111+
if (blobs.containsKey(blobName)) {
112+
throw new FileAlreadyExistsException(blobName);
113+
}
110114
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
111115
blobs.put(blobName, outputStream);
112116
Streams.copy(inputStream, outputStream);

0 commit comments

Comments
 (0)