Skip to content

HADOOP-18577. log/probes of HADOOP-18546 presence. #5205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
Expand Down Expand Up @@ -235,6 +236,7 @@ public String toString() {
sb.append("uri=").append(uri);
sb.append(", user='").append(abfsStore.getUser()).append('\'');
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
sb.append('}');
return sb.toString();
}
Expand Down Expand Up @@ -1636,6 +1638,11 @@ public boolean hasPathCapability(final Path path, final String capability)
new TracingContext(clientCorrelationId, fileSystemId,
FSOperationType.HAS_PATH_CAPABILITY, tracingHeaderFormat,
listener));

// probe for presence of the HADOOP-18546 readahead fix.
case CAPABILITY_SAFE_READAHEAD:
return true;

default:
return super.hasPathCapability(p, capability);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.constants;

import org.apache.hadoop.classification.InterfaceAudience;

/**
* Constants which are used internally and which don't fit into the other
* classes.
* For use within the {@code hadoop-azure} module only.
*/
@InterfaceAudience.Private
public final class InternalConstants {

private InternalConstants() {
}

/**
* Does this version of the store have safe readahead?
* Possible combinations of this and the probe
* {@code "fs.capability.etags.available"}.
* <ol>
* <li>{@value}: store is safe</li>
* <li>!etags: store is safe</li>
* <li>etags && !{@value}: store is <i>UNSAFE</i></li>
* </ol>
*/
public static final String CAPABILITY_SAFE_READAHEAD =
"fs.azure.capability.readahead.safe";
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.util.StringUtils.toLowerCase;

/**
Expand Down Expand Up @@ -828,11 +829,12 @@ public IOStatistics getIOStatistics() {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
if (streamStatistics != null) {
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
sb.append(streamStatistics.toString());
sb.append("}");
sb.append(", ").append(streamStatistics);
}
sb.append("}");
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private void init() {

// hide instance constructor
private ReadBufferManager() {
LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@

import java.net.URI;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
import org.apache.hadoop.fs.azurebfs.services.AuthType;

import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE;
import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
import static org.apache.hadoop.fs.CommonPathCapabilities.FS_ACLS;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.junit.Assume.assumeTrue;

/**
* Test AzureBlobFileSystem initialization.
*/
Expand Down Expand Up @@ -74,4 +82,28 @@ public void ensureSecureAzureBlobFileSystemIsInitialized() throws Exception {
assertNotNull("working directory", fs.getWorkingDirectory());
}
}

@Test
public void testFileSystemCapabilities() throws Throwable {
final AzureBlobFileSystem fs = getFileSystem();

final Path p = new Path("}");
// etags always present
Assertions.assertThat(fs.hasPathCapability(p, ETAGS_AVAILABLE))
.describedAs("path capability %s in %s", ETAGS_AVAILABLE, fs)
.isTrue();
// readahead always correct
Assertions.assertThat(fs.hasPathCapability(p, CAPABILITY_SAFE_READAHEAD))
.describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
.isTrue();

// etags-over-rename and ACLs are either both true or both false.
final boolean etagsAcrossRename = fs.hasPathCapability(p, ETAGS_PRESERVED_IN_RENAME);
final boolean acls = fs.hasPathCapability(p, FS_ACLS);
Assertions.assertThat(etagsAcrossRename)
.describedAs("capabilities %s=%s and %s=%s in %s",
ETAGS_PRESERVED_IN_RENAME, etagsAcrossRename,
FS_ACLS, acls, fs)
.isEqualTo(acls);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,24 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;

public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {

/**
* Time before the JUnit test times out for eventually() clauses
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: causes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no. eventually takes a lambda expression, which is what i was referring to

* to fail. This copes with slow network connections and debugging
* sessions, yet still allows for tests to fail with meaningful
* messages.
*/
public static final int TIMEOUT_OFFSET = 5 * 60_000;

/**
* Interval between eventually preobes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: "probes"

*/
public static final int PROBE_INTERVAL_MILLIS = 1_000;

public ITestReadBufferManager() throws Exception {
}

Expand All @@ -61,6 +76,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
}
ExecutorService executorService = Executors.newFixedThreadPool(4);
AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
// verify that the fs has the capability to validate the fix
Assertions.assertThat(fs.hasPathCapability(new Path("/"), CAPABILITY_SAFE_READAHEAD))
.describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
.isTrue();

try {
for (int i = 0; i < 4; i++) {
final String fileName = methodName.getMethodName() + i;
Expand All @@ -80,9 +100,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
}

ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
// verify there is no work in progress or the readahead queue.
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
// readahead queue is empty
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
// verify the in progress list eventually empties out.
eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () ->
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()));
}

private void assertListEmpty(String listName, List<ReadBuffer> list) {
Expand Down