Skip to content

HADOOP-17038 Support positional read in AbfsInputStream #2206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
private long sasTokenRenewPeriodForStreamsInSeconds;

/**
 * Positional-read (pread) switch, bound to the {@code FS_AZURE_ENABLE_PREAD}
 * configuration key with {@code DEFAULT_ENABLE_PREAD} as its default value.
 */
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_PREAD,
DefaultValue = DEFAULT_ENABLE_PREAD)
private boolean enablePread;

public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
Expand Down Expand Up @@ -640,6 +644,10 @@ public boolean shouldTrackLatency() {
return this.trackLatency;
}

/**
 * Reports the current value of the positional-read (pread) switch.
 *
 * @return the value held in {@code enablePread}
 */
public boolean isPreadEnabled() {
  return enablePread;
}

public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException {
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
if (authType == AuthType.OAuth) {
Expand Down Expand Up @@ -862,6 +870,11 @@ void setIsNamespaceEnabledAccount(String isNamespaceEnabledAccount) {
this.isNamespaceEnabledAccount = isNamespaceEnabledAccount;
}

/**
 * Overrides the positional-read (pread) switch; exposed for tests only.
 *
 * @param enablePread new value for the switch
 */
@VisibleForTesting
void setEnablePread(boolean enablePread) {
this.enablePread = enablePread;
}

private String getTrimmedPasswordString(String key, String defaultValue) throws IOException {
String value = getPasswordString(key);
if (StringUtils.isBlank(value)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext() {
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.withPreadEnabled(abfsConfiguration.isPreadEnabled())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT = "fs.azure.account.oauth2.refresh.token.endpoint";
/** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */
public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track";
/** Key for enabling the positional read (pread) switch; value: {@value}. */
public static final String FS_AZURE_ENABLE_PREAD = "fs.azure.enable.pread";

public static String accountProperty(String property, String account) {
return property + "." + account;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_DELETE_CONSIDERED_IDEMPOTENT = true;
public static final int DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS = 5 * 60 * 1000; // 5 mins

/** Default value of the positional read (pread) switch: disabled. */
public static final boolean DEFAULT_ENABLE_PREAD = false;

private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private final String eTag; // eTag of the path when InputStream are created
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
private final boolean readAheadEnabled; // whether enable readAhead;
/**
 * Selects how the positional read (pread) API behaves.
 *
 * <p>When {@code false}, a pread is delegated to {@code FSInputStream}, which
 * does a seek followed by a read. That path fills the read data size based on
 * the bufferSize being passed, and the data read is kept in a buffer.
 *
 * <p>When {@code true}, a pread reads only the specified amount of data from
 * the given offset and the buffer does not come into use at all.
 *
 * @see #read(long, byte[], int, int)
 */
private boolean enablePread;

// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
Expand Down Expand Up @@ -85,6 +93,7 @@ public AbfsInputStream(
this.bufferSize = abfsInputStreamContext.getReadBufferSize();
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.enablePread = abfsInputStreamContext.isPreadEnabled();
this.eTag = eTag;
this.readAheadEnabled = true;
this.cachedSasToken = new CachedSASToken(
Expand All @@ -96,6 +105,28 @@ public String getPath() {
return path;
}

/**
 * Positional read: reads up to {@code length} bytes starting at file offset
 * {@code position} into {@code buffer}, beginning at index {@code offset}.
 *
 * <p>When pread is not enabled the call is delegated to the superclass
 * implementation. When it is enabled, the arguments are validated and the
 * requested range is fetched through {@code readRemote} straight into the
 * caller's buffer.
 *
 * @param position offset in the file at which to start reading
 * @param buffer destination buffer
 * @param offset index in {@code buffer} at which the data is stored
 * @param length maximum number of bytes to read
 * @return the value returned by the underlying read; 0 when {@code length} is 0
 * @throws IOException if the underlying read fails
 */
@Override
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
  if (!enablePread) {
    return super.read(position, buffer, offset, length);
  }
  validatePositionedReadArgs(position, buffer, offset, length);
  if (length == 0) {
    return 0;
  }
  if (streamStatistics != null) {
    // NOTE(review): this passes the buffer offset, not the file position - confirm which
    // one readOperationStarted expects.
    streamStatistics.readOperationStarted(offset, length);
  }
  int bytesRead = readRemote(position, buffer, offset, length);
  // A non-positive result (presumably -1 at end of file, as the positional read contract
  // requires) is not data and must not be added to the byte counters.
  if (bytesRead > 0) {
    if (statistics != null) {
      statistics.incrementBytesRead(bytesRead);
    }
    if (streamStatistics != null) {
      streamStatistics.bytesRead(bytesRead);
    }
  }
  return bytesRead;
}

@Override
public int read() throws IOException {
byte[] b = new byte[1];
Expand Down Expand Up @@ -503,6 +534,11 @@ public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}

/**
 * Overrides the pread switch of this stream; exposed for tests only.
 *
 * @param enablePread new value for the switch consulted by
 *     {@code read(long, byte[], int, int)}
 */
@VisibleForTesting
void setEnablePread(boolean enablePread) {
this.enablePread = enablePread;
}

/**
* Get the statistics of the stream.
* @return a string value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean tolerateOobAppends;

// Positional-read (pread) switch carried by this context.
private boolean preadEnabled;

private AbfsInputStreamStatistics streamStatistics;

public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
Expand All @@ -54,6 +56,11 @@ public AbfsInputStreamContext withTolerateOobAppends(
return this;
}

/**
 * Sets the positional-read (pread) switch on this context.
 *
 * @param preadEnabled value to store
 * @return this context, so that calls can be chained
 */
public AbfsInputStreamContext withPreadEnabled(final boolean preadEnabled) {
this.preadEnabled = preadEnabled;
return this;
}

public AbfsInputStreamContext withStreamStatistics(
final AbfsInputStreamStatistics streamStatistics) {
this.streamStatistics = streamStatistics;
Expand All @@ -77,6 +84,10 @@ public boolean isTolerateOobAppends() {
return tolerateOobAppends;
}

/**
 * Reports the positional-read (pread) switch stored in this context.
 *
 * @return the value held in {@code preadEnabled}
 */
public boolean isPreadEnabled() {
  return this.preadEnabled;
}

/**
 * Returns the stream statistics object stored in this context.
 *
 * @return the value held in {@code streamStatistics}
 */
public AbfsInputStreamStatistics getStreamStatistics() {
  return this.streamStatistics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,80 @@ public void testReadAheadCounters() throws IOException, InterruptedException {
}
}

/**
 * Verifies the stream statistics recorded for positional reads, first with
 * pread disabled and then with pread enabled.
 *
 * @throws IOException if a file system operation fails
 */
@Test
public void testPread() throws IOException {
  describe("Testing preads in AbfsInputStream");

  AzureBlobFileSystem fs = getFileSystem();
  AzureBlobFileSystemStore abfss = fs.getAbfsStore();
  Path readStatPath = path(getMethodName());

  AbfsOutputStream out = null;
  AbfsInputStream in = null;
  boolean oldPreadEnabled = abfss.getAbfsConfiguration().isPreadEnabled();
  try {
    /*
     * Pin the first phase to the non-pread path explicitly, so that its expectations do not
     * depend on whatever value the test configuration happens to carry. The finally block
     * restores the previous value.
     */
    abfss.getAbfsConfiguration().setEnablePread(false);

    out = createAbfsOutputStreamWithFlushEnabled(fs, readStatPath);
    /*
     * Writing the contents of defBuffer to the file.
     */
    out.write(defBuffer);
    out.hflush();

    in = abfss.openFileForRead(readStatPath, fs.getFsStatistics());
    /*
     * Doing a 10 byte pread OPERATIONS times.
     */
    int bytesPerRead = 10;
    int pos = 0;
    for (int i = 0; i < OPERATIONS; i++) {
      in.read(pos, defBuffer, pos, bytesPerRead);
      pos += bytesPerRead;
    }
    AbfsInputStreamStatisticsImpl stats = (AbfsInputStreamStatisticsImpl) in
        .getStreamStatistics();
    LOG.info("STATISTICS: {}", stats.toString());
    /*
     * bytesRead - Since each time 10 bytes are read, total bytes read would be equal to
     * OPERATIONS * 10.
     *
     * readOps - Since each time read operation is performed OPERATIONS times, total number of
     * read operations would be equal to OPERATIONS.
     *
     * remoteReadOps - Only a single remote read operation is done. Hence, total remote read ops
     * is 1.
     */
    assertEquals("Mismatch in bytesRead value", OPERATIONS * bytesPerRead, stats.getBytesRead());
    assertEquals("Mismatch in readOps value", OPERATIONS, stats.getReadOperations());
    assertEquals("Mismatch in remoteReadOps value", 1, stats.getRemoteReadOperations());
    in.close();
    // Verifying if stats are still readable after stream is closed.
    LOG.info("STATISTICS after closing: {}", stats.toString());

    // Now test with pread enabled.
    abfss.getAbfsConfiguration().setEnablePread(true);
    in = abfss.openFileForRead(readStatPath, fs.getFsStatistics());
    pos = 0;
    for (int i = 0; i < OPERATIONS; i++) {
      in.read(pos, defBuffer, pos, bytesPerRead);
      pos += bytesPerRead;
    }
    stats = (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
    LOG.info("STATISTICS: {}", stats.toString());
    /*
     * remoteReadOps - Pread reads exactly the number of bytes asked for, with no buffering. So
     * there will be OPERATIONS remote reads.
     */
    assertEquals("Mismatch in bytesRead value", OPERATIONS * bytesPerRead, stats.getBytesRead());
    assertEquals("Mismatch in readOps value", OPERATIONS, stats.getReadOperations());
    assertEquals("Mismatch in remoteReadOps value", OPERATIONS, stats.getRemoteReadOperations());
    assertEquals("Mismatch in bytesReadFromBuffer value", 0, stats.getBytesReadFromBuffer());
    in.close();
  } finally {
    abfss.getAbfsConfiguration().setEnablePread(oldPreadEnabled);
    IOUtils.cleanupWithLogger(LOG, out, in);
  }
}

/**
* Method to assert the initial values of the statistics.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.bouncycastle.util.Arrays;
import org.junit.Test;

/**
 * Integration test for positional reads (pread) on {@link AbfsInputStream},
 * covering both the seek-and-read path and the pread-enabled path.
 */
public class ITestAbfsPread extends AbstractAbfsIntegrationTest {

  public ITestAbfsPread() throws Exception {
  }

  /**
   * Writes a small file and checks that a positional read returns the right
   * bytes, and that the stream's internal buffer is only populated when pread
   * is not enabled.
   *
   * @throws IOException if a file system operation fails
   */
  @Test
  public void testPread() throws IOException {
    describe("Testing preads in AbfsInputStream");
    Path dest = path("ITestAbfsPread");

    int dataSize = 100;
    byte[] data = ContractTestUtils.dataset(dataSize, 'a', 26);
    ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length, dataSize, true);
    int bytesToRead = 10;
    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
      assertTrue(
          "unexpected stream type " + inputStream.getWrappedStream().getClass().getSimpleName(),
          inputStream.getWrappedStream() instanceof AbfsInputStream);
      assertPreadReturnsData(inputStream, data, 0, bytesToRead);
      // Read only 10 bytes from offset 0. But by default it will do the seek and read where the
      // entire 100 bytes get read into the AbfsInputStream buffer.
      assertArrayEquals("AbfsInputStream#read did not read more data into its buffer", data,
          Arrays.copyOfRange(((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 0,
              dataSize));
    }
    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
      AbfsInputStream abfsIs = (AbfsInputStream) inputStream.getWrappedStream();
      abfsIs.setEnablePread(true);
      assertPreadReturnsData(inputStream, data, 10, bytesToRead);
      // Read only 10 bytes from offset 10. This time, as pread is enabled, it will only read the
      // exact bytes as requested and no data will get read into the AbfsInputStream#buffer. In
      // fact the buffer won't even get initialized.
      assertNull("AbfsInputStream pread caused the internal buffer creation", abfsIs.getBuffer());
      // Now make a seek and read so that internal buffer gets created
      inputStream.seek(0);
      byte[] readBuffer = new byte[bytesToRead];
      // The buffer assertion below is only meaningful if this read actually returned data.
      assertTrue("AbfsInputStream#read did not read any bytes after seek",
          inputStream.read(readBuffer) > 0);
      // This read would have fetched all 100 bytes into internal buffer.
      assertArrayEquals("AbfsInputStream#read did not read more data into its buffer", data,
          Arrays.copyOfRange(((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 0,
              dataSize));
      // Now again do pos read and make sure not any extra data being fetched.
      resetBuffer(abfsIs.getBuffer());
      assertPreadReturnsData(inputStream, data, 0, bytesToRead);
      assertFalse("AbfsInputStream#read read more data into its buffer than expected",
          Arrays.areEqual(data, Arrays.copyOfRange(abfsIs.getBuffer(), 0, dataSize)));
    }
  }

  /**
   * Does one positional read of {@code len} bytes at {@code pos} and asserts
   * both the returned count and the returned bytes against {@code data}.
   */
  private void assertPreadReturnsData(FSDataInputStream inputStream, byte[] data, int pos,
      int len) throws IOException {
    byte[] readBuffer = new byte[len];
    assertEquals("AbfsInputStream#read did not read the correct number of bytes",
        len, inputStream.read(pos, readBuffer, 0, len));
    // assertArrayEquals reports the first differing index, unlike assertTrue on a boolean.
    assertArrayEquals("AbfsInputStream#read did not read the correct bytes",
        Arrays.copyOfRange(data, pos, pos + len), readBuffer);
  }

  /**
   * Zeroes every element of {@code buf}.
   */
  private void resetBuffer(byte[] buf) {
    // Fully qualified because the simple name Arrays is already taken by another import.
    java.util.Arrays.fill(buf, (byte) 0);
  }
}