Skip to content

Commit 7843368

Browse files
snvijayaarjun4084346
authored and committed
HADOOP-17215: Support for conditional overwrite.
Contributed by Sneha Vijayarajan. DETAILS: This change adds config key "fs.azure.enable.conditional.create.overwrite" with a default of true. When enabled, if create(path, overwrite: true) is invoked and the file exists, the ABFS driver will first obtain its etag and then attempt to overwrite the file on the condition that the etag matches. The purpose of this is to mitigate the non-idempotency of this method. Specifically, in the event of a network error or similar, the client will retry and this can result in the file being created more than once which may result in data loss. In essence, this is like a poor man's file handle, and will be addressed more thoroughly in the future when support for lease is added to ABFS. TEST RESULTS: namespace.enabled=true auth.type=SharedKey ------------------- $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify Tests run: 87, Failures: 0, Errors: 0, Skipped: 0 Tests run: 457, Failures: 0, Errors: 0, Skipped: 42 Tests run: 207, Failures: 0, Errors: 0, Skipped: 24 namespace.enabled=true auth.type=OAuth ------------------- $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify Tests run: 87, Failures: 0, Errors: 0, Skipped: 0 Tests run: 457, Failures: 0, Errors: 0, Skipped: 74 Tests run: 207, Failures: 0, Errors: 0, Skipped: 140 (cherry picked from commit e31a636)
1 parent 75a398d commit 7843368

File tree

10 files changed

+564
-12
lines changed

10 files changed

+564
-12
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ public class AbfsConfiguration{
134134
DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
135135
private String azureAtomicDirs;
136136

137+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE,
138+
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE)
139+
private boolean enableConditionalCreateOverwrite;
140+
137141
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
138142
DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
139143
private boolean createRemoteFileSystemDuringInitialization;
@@ -427,6 +431,10 @@ public String getAzureAtomicRenameDirs() {
427431
return this.azureAtomicDirs;
428432
}
429433

434+
/**
* Reports whether create-with-overwrite requests should use the eTag-conditional flow.
* @return the flag bound to FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE (declared default: true)
*/
public boolean isConditionalCreateOverwriteEnabled() {
435+
return this.enableConditionalCreateOverwrite;
436+
}
437+
430438
public boolean getCreateRemoteFileSystemDuringInitialization() {
431439
return this.createRemoteFileSystemDuringInitialization;
432440
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 88 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
6262
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
6363
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
64+
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
6465
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
6566
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
6667
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
@@ -359,9 +360,28 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F
359360
umask.toString(),
360361
isNamespaceEnabled);
361362

362-
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
363-
isNamespaceEnabled ? getOctalNotation(permission) : null,
364-
isNamespaceEnabled ? getOctalNotation(umask) : null);
363+
// if "fs.azure.enable.conditional.create.overwrite" is enabled and
364+
// is a create request with overwrite=true, create will follow different
365+
// flow.
366+
boolean triggerConditionalCreateOverwrite = false;
367+
if (overwrite
368+
&& abfsConfiguration.isConditionalCreateOverwriteEnabled()) {
369+
triggerConditionalCreateOverwrite = true;
370+
}
371+
372+
if (triggerConditionalCreateOverwrite) {
373+
conditionalCreateOverwriteFile(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
374+
isNamespaceEnabled ? getOctalNotation(permission) : null,
375+
isNamespaceEnabled ? getOctalNotation(umask) : null
376+
);
377+
378+
} else {
379+
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true,
380+
overwrite,
381+
isNamespaceEnabled ? getOctalNotation(permission) : null,
382+
isNamespaceEnabled ? getOctalNotation(umask) : null,
383+
null);
384+
}
365385

366386
return new AbfsOutputStream(
367387
client,
@@ -379,6 +399,70 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
379399
.build();
380400
}
381401

402+
/**
403+
* Creates a file with overwrite semantics, replacing a pre-existing file only
404+
* if its eTag still matches the one fetched just before the overwrite. Flow: (1) create with overwrite=false; (2) on HTTP 409, fetch the path status to read the eTag; (3) create with overwrite=true conditioned on that eTag.
405+
* @param relativePath path handed to the client's createPath and getPathStatus calls
406+
* @param permission permission string forwarded to createPath (the caller passes null when namespace is not enabled)
407+
* @param umask umask string forwarded to createPath (the caller passes null when namespace is not enabled)
408+
* @return the operation returned by whichever createPath call succeeded
409+
* @throws AzureBlobFileSystemException a ConcurrentWriteOperationDetectedException if the file vanished (HTTP 404) or its eTag no longer matched (HTTP 412) mid-flow; any other AbfsRestOperationException is rethrown unchanged
410+
*/
411+
private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath,
412+
final String permission,
413+
final String umask) throws AzureBlobFileSystemException {
414+
AbfsRestOperation op;
415+
416+
try {
417+
// Trigger a create with overwrite=false first so that eTag fetch can be
418+
// avoided for cases when no pre-existing file is present (major portion
419+
// of create file traffic falls into the case of no pre-existing file).
420+
op = client.createPath(relativePath, true,
421+
false, permission, umask, null);
422+
} catch (AbfsRestOperationException e) {
423+
if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
424+
// HTTP 409: file pre-exists, fetch its eTag
425+
try {
426+
op = client.getPathStatus(relativePath);
427+
} catch (AbfsRestOperationException ex) {
428+
if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
429+
// Is a parallel access case, as file which was found to be
430+
// present went missing by this request.
431+
throw new ConcurrentWriteOperationDetectedException(
432+
"Parallel access to the create path detected. Failing request "
433+
+ "to honor single writer semantics");
434+
} else {
435+
throw ex;
436+
}
437+
}
438+
439+
String eTag = op.getResult()
440+
.getResponseHeader(HttpHeaderConfigurations.ETAG);
441+
442+
try {
443+
// overwrite only if eTag matches with the file properties fetched before
444+
op = client.createPath(relativePath, true,
445+
true, permission, umask, eTag);
446+
} catch (AbfsRestOperationException ex) {
447+
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
448+
// Is a parallel access case, as file with eTag was just queried
449+
// and precondition failure can happen only when another file with
450+
// different etag got created.
451+
throw new ConcurrentWriteOperationDetectedException(
452+
"Parallel access to the create path detected. Failing request "
453+
+ "to honor single writer semantics");
454+
} else {
455+
throw ex;
456+
}
457+
}
458+
} else {
459+
throw e;
460+
}
461+
}
462+
463+
return op;
464+
}
465+
382466
public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
383467
throws AzureBlobFileSystemException {
384468
boolean isNamespaceEnabled = getIsNamespaceEnabled();
@@ -391,7 +475,7 @@ public void createDirectory(final Path path, final FsPermission permission, fina
391475

392476
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true,
393477
isNamespaceEnabled ? getOctalNotation(permission) : null,
394-
isNamespaceEnabled ? getOctalNotation(umask) : null);
478+
isNamespaceEnabled ? getOctalNotation(umask) : null, null);
395479
}
396480

397481
public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics)

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ public final class ConfigurationKeys {
5050
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
5151
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
5252
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
53+
/** This config ensures that during create overwrite an existing file will be
54+
* overwritten only if there is a match on the eTag of existing file.
55+
*/
56+
public static final String FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = "fs.azure.enable.conditional.create.overwrite";
5357
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
5458
/** Provides a config control to enable or disable ABFS Flush operations -
5559
* HFlush and HSync. Default is true. **/

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public final class FileSystemConfigurations {
5454
public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
5555

5656
public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
57+
public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true;
5758

5859
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
5960
public static final boolean DEFAULT_ENABLE_FLUSH = true;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
20+
21+
/**
22+
* Thrown when a concurrent write operation is detected, for example when the conditional create-overwrite flow finds that the target file disappeared or no longer matches the fetched eTag between requests.
23+
*/
24+
@org.apache.hadoop.classification.InterfaceAudience.Public
25+
@org.apache.hadoop.classification.InterfaceStability.Evolving
26+
public class ConcurrentWriteOperationDetectedException
27+
extends AzureBlobFileSystemException {
28+
29+
/** @param message description of the detected conflict, passed to the superclass constructor */
public ConcurrentWriteOperationDetectedException(String message) {
30+
super(message);
31+
}
32+
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException
220220
}
221221

222222
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
223-
final String permission, final String umask) throws AzureBlobFileSystemException {
223+
final String permission, final String umask, final String eTag) throws AzureBlobFileSystemException {
224224
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
225225
if (!overwrite) {
226226
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
@@ -234,6 +234,10 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
234234
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask));
235235
}
236236

237+
if (eTag != null && !eTag.isEmpty()) {
238+
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
239+
}
240+
237241
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
238242
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
239243

@@ -536,7 +540,8 @@ private URL createRequestUrl(final String query) throws AzureBlobFileSystemExcep
536540
return createRequestUrl(EMPTY_STRING, query);
537541
}
538542

539-
private URL createRequestUrl(final String path, final String query)
543+
@VisibleForTesting
544+
protected URL createRequestUrl(final String path, final String query)
540545
throws AzureBlobFileSystemException {
541546
final String base = baseUrl.toString();
542547
String encodedPath = path;

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,18 @@ public void testAbfsHttpSendStatistics() throws IOException {
9595
connectionsMade++;
9696
requestsSent++;
9797

98+
9899
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
99100
sendRequestPath)) {
100101

102+
// Is a file overwrite case
103+
long createRequestCalls = 1;
104+
long createTriggeredGFSForETag = 0;
105+
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
106+
createRequestCalls += 1;
107+
createTriggeredGFSForETag = 1;
108+
}
109+
101110
for (int i = 0; i < LARGE_OPERATIONS; i++) {
102111
out.write(testNetworkStatsString.getBytes());
103112

@@ -126,10 +135,13 @@ public void testAbfsHttpSendStatistics() throws IOException {
126135
* wrote each time).
127136
*
128137
*/
138+
139+
connectionsMade += createRequestCalls + createTriggeredGFSForETag;
140+
requestsSent += createRequestCalls;
129141
assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
130-
connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
142+
connectionsMade + LARGE_OPERATIONS * 2, metricMap);
131143
assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
132-
requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
144+
requestsSent + LARGE_OPERATIONS * 2, metricMap);
133145
assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
134146
bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
135147
metricMap);
@@ -202,13 +214,21 @@ public void testAbfsHttpResponseStatistics() throws IOException {
202214
try {
203215

204216
/*
205-
* Creating a file and writing buffer into it. Also recording the
206-
* buffer for future read() call.
217+
* Creating a file and writing buffer into it.
218+
* This is a file recreate, so it will trigger
219+
* 2 extra calls if conditional create overwrite is enabled (the default).
220+
* Also recording the buffer for future read() call.
207221
* This creating outputStream and writing requires 2 *
208222
* (LARGE_OPERATIONS) get requests.
209223
*/
210224
StringBuilder largeBuffer = new StringBuilder();
211225
out = fs.create(getResponsePath);
226+
227+
long createRequestCalls = 1;
228+
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
229+
createRequestCalls += 2;
230+
}
231+
212232
for (int i = 0; i < LARGE_OPERATIONS; i++) {
213233
out.write(testResponseString.getBytes());
214234
out.hflush();
@@ -233,7 +253,8 @@ public void testAbfsHttpResponseStatistics() throws IOException {
233253
*
234254
* get_response : get_responses(Last assertion) + 1
235255
* (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
236-
* LARGE_OPERATIONS times) + 1(open()) + 1(read()).
256+
* LARGE_OPERATIONS times) + 1(open()) + 1(read()) +
257+
* 1 (createOverwriteTriggeredGetForeTag).
237258
*
238259
* bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
239260
* bytes wrote each time (bytes_received is equal to bytes wrote in the
@@ -244,7 +265,8 @@ public void testAbfsHttpResponseStatistics() throws IOException {
244265
bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length),
245266
metricMap);
246267
assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
247-
getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
268+
getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS,
269+
metricMap);
248270

249271
} finally {
250272
IOUtils.cleanupWithLogger(LOG, out, in);

0 commit comments

Comments
 (0)