DRILL-5512: Standardize error handling in ScanBatch #838
```diff
@@ -86,31 +86,32 @@ public class ScanBatch implements CloseableRecordBatch {
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
                    OperatorContext oContext, Iterator<RecordReader> readers,
-                   List<Map<String, String>> implicitColumns) throws ExecutionSetupException {
+                   List<Map<String, String>> implicitColumns) {
     this.context = context;
     this.readers = readers;
     if (!readers.hasNext()) {
-      throw new ExecutionSetupException("A scan batch must contain at least one reader.");
+      throw UserException.systemError(
+          new ExecutionSetupException("A scan batch must contain at least one reader."))
+        .build(logger);
     }
     currentReader = readers.next();
     this.oContext = oContext;
     allocator = oContext.getAllocator();
     mutator = new Mutator(oContext, allocator, container);

-    boolean setup = false;
     try {
       oContext.getStats().startProcessing();
       currentReader.setup(oContext, mutator);
-      setup = true;
-    } finally {
-      // if we had an exception during setup, make sure to release existing data.
-      if (!setup) {
-        try {
-          currentReader.close();
-        } catch(final Exception e) {
-          throw new ExecutionSetupException(e);
-        }
-      }
+    } catch (ExecutionSetupException e) {
+      try {
+        currentReader.close();
+      } catch(final Exception e2) {
+        logger.error("Close failed for reader " + currentReader.getClass().getSimpleName(), e2);
+      }
+      throw UserException.systemError(e)
+        .addContext("Setup failed for", currentReader.getClass().getSimpleName())
+        .build(logger);
+    } finally {
       oContext.getStats().stopProcessing();
     }
     this.implicitColumns = implicitColumns.iterator();
```
```diff
@@ -173,9 +174,8 @@ public IterOutcome next() {
         currentReader.allocate(mutator.fieldVectorMap());
       } catch (OutOfMemoryException e) {
-        logger.debug("Caught Out of Memory Exception", e);
         clearFieldVectorMap();
-        return IterOutcome.OUT_OF_MEMORY;
+        throw UserException.memoryError(e).build(logger);
       }
       while ((recordCount = currentReader.next()) == 0) {
         try {
```
```diff
@@ -213,17 +213,16 @@ public IterOutcome next() {
           try {
             currentReader.allocate(mutator.fieldVectorMap());
           } catch (OutOfMemoryException e) {
-            logger.debug("Caught OutOfMemoryException");
             clearFieldVectorMap();
-            return IterOutcome.OUT_OF_MEMORY;
+            throw UserException.memoryError(e).build(logger);
           }
           addImplicitVectors();
         } catch (ExecutionSetupException e) {
-          this.context.fail(e);
-          releaseAssets();
-          return IterOutcome.STOP;
+          throw UserException.systemError(e).build(logger);
         }
       }

       // At this point, the current reader has read 1 or more rows.

       hasReadNonEmptyFile = true;
```

Contributor (on the removed `this.context.fail(e);` line): This call triggers query failure (stopping the fragment, notifying the Foreman, cancelling other fragments, and so on). What is the flow after this change? Similar changes appear below.

Contributor (Author): Throwing an exception, it turns out, does exactly the same thing: it cancels the query and causes the fragment executor to cascade close() calls to all the operators (record batches) in the fragment tree. Some code kills the query by throwing an exception; other code calls the fail() method and bubbles up STOP. But since the proper way to handle STOP is to unwind the stack, STOP is equivalent to throwing an exception. The idea is, rather than having two ways to clean up, to standardize on one. Since we must handle unchecked exceptions in any case, the exception-based solution is the logical choice for standardization.

Contributor: Sounds good.
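The UserException builder pattern used throughout this patch can be sketched in miniature. This is a hypothetical stand-in, not Drill's actual `UserException` class: the point is that the builder collects context, logs once at `build()` time, and produces an unchecked exception the operator throws instead of returning STOP.

```java
// Hypothetical sketch of the pattern this patch standardizes on.
// Names (UserExceptionSketch, systemError, addContext, build) mimic
// Drill's UserException builder but are illustrative stand-ins only.
class UserExceptionSketch extends RuntimeException {
    private UserExceptionSketch(String message, Throwable cause) {
        super(message, cause);
    }

    static Builder systemError(Throwable cause) {
        return new Builder("SYSTEM ERROR", cause);
    }

    static class Builder {
        private final StringBuilder message;
        private final Throwable cause;

        Builder(String kind, Throwable cause) {
            this.message = new StringBuilder(kind + ": " + cause.getMessage());
            this.cause = cause;
        }

        // Attach operator-specific context to the eventual error message.
        Builder addContext(String context) {
            message.append(" [").append(context).append("]");
            return this;
        }

        // In Drill, build(logger) also logs the error exactly once;
        // this sketch just constructs the unchecked exception to throw.
        UserExceptionSketch build() {
            return new UserExceptionSketch(message.toString(), cause);
        }
    }
}
```

An operator would then write `throw UserExceptionSketch.systemError(e).addContext("Setup failed").build();` at the point where the old code called `context.fail(e)` and returned STOP.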
```diff
@@ -245,18 +244,15 @@ public IterOutcome next() {
           return IterOutcome.OK;
         }
       } catch (OutOfMemoryException ex) {
-        context.fail(UserException.memoryError(ex).build(logger));
-        return IterOutcome.STOP;
+        throw UserException.memoryError(ex).build(logger);
       } catch (Exception ex) {
-        logger.debug("Failed to read the batch. Stopping...", ex);
-        context.fail(ex);
-        return IterOutcome.STOP;
+        throw UserException.systemError(ex).build(logger);
       } finally {
         oContext.getStats().stopProcessing();
       }
     }

-  private void addImplicitVectors() throws ExecutionSetupException {
+  private void addImplicitVectors() {
     try {
       if (implicitVectors != null) {
         for (ValueVector v : implicitVectors.values()) {
```
```diff
@@ -274,7 +270,10 @@ private void addImplicitVectors() throws ExecutionSetupException {
         }
       }
     } catch(SchemaChangeException e) {
-      throw new ExecutionSetupException(e);
+      // No exception should be thrown here.
+      throw UserException.systemError(e)
+        .addContext("Failure while allocating implicit vectors")
+        .build(logger);
     }
   }
```
```diff
@@ -324,7 +323,7 @@ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
    * this scan batch. Made visible so that tests can create this mutator
    * without also needing a ScanBatch instance. (This class is really independent
    * of the ScanBatch, but resides here for historical reasons. This is,
-   * in turn, the only use of the genereated vector readers in the vector
+   * in turn, the only use of the generated vector readers in the vector
    * package.)
    */
```
Contributor: The non-managed external sort spills to disk in case it receives this outcome. I do not know if there are other operators that handle this outcome. Are all the prerequisite changes (to handle this change) already committed?

Contributor (Author): As it turns out, the idea of the OUT_OF_MEMORY return code works better in theory than in practice. No reader correctly handles this case. Say we have three columns (a, b, c), and column c needs to double its vector but hits OOM. No reader has the internal state needed to hold onto the value for c, unwind the call stack, and then, on the next next() call, rewind back to the point of writing c into the in-flight row.

Moving forward, we want to take a broader approach to memory: budget sufficient memory that readers can work, and modify the mutators to enforce batch size limits so that each reader operates within its budget. As we move to that approach, the OUT_OF_MEMORY status will be retired.

The JIRA mentions another JIRA that holds a spec for all of this; something we discussed six months ago but did not have time to implement then. This all merits a complete discussion; maybe we can discuss the overall approach in that other JIRA.
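The batch-size-budget idea described above can be sketched as follows. This is a hypothetical illustration of the approach, not Drill's actual Mutator API: the mutator tracks how much it has written into the in-flight batch and tells the reader when the batch is full, so the reader flushes a batch instead of doubling vectors until it hits OOM.

```java
// Hypothetical sketch: a mutator that enforces a per-batch byte budget.
// Names and the byte-counting scheme are illustrative, not Drill's API.
class BudgetedMutator {
    private final long batchByteLimit;   // memory budget for one batch
    private long bytesInBatch;
    private int rowCount;

    BudgetedMutator(long batchByteLimit) {
        this.batchByteLimit = batchByteLimit;
    }

    /**
     * Returns true if the row fits within the budget; false means the
     * reader should flush the current batch downstream and retry.
     */
    boolean writeRow(int rowBytes) {
        if (bytesInBatch + rowBytes > batchByteLimit) {
            return false;  // batch full: no allocation beyond the budget
        }
        bytesInBatch += rowBytes;
        rowCount++;
        return true;
    }

    int rowCount() {
        return rowCount;
    }

    // Called after the full batch has been sent downstream.
    void startNewBatch() {
        bytesInBatch = 0;
        rowCount = 0;
    }
}
```

With this scheme, a reader never triggers an allocation beyond its budget, which is why the OUT_OF_MEMORY return status becomes unnecessary.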
Contributor: Makes sense. Asking the question a different way: to avoid regressions, should the prerequisite changes (DRILL-5211 and pertinent tasks) be committed before this patch? Or, since the readers do not correctly handle the case anyway, will there be no difference?
Contributor (Author): Good question. Yes: since the FragmentExecutor already handles errors and unwinds, we just exploit that (existing, working) path instead of the (also existing, but harder to keep working) path of fail/STOP. The (managed) external sort, for example, reports all its errors via exceptions; it does not use fail/STOP. The fragment executor recovers just fine.
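The equivalence between throwing and fail/STOP rests on what the fragment executor's run loop does. A minimal sketch, with invented names (FragmentExecutorSketch, Operator) standing in for Drill's actual classes: any exception escaping the operator tree is recorded as the failure cause, and close() is cascaded to every operator regardless, which is exactly the cleanup that fail/STOP produced.

```java
import java.util.List;

// Hypothetical sketch of a fragment executor's top-level error handling.
// Illustrative only; Drill's FragmentExecutor is considerably richer.
class FragmentExecutorSketch {
    interface Operator extends AutoCloseable {
        void next();              // may throw on error
        @Override void close();   // release the operator's resources
    }

    private Throwable failureCause;

    /** Drives the operators; returns the failure cause, or null on success. */
    Throwable run(List<Operator> operators) {
        try {
            for (Operator op : operators) {
                op.next();
            }
        } catch (Throwable t) {
            failureCause = t;     // in real Drill: report to the Foreman
        } finally {
            // Cascade close() to every operator, failed or not.
            for (Operator op : operators) {
                try {
                    op.close();
                } catch (Exception ignored) {
                    // best-effort cleanup; don't mask the original failure
                }
            }
        }
        return failureCause;
    }
}
```

Because the finally block runs whether the loop completed or threw, an operator that throws gets the same teardown as one that bubbled up STOP, with no second code path to maintain.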
Contributor: I am not sure this specific line change is required, so please correct me if I am wrong. Thinking out loud: there are three places in ScanBatch where OutOfMemoryException is handled. Since OutOfMemoryException is an unchecked exception, I could not quickly find all the calls which trigger the exception in this method.

The first and second cases are similar in that reader.allocate(...) fails. So, although there is no unwind logic, it seems to me this case is correctly handled: no records have been read, so there is no need to unwind. Say this triggers spilling in the sort; then the query could complete successfully if allocate succeeds next time (and so on). Am I following this logic correctly? But this does not seem to be the case, as TestOutOfMemoryOutcome triggers an OutOfMemoryException during "next" allocation, and all tests are expected to fail.

And then there is the third case, which is a general catch (e.g. reader.next() throws OutOfMemoryException). And as you mentioned, readers cannot unwind, so that correctly fails the fragment.
Contributor (Author): You raise two good points.

The first is that Drill tends to use unchecked exceptions because, well, they are convenient, and Drill must handle such unchecked exceptions as NPE, illegal state, illegal argument, and so on. But normal Java practice is to declare exceptions so that callers know what they should handle. DRILL-5386 asks to rationalize exception use; comments in that ticket from project veterans show some resistance to the idea of checked exceptions. So, yes, we must expect any unchecked exception from any method. This is why operators should handle all exceptions, and why we need code to sort out exceptions based on type.

The analysis of OOM is correct but omits context. It is seldom (never?) the case that a sort sits directly above a scan; most often there is a filter or project between them. If the scan hits OOM, it is not likely because it has exhausted the memory limit on the scan operator: each operator defaults to a 10 GB limit. Instead, it is likely that overall system memory is exhausted. So what is likely to happen? Each operator between the scan and sort must handle the OUT_OF_MEMORY status by bubbling it up the call stack. Let's assume that works.

The sort now wants to spill. Spilling is an activity that itself requires memory: it requires a merge phase to combine the records from buffered batches in sort order so that the spilled run is sorted. That is, the sort must allocate a batch, often many MB in size. (Spilled runs must be sizable so we can limit the number of spilled runs merged in the final merge phase.) So the sort tries to allocate vectors for the merge batch and... the allocation fails. Why? Because we are out of system memory; that is why the scanner triggered an OOM in the first place.

I can find no code that sets up this out-of-system-memory condition to verify that the existing code works. I think we were taking it on faith that this behavior actually works.

Moving forward, we are working toward a workable solution: assign the scan some amount of memory and limit batches to fit within that memory; give the sort a certain amount of memory and have it manage within that memory, so that when a spill occurs the sort has sufficient memory to create the required merge batches as part of the spill.

Finally, let's consider the case you outlined: the scan fails with OOM on the initial allocation. The initial allocation is often small; the scan goes through many vector doublings to read the full complement of rows. (At least, that's what I've observed empirically; perhaps the original intent was different.) Say we tried to recover from an initial allocation failure. Say we have a row with five columns. We allocate three but fail on the fourth. Say the fourth is a Varchar, which has two vectors: offset and data. The current logic releases the partially allocated vectors, which is good. OUT_OF_MEMORY is returned, and the vectors are reallocated if memory could be released. Sounds good.

But most queries run in multiple threads. If one hits OOM, the others probably will as well. The actual system memory is a shared resource, but there is no coordination. A scan might release its partially allocated vectors so the sort can, in theory, spill. But that memory might immediately be grabbed by some other thread, resulting in a sort spill OOM. In practice, the initial vector allocations are much smaller than the merge batch, so freeing the initial allocation is only slightly useful. That initial allocation, plus luck that some other thread has freed enough memory, might allow us to spill. But it is a crap-shoot.

In short, even if this logic might possibly work in some scenarios in a single-threaded query, it is too chaotic to work in general with many threads. And, of course, no tests exist for either case, so we are just guessing.

All in all, the above argues strongly that the path forward is to:

1. Standardize on a single, exception-based error-handling path.
2. Budget memory so that each operator can work (and spill) within its allocation.
3. Build up our ability to test these conditions.

This checkin is a step toward goal 1. The external sort revision, hash agg spilling, and other projects are steps toward goal 2. We continue to chip away at our ability to do goal 3.

Given all of this, can you suggest how we could gather evidence that the current OUT_OF_MEMORY status actually works in any real queries? Or do we have a tough case of comparing concrete changes against an aspiration for how the system might work?

More practically, with this change, OOM will fail the query. Previously, there was some chance that Drill might recover from an OOM. But we have no tests and no statistics. Is it worth risking that hypothetical for a concrete step in the right direction? I don't think we have the answer, and that, itself, is a problem. Thoughts?
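The "sort out exceptions based on type" idea mentioned above can be sketched at an operator boundary. This is a hypothetical illustration with invented class names, not Drill's actual API: exceptions that already carry user-facing context pass through unchanged, out-of-memory failures get a resource-error wrapper, and everything else (NPE, illegal state, and so on) becomes a system error.

```java
// Hypothetical sketch of classifying exceptions by type at an operator
// boundary. UserError is an illustrative stand-in for Drill's UserException.
class ExceptionClassifier {
    static class UserError extends RuntimeException {
        UserError(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    static RuntimeException classify(Throwable t) {
        if (t instanceof UserError) {
            return (UserError) t;  // already classified: rethrow as-is
        }
        if (t instanceof OutOfMemoryError) {
            // Memory failures get their own category so users can tell
            // resource exhaustion apart from bugs.
            return new UserError("RESOURCE ERROR: out of memory", t);
        }
        // Any other unchecked exception (NPE, illegal state, ...) is a bug.
        return new UserError("SYSTEM ERROR: " + t, t);
    }
}
```

An operator's catch block would funnel every caught exception through a classifier like this before throwing, so the query profile always shows a consistent, categorized error.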
Contributor: I agree. I was hoping there were tests to show the current handling of OUT_OF_MEMORY as well.
Contributor (Author): As it turns out, in addition to fixing ScanBatch, there is a need for a parallel implementation to support the size-aware vector "writer". That new version was modified to allow extensive unit testing of all error conditions. We can't do that as easily with ScanBatch itself because it has messy references to other parts of Drill, which have references, which have references... Pretty soon all of Drill is needed to do the tests, which means using the current injection framework (which we could do).