HADOOP-18521. ABFS ReadBufferManager must not reuse in-progress buffers #5117
Conversation
going to have to rebase this. i also intend to have prefetch threads update the input streams with their statistics: the reader thread just gets the iostats off the stream, casts to IOStatisticsStore, then uses it as a duration tracker and gauge stats source; straightforward.
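A minimal sketch of that pattern, assuming the stream exposes its statistics through the standard IOStatisticsSource interface; the method shape and the chosen statistic name are illustrative, not the final patch:

```java
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

class PrefetchStatsSketch {
  // Prefetch worker: pull the stream's statistics, cast to the mutable
  // store, then track the duration of the remote GET against it.
  void trackPrefetch(IOStatisticsSource stream) throws Exception {
    IOStatisticsStore stats = (IOStatisticsStore) stream.getIOStatistics();
    DurationTracker tracker =
        stats.trackDuration(StoreStatisticNames.ACTION_HTTP_GET_REQUEST);
    try {
      // ...issue the remote read here...
    } catch (Exception e) {
      tracker.failed();   // record the failure before closing the tracker
      throw e;
    } finally {
      tracker.close();    // closing records the (possibly failed) duration
    }
  }
}
```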
Force-pushed ddd1a32 to c295fb2
@steveloughran, thank you for the PR. I have reviewed it and left some comments. Thanks again.
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
purgeList(stream, completedReadList);
purgeList(stream, inProgressList);
int readaheadPurged = readAheadQueue.size() - before; |
By the time the thread reaches this line, more blocks may have been added to readAheadQueue, which would bloat the metric. Also, `before` is only >= readAheadQueue.size() when no additional blocks have been queued; otherwise the subtraction yields a negative value.
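A sketch of one way to make the count race-proof: count removals directly under the manager's lock instead of diffing queue sizes. This is a fragment under assumed field types (readAheadQueue as a collection of ReadBuffer), not the actual patch:

```java
// Count purged prefetches as they are removed, so concurrent enqueues
// cannot skew the metric or drive it negative.
int readaheadPurged = 0;
synchronized (this) {
  java.util.Iterator<ReadBuffer> it = readAheadQueue.iterator();
  while (it.hasNext()) {
    if (it.next().getStream() == stream) {
      it.remove();
      readaheadPurged++;
    }
  }
}
```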
freeList.push(buffer.getBufferindex());
// buffer will be deleted as per the eviction policy.
// there is no data, so it is immediately returned to the free list.
placeBufferOnFreeList("failed read", buffer); |
This may result in an IllegalStateException propagating to AbfsInputStream.
This line adds the buffer to freeList, from which the index can then be taken by another readBuffer b1.
Now, after some time, when this buffer from completedList needs to be evicted, it reaches https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L408, and two things can happen:
- freeList still has this index: it will throw IllegalStateException
- freeList doesn't have it: it will throw IllegalStateException from https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L411.
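For reference, a paraphrase of the invariant checks being tripped here. The method name appears in the stack traces below, but the body and the `ownedBuffers` array are assumptions, not the actual PR code:

```java
// A free-list push must come from the buffer that currently owns the
// index, and the index must not already be on the free list.
private void verifyReadOwnsBufferAtIndex(ReadBuffer buf, int index) {
  Preconditions.checkState(ownedBuffers[index] == buf,
      "Buffer %s returned to free buffer list by non-owner %s", index, buf);
  Preconditions.checkState(!freeList.contains(index),
      "Buffer index %s is already on the free list", index);
}
```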
Test for the same: saxenapranav@18da375.
In a separate run:
java.lang.IllegalStateException: Buffer 14 returned to free buffer list by non-owner ReadBuffer{status=AVAILABLE, offset=4194304, length=0, requestedLength=4194304, bufferindex=14, timeStamp=46807492, isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false, errException=org.apache.hadoop.fs.PathIOException: `/testfilef6b6f93ac245': Input/output error: Buffer index 14 found in buffer collection completedReadList, stream=org.apache.hadoop.fs.azurebfs.services.AbfsInputStream@652e2419{counters=((stream_read_bytes_backwards_on_seek=0) (stream_read_operations=1) (remote_read_op=2) (stream_read_seek_backward_operations=0) (action_http_get_request.failures=0) (action_http_get_request=0) (bytes_read_buffer=0) (stream_read_bytes=0) (seek_in_buffer=0) (remote_bytes_read=0) (stream_read_seek_bytes_skipped=0) (stream_read_seek_operations=2) (read_ahead_bytes_read=0) (stream_read_seek_forward_operations=2));
gauges=();
minimums=((action_http_get_request.failures.min=-1) (action_http_get_request.min=-1));
maximums=((action_http_get_request.max=-1) (action_http_get_request.failures.max=-1));
means=((action_http_get_request.failures.mean=(samples=0, sum=0, mean=0.0000)) (action_http_get_request.mean=(samples=0, sum=0, mean=0.0000)));
}AbfsInputStream@(1697522713){StreamStatistics{counters=((remote_bytes_read=0) (stream_read_seek_backward_operations=0) (remote_read_op=2) (stream_read_seek_forward_operations=2) (bytes_read_buffer=0) (seek_in_buffer=0) (stream_read_bytes=0) (stream_read_operations=1) (read_ahead_bytes_read=0) (stream_read_bytes_backwards_on_seek=0) (stream_read_seek_operations=2) (action_http_get_request.failures=0) (stream_read_seek_bytes_skipped=0) (action_http_get_request=0));
gauges=();
minimums=((action_http_get_request.min=-1) (action_http_get_request.failures.min=-1));
maximums=((action_http_get_request.failures.max=-1) (action_http_get_request.max=-1));
means=((action_http_get_request.mean=(samples=0, sum=0, mean=0.0000)) (action_http_get_request.failures.mean=(samples=0, sum=0, mean=0.0000)));
}}}
at org.apache.hadoop.util.Preconditions.checkState(Preconditions.java:298)
at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.verifyReadOwnsBufferAtIndex(ReadBufferManager.java:430)
at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.placeBufferOnFreeList(ReadBufferManager.java:411)
Made a suggested change which prevents this:
saxenapranav@0d09a0d
Hi. Thanks for the changes. We may need to discuss whether we should add the readBuffer to completedList, since it seems this can lead to inconsistency. I have hit an inconsistency issue and explained it below. Thanks a lot again. Regards.
if (!buffer.isStreamClosed()) {
  // completed reads are added to the list.
  LOGGER.trace("Adding buffer to completed list {}", buffer);
completedReadList.add(buffer); |
Let's not add the buffer to completedList in the cases where we are going to add it to freeList (because bytesRead == 0).
- Got exception:
2022-11-10 20:48:22,516 TRACE [ABFS-prefetch-7]: services.ReadBufferManager (ReadBufferManager.java:doneReading(591)) - ReadBufferWorker completed file /testfilefb393e327a88 for offset 4194304 bytes 0; org.apache.hadoop.fs.azurebfs.services.ReadBuffer@6777a743{ status=READING_IN_PROGRESS, offset=4194304, length=0, requestedLength=4194304, bufferindex=0, timeStamp=0, isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false, errException=null, stream=9d85521627a8, stream closed=false}
2022-11-10 20:48:22,516 TRACE [ABFS-prefetch-7]: services.ReadBufferManager (ReadBufferManager.java:doneReading(633)) - Adding buffer to completed list org.apache.hadoop.fs.azurebfs.services.ReadBuffer@6777a743{ status=AVAILABLE, offset=4194304, length=0, requestedLength=4194304, bufferindex=0, timeStamp=86052487, isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false, errException=null, stream=9d85521627a8, stream closed=false}
2022-11-10 20:48:22,516 DEBUG [ABFS-prefetch-7]: services.ReadBufferManager (ReadBufferManager.java:placeBufferOnFreeList(407)) - Returning buffer index 0 to free list for 'failed read'; owner org.apache.hadoop.fs.azurebfs.services.ReadBuffer@6777a743{ status=AVAILABLE, offset=4194304, length=0, requestedLength=4194304, bufferindex=0, timeStamp=86052487, isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false, errException=null, stream=9d85521627a8, stream closed=false}
2022-11-10 20:48:22,517 TRACE [ABFS-prefetch-7]: services.ReadBufferWorker (ReadBufferWorker.java:run(95)) - Exception received:
org.apache.hadoop.fs.PathIOException: `/testfilefb393e327a88': Input/output error: Buffer index 0 found in buffer collection completedReadList
at org.apache.hadoop.fs.azurebfs.services.ReadBufferWorker.run(ReadBufferWorker.java:93)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalStateException: Buffer index 0 found in buffer collection completedReadList
at org.apache.hadoop.util.Preconditions.checkState(Preconditions.java:298)
at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.verifyByteBufferNotInCollection(ReadBufferManager.java:471)
at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.verifyByteBufferNotInUse(ReadBufferManager.java:457)
at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.placeBufferOnFreeList(ReadBufferManager.java:413)
at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.doneReading(ReadBufferManager.java:646)
at org.apache.hadoop.fs.azurebfs.services.ReadBufferWorker.run(ReadBufferWorker.java:87)
... 1 more
Reason: https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L629 -> https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L641 -> https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L413 -> https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L457 -> exception.
- Double addition in freeList:
-- Suppose that in doneReading the prefetch thread reaches https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L415 and a context switch happens, so the thread gets no CPU for some time. Meanwhile, the buffer added to completedList gets picked up for eviction, which runs and adds the index to freeList. The prefetch thread then gets the CPU again, runs https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L415-L422, and adds the same index to freeList a second time.
-- Wrote an experiment for the same: saxenapranav@7b6ac15
2022-11-10 22:22:57,147 TRACE [Thread-28]: services.ReadBufferManager (ReadBufferManager.java:lambda$init$0(147)) - INCONSISTENCY!! on index 8
Exception in thread "Thread-16" java.lang.AssertionError: At index4194304
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.assertTrue(Assert.java:42)
at org.junit.Assert.assertFalse(Assert.java:65)
at org.apache.hadoop.fs.azurebfs.ITestPartialRead.lambda$purgeIssue$0(ITestPartialRead.java:154)
at java.lang.Thread.run(Thread.java:750)
It would be better to call placeBufferOnFreeList before trying to add to completedList: the buffer then reaches freeList before the completedList addition (whose evict() can itself try to add it to freeList), which guarantees the buffer is never added to freeList from completedList. See the sketch below.
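A minimal sketch of that ordering in doneReading(), using field and method names visible elsewhere in this PR; this paraphrases the reviewer's proposal and is not the actual patch:

```java
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
  // only buffers actually holding data go on the completed list
  buffer.setStatus(ReadBufferStatus.AVAILABLE);
  buffer.setLength(bytesActuallyRead);
  completedReadList.add(buffer);
} else {
  // failed/empty read: hand the index back to the free list *before*
  // anything can see this buffer in completedReadList, so a concurrent
  // evict() can never attempt a second free of the same index.
  placeBufferOnFreeList("failed read", buffer);
  buffer.setStatus(result);
}
```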
Agree with this: the current flow can lead to double addition to the free list, or inconsistency during addition.
making some changes. this is a complex bit of code, which is why I plan to write some unit tests to explore the results; i will take what you've done too @pranavsaxena-microsoft
// buffer will be deleted as per the eviction policy.
// read failed or there was no data; the buffer can be returned to the free list.
shouldFreeBuffer = true;
freeBufferReason = "failed read";
}
// completed list also contains FAILED read buffers
// for sending exception message to clients.
buffer.setStatus(result); |
In the READ_FAILED case, it sets buffer.bufferIndex = -1. Then, when it goes to placeBufferOnFreeList, at https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L406, index will be -1, which breaks the flow.
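A hedged sketch of an early guard in placeBufferOnFreeList() that would reject the reset index; the NUM_BUFFERS bound and the message text are assumptions:

```java
private void placeBufferOnFreeList(String reason, ReadBuffer buffer) {
  final int index = buffer.getBufferindex();
  // reject a READ_FAILED buffer whose index was already reset to -1
  Preconditions.checkState(index >= 0 && index < NUM_BUFFERS,
      "Buffer index %s out of range; cannot free for reason '%s'",
      index, reason);
  // ...existing not-in-use validation, then freeList.push(index)...
}
```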
i've moved that check down. and with the move to an interface for abfs interaction, i'm now in a position to create a test to simulate the conditions, including "available but empty" and "io error" on normal and prefetch reads.
The checkstyle errors need fixing.
buf.setTracingContext(null);
if (LOGGER.isTraceEnabled()) {
  LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
      buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
}
completedReadList.remove(buf); |
Buffer should be removed from completed list after it has been added to the free list.
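What that ordering would look like in evict(), sketched with the names from the diff above; the reason string and the exact method shape are assumptions, not the final code:

```java
private synchronized boolean evict(final ReadBuffer buf) {
  buf.setTracingContext(null);
  // hand the index back to the free list first...
  placeBufferOnFreeList("eviction", buf);
  // ...then drop the entry, so the buffer is never absent from both
  // the free list and completedReadList at the same time.
  completedReadList.remove(buf);
  return true;
}
```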
ok. i'd positioned them where they were so the invariant "not in use" held.
maybe the validateReadManagerState() call should go in at the end of the eviction
Can you please explain again why we should not remove the buffer from the completed list after it has been added to the free list?
thx for the comments; been away from this on the doc/replication side of this "issue" all week. have been able to replicate the problem with avro parsing, though there it always fails at the parser; on csv/text records that's not guaranteed.
Hi @steveloughran, I wanted to get your opinion on the change below as a possible replacement for this change. A ReadBuffer with a valid buffer assigned to it can be in certain states when the stream is closed, and with the above change I am trying to address them as below:
Now, when in state 3 or 4, the purge method might not pick the buffer up, as it might have executed first. In that case, to prioritize these ReadBuffers for eviction, I have added the check for the stream being closed in the eviction code as well. Please let me know if you see value in this fix; I could pursue further changes to incorporate validation code at queuing time and when getBlock finds a hit in the completed list, and will also add related test code.
see also #5134 which is the "disable readahead" patch for 3.3.5
Force-pushed 1ee18ee to 5415ba3
 * @return path string.
 */
String getPath();
nit: Additional line, can be removed
@@ -555,7 +579,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
         throw new FileNotFoundException(ere.getMessage());
       }
     }
-    throw new IOException(ex);
+    throw ex;
Any specific reason for changing the exception type from IOException to AzureBlobFileSystemException?
i haven't
class AzureBlobFileSystemException extends IOException
just removed one layer of wrapping so hive is less likely to lose the stack trace
setBufferindex(-1);
}
nit: Extra line
.describedAs("in progress blocks discarded") | ||
.isGreaterThan(initialInProgressBlocksDiscarded); | ||
} | ||
|
||
} |
nit: Add a newline at the end of the file.
@@ -26,6 +26,8 @@ log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG
log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG
log4j.logger.org.apache.hadoop.fs.azurebfs.contracts.services.TracingService=TRACE
log4j.logger.org.apache.hadoop.fs.azurebfs.services.AbfsClient=DEBUG
log4j.logger.org.apache.hadoop.fs.azurebfs.services.ReadBufferManager=TRACE |
Was this added for testing? It might add a lot of logging.
it was...but we run the rest of the tests logging at debug, and i don't see the prefetcher being any chattier than the rest of the stack.
This is the roll-up of my work on prefetching, rebased onto trunk with all merge conflicts addressed. The squashed commit messages:

HADOOP-18521. ABFS ReadBufferManager does not reuse in-progress buffers

Addresses the issue by not trying to cancel in-progress reads when a stream is closed()...they are allowed to continue and then their data is discarded. To enable discarding, AbfsInputStreams export their `closed` state, which is now an AtomicBoolean internally so reader threads can probe it.

The shared buffers now have owner tracking, which will reject
* attempts to acquire an owned buffer
* attempts to return a buffer not owned

Plus
* lots of other invariants added to validate the state
* useful toString values

HADOOP-18521. ABFS ReadBufferManager does not reuse in-progress buffers

Adds a path/stream capability probe for the bug fix in abfs, which you can demand in an openFile() call. That will block your code from ever working on a release which still has the race condition.

HADOOP-18521. prune map of buffer to reader as the array does it

HADOOP-18521. stats collection and use in itest

HADOOP-18521. isolating read buffer invocations on stream for testing

This should now be set up for unit tests to simulate the failure conditions.

HADOOP-18521. cut a check for freeing buffer while still in progress

HADOOP-18521. abfs ReadBufferManager and closed streams
* working on the tests
* during step-through debugging identified where the abfs input stream needs to be hardened against unbuffer/close invocations

HADOOP-18521. improve completed read eviction in close

Always call ReadBuffer.evict(), which adds stream stats on whether a block was used before it was evicted. This helps assess the value of prefetching.

HADOOP-18521. testing more of the failure conditions

HADOOP-18521. Unit tests of ReadBufferManager logic.

Now it's possible to have tests which yetus can run on the details of fetching and error handling.

HADOOP-18521. ReadBufferManager evict() test/tweak

Closed read buffers with data are always found, even if somehow they didn't get purged (is this possible? the synchronized blocks say otherwise). Closed read buffers without data (failure buffers) are silently discarded.

HADOOP-18521. review comments about newlines

HADOOP-18521. checkstyles...mostly + symbols on generated toString()
Force-pushed a261c9a to 940c9b6
converting to a WIP as this is not targeting 3.3.5
not working on this; |
Addresses the issue by not trying to cancel in-progress reads when a stream is closed(): they are allowed to continue and then their data is discarded.

To enable discarding, AbfsInputStreams export their `closed` state, which is now an AtomicBoolean internally so reader threads can probe it.

The shared buffers now have owner tracking, which will reject
* attempts to acquire an owned buffer
* attempts to return a buffer not owned

Plus
* lots of other invariants added to validate the state
* useful toString values

Also adds a path and stream capability probe for the fix; the cloudstore "pathcapability" probe can report this. Hadoop 3.3.2 added the path capability "fs.capability.paths.acls", so two probes can determine whether an abfs release is exposed:
- not vulnerable
- vulnerable

The capability can also be demanded in an openFile() call. That will block the code from ever working on a release which still has the race condition. Possibly a bit excessive.
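A minimal sketch of the closed-state probe described above, with assumed field and method names; the real patch may shape this differently:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class PrefetchAwareStream {
  // closure is tracked in an AtomicBoolean so prefetch worker threads
  // can poll it without taking the stream's lock.
  private final AtomicBoolean closed = new AtomicBoolean(false);

  /** Probed by the buffer manager worker before saving prefetched data. */
  public boolean isClosed() {
    return closed.get();
  }

  public void close() {
    if (closed.compareAndSet(false, true)) {
      // in-progress prefetches are NOT cancelled; they run to completion
      // and their data is discarded once isClosed() reports true.
    }
  }
}
```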
How was this patch tested?
needs more tests with multi-GB csv files to validate the patch.
Unable to come up with good tests to recreate the failure condition.