Skip to content

Commit 1ee18ee

Browse files
committed
HADOOP-18521. isolating read buffer invocations on stream for testing
This should now be set up for unit tests to simulate the failure conditions Change-Id: I25660e7067c95d0f3626d63e6f8f8156d2a1fc30 mkcsv enhancements
1 parent 14aa9ec commit 1ee18ee

File tree

5 files changed

+136
-63
lines changed

5 files changed

+136
-63
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,16 @@
5353

5454
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
5555
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
56+
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
57+
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
5658
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore;
5759
import static org.apache.hadoop.util.StringUtils.toLowerCase;
5860

5961
/**
6062
* The AbfsInputStream for AbfsClient.
6163
*/
6264
public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
63-
StreamCapabilities, IOStatisticsSource {
65+
StreamCapabilities, IOStatisticsSource, ReadBufferStreamOperations {
6466
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
6567
// Footer size is set to qualify for both ORC and parquet files
6668
public static final int FOOTER_SIZE = 16 * ONE_KB;
@@ -523,7 +525,12 @@ private int readInternal(final long position, final byte[] b, final int offset,
523525
}
524526
}
525527

526-
int readRemote(long position, byte[] b, int offset, int length, TracingContext tracingContext) throws IOException {
528+
@Override
529+
public int readRemote(long position,
530+
byte[] b,
531+
int offset,
532+
int length,
533+
TracingContext tracingContext) throws IOException {
527534
if (position < 0) {
528535
throw new IllegalArgumentException("attempting to read from negative offset");
529536
}
@@ -709,12 +716,7 @@ public synchronized void close() throws IOException {
709716
}
710717
}
711718

712-
/**
713-
* Is the stream closed?
714-
* This must be thread safe as prefetch operations in
715-
* different threads probe this before closure.
716-
* @return true if the stream has been closed.
717-
*/
719+
@Override
718720
@InterfaceAudience.Private
719721
public boolean isClosed() {
720722
return closed.get();
@@ -789,6 +791,7 @@ protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
789791
this.cachedSasToken = cachedSasToken;
790792
}
791793

794+
@Override
792795
@VisibleForTesting
793796
public String getStreamID() {
794797
return inputStreamId;
@@ -867,7 +870,7 @@ public String toString() {
867870
", bufferedPreadDisabled=" + bufferedPreadDisabled +
868871
", firstRead=" + firstRead +
869872
", fCursor=" + fCursor +
870-
", " + streamStatistics.toString() +
873+
", " + ioStatisticsToPrettyString(retrieveIOStatistics(streamStatistics)) +
871874
"}";
872875
}
873876

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java

Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,22 @@
2323

2424
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
2525
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
26+
import org.apache.hadoop.fs.statistics.DurationTracker;
2627
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
2728

2829
import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED;
30+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS;
2931
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_USED;
3032
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_DISCARDED;
3133
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_USED;
34+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
3235

3336
class ReadBuffer {
3437

35-
private AbfsInputStream stream;
38+
/**
39+
* Stream operations.
40+
*/
41+
private ReadBufferStreamOperations stream;
3642
private long offset; // offset within the file for the buffer
3743
private int length; // actual length, set after the buffer is filles
3844
private int requestedLength; // requested length of the read
@@ -51,11 +57,11 @@ class ReadBuffer {
5157

5258
private IOException errException = null;
5359

54-
public AbfsInputStream getStream() {
60+
public ReadBufferStreamOperations getStream() {
5561
return stream;
5662
}
5763

58-
public void setStream(AbfsInputStream stream) {
64+
public void setStream(ReadBufferStreamOperations stream) {
5965
this.stream = stream;
6066
}
6167

@@ -212,7 +218,7 @@ public IOStatisticsStore getStreamIOStatistics() {
212218
* @param offset offset in buffer where copy began
213219
* @param bytesCopied bytes copied.
214220
*/
215-
public void dataConsumedByStream(int offset, int bytesCopied) {
221+
void dataConsumedByStream(int offset, int bytesCopied) {
216222
setAnyByteConsumed(true);
217223
if (offset == 0) {
218224
setFirstByteConsumed(true);
@@ -225,40 +231,11 @@ public void dataConsumedByStream(int offset, int bytesCopied) {
225231
iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_USED, bytesCopied);
226232
}
227233

228-
229-
/**
230-
* The read completed
231-
* @param result read result
232-
* @param bytesActuallyRead the number of bytes actually read.
233-
* @param timestampMillis timestamp of completion
234-
*/
235-
void readCompleted(
236-
final ReadBufferStatus result,
237-
final int bytesActuallyRead,
238-
final long timestampMillis) {
239-
setStatus(status);
240-
if (status == ReadBufferStatus.AVAILABLE) {
241-
setLength(bytesActuallyRead);
242-
} else {
243-
setLength(0);
244-
}
245-
setTimeStamp(timestampMillis);
246-
}
247-
248-
/**
249-
* Has the read succeeded wth valid data
250-
* @return true if there is data from a successful read
251-
*/
252-
boolean readSucceededWithData() {
253-
return status == ReadBufferStatus.AVAILABLE
254-
&& getLength() > 0;
255-
}
256-
257234
/**
258235
* The (completed) buffer was evicted; update stream statistics
259236
* as appropriate.
260237
*/
261-
public void evicted() {
238+
void evicted() {
262239
if (getBufferindex() >= 0) {
263240
IOStatisticsStore iostats = getStreamIOStatistics();
264241
iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED, 1);
@@ -273,4 +250,28 @@ void releaseBuffer() {
273250
setBuffer(null);
274251
setBufferindex(-1);
275252
}
253+
254+
255+
/**
256+
* Prefetch started -update stream statistics.
257+
*/
258+
void prefetchStarted() {
259+
getStreamIOStatistics().incrementGauge(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, 1);
260+
}
261+
262+
/**
263+
* Prefetch started -update stream statistics.
264+
*/
265+
void prefetchFinished() {
266+
getStreamIOStatistics().incrementGauge(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);
267+
}
268+
269+
/**
270+
* Get a duration tracker for the prefetch.
271+
* @return a duration tracker.
272+
*/
273+
DurationTracker trackPrefetchOperation() {
274+
return getStreamIOStatistics().trackDuration(STREAM_READ_PREFETCH_OPERATIONS);
275+
}
276+
276277
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
3737
import org.apache.hadoop.classification.VisibleForTesting;
3838

39-
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS;
4039
import static org.apache.hadoop.util.Preconditions.checkState;
4140

4241
/**
@@ -144,7 +143,7 @@ private ReadBufferManager() {
144143
* @param requestedOffset The offset in the file which shoukd be read
145144
* @param requestedLength The length to read
146145
*/
147-
void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength,
146+
void queueReadAhead(final ReadBufferStreamOperations stream, final long requestedOffset, final int requestedLength,
148147
TracingContext tracingContext) {
149148
if (LOGGER.isTraceEnabled()) {
150149
LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
@@ -341,18 +340,18 @@ private boolean evict(final ReadBuffer buf) {
341340
return true;
342341
}
343342

344-
private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) {
343+
private boolean isAlreadyQueued(final ReadBufferStreamOperations stream, final long requestedOffset) {
345344
// returns true if any part of the buffer is already queued
346345
return (isInList(readAheadQueue, stream, requestedOffset)
347346
|| isInList(inProgressList, stream, requestedOffset)
348347
|| isInList(completedReadList, stream, requestedOffset));
349348
}
350349

351-
private boolean isInList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) {
350+
private boolean isInList(final Collection<ReadBuffer> list, final ReadBufferStreamOperations stream, final long requestedOffset) {
352351
return (getFromList(list, stream, requestedOffset) != null);
353352
}
354353

355-
private ReadBuffer getFromList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) {
354+
private ReadBuffer getFromList(final Collection<ReadBuffer> list, final ReadBufferStreamOperations stream, final long requestedOffset) {
356355
for (ReadBuffer buffer : list) {
357356
if (buffer.getStream() == stream) {
358357
if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
@@ -374,7 +373,7 @@ private ReadBuffer getFromList(final Collection<ReadBuffer> list, final AbfsInpu
374373
* @param requestedOffset
375374
* @return
376375
*/
377-
private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) {
376+
private ReadBuffer getBufferFromCompletedQueue(final ReadBufferStreamOperations stream, final long requestedOffset) {
378377
for (ReadBuffer buffer : completedReadList) {
379378
// Buffer is returned if the requestedOffset is at or above buffer's
380379
// offset but less than buffer's length or the actual requestedLength
@@ -389,7 +388,7 @@ private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, fin
389388
return null;
390389
}
391390

392-
private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) {
391+
private void clearFromReadAheadQueue(final ReadBufferStreamOperations stream, final long requestedOffset) {
393392
ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
394393
if (buffer != null) {
395394
readAheadQueue.remove(buffer);
@@ -509,7 +508,7 @@ public synchronized void validateReadManagerState() {
509508

510509
}
511510

512-
private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length,
511+
private int getBlockFromCompletedQueue(final ReadBufferStreamOperations stream, final long position, final int length,
513512
final byte[] buffer) throws IOException {
514513
ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
515514

@@ -576,7 +575,7 @@ ReadBuffer getNextBlockToRead() throws InterruptedException {
576575
}
577576
validateReadManagerState();
578577
// update stream gauge.
579-
buffer.getStreamIOStatistics().incrementGauge(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, 1);
578+
buffer.prefetchStarted();
580579
return buffer;
581580
}
582581

@@ -593,7 +592,7 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i
593592
buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead, buffer);
594593
}
595594
// decrement counter.
596-
buffer.getStreamIOStatistics().incrementGauge(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);
595+
buffer.prefetchFinished();
597596

598597
try {
599598
synchronized (this) {
@@ -612,15 +611,18 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i
612611

613612
boolean shouldFreeBuffer = false;
614613
String freeBufferReason = "";
615-
buffer.readCompleted(result, bytesActuallyRead, currentTimeMillis());
616-
if (!buffer.readSucceededWithData()) {
617-
// reead failed or there was no data, -the buffer can be returned to the free list.
614+
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
615+
buffer.setStatus(ReadBufferStatus.AVAILABLE);
616+
buffer.setLength(bytesActuallyRead);
617+
} else {
618+
// read failed or there was no data, -the buffer can be returned to the free list.
618619
shouldFreeBuffer = true;
619620
freeBufferReason = "failed read";
620621
}
621-
622622
// completed list also contains FAILED read buffers
623623
// for sending exception message to clients.
624+
buffer.setStatus(result);
625+
buffer.setTimeStamp(currentTimeMillis());
624626
if (!buffer.isStreamClosed()) {
625627
// completed reads are added to the list.
626628
LOGGER.trace("Adding buffer to completed list {}", buffer);
@@ -708,7 +710,7 @@ void callTryEvict() {
708710
* still in use.
709711
* @param stream input stream.
710712
*/
711-
public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
713+
public synchronized void purgeBuffersForStream(ReadBufferStreamOperations stream) {
712714
LOGGER.debug("Purging stale buffers for AbfsInputStream {}/{}",
713715
stream.getStreamID(), stream.getPath());
714716

@@ -737,7 +739,7 @@ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
737739
* @param list list of buffers like {@link this#completedReadList}
738740
* or {@link this#inProgressList}.
739741
*/
740-
private int purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
742+
private int purgeList(ReadBufferStreamOperations stream, LinkedList<ReadBuffer> list) {
741743
int purged = 0;
742744
for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
743745
ReadBuffer readBuffer = it.next();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
import java.io.IOException;
22+
23+
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
24+
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
25+
26+
/**
27+
* Interface which is required for read buffer stream
28+
* calls.
29+
* Extracted from {@code AbfsInputStream} to make testing
30+
* easier and to isolate what operations the read buffer
31+
* makes of the streams using it.
32+
*/
33+
interface ReadBufferStreamOperations {
34+
35+
/**
36+
* Read a block from the store.
37+
* @param position position in file
38+
* @param b destination buffer.
39+
* @param offset offset in buffer
40+
* @param length length of read
41+
* @param tracingContext trace context
42+
* @return count of bytes read.
43+
* @throws IOException failure.
44+
*/
45+
int readRemote(long position,
46+
byte[] b,
47+
int offset,
48+
int length,
49+
TracingContext tracingContext) throws IOException;
50+
51+
/**
52+
* Is the stream closed?
53+
* This must be thread safe as prefetch operations in
54+
* different threads probe this before closure.
55+
* @return true if the stream has been closed.
56+
*/
57+
boolean isClosed();
58+
59+
String getStreamID();
60+
61+
IOStatisticsStore getIOStatistics();
62+
63+
/**
64+
* Get the stream path as a string.
65+
* @return path string.
66+
*/
67+
String getPath();
68+
69+
70+
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
2929
import org.apache.hadoop.fs.statistics.DurationTracker;
3030

31-
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
32-
3331
class ReadBufferWorker implements Runnable {
3432

3533
private static final Logger LOGGER =
@@ -71,8 +69,7 @@ public void run() {
7169
}
7270
if (buffer != null) {
7371
// input stream is updated with count/duration of prefetching
74-
DurationTracker tracker =
75-
buffer.getStreamIOStatistics().trackDuration(STREAM_READ_PREFETCH_OPERATIONS);
72+
DurationTracker tracker = buffer.trackPrefetchOperation();
7673
try {
7774
// do the actual read, from the file.
7875
LOGGER.trace("Reading {}", buffer);

0 commit comments

Comments
 (0)