Fetch columnar results incrementally/lazily #966
Conversation
This reverts commit 76e36e3.
```java
getResultSetResp(
    response.getStatus(),
    response.getOperationHandle(),
    response.toString(),
```
When the driver is run in debug log mode, stringifying Thrift model classes incurs a huge memory allocation cost (~700 MB reduced to ~170 MB after this change). Hence, removing these in favour of telemetry and statement id logging.
```java
if (operationState == TOperationState.FINISHED_STATE) {
  long fetchStartTime = System.nanoTime();
  resultSet =
      getResultSetResp(response.getStatus(), operationHandle, response.toString(), -1, true);
```
Good catch!
```java
String errorMessage =
    String.format(
        "Error while fetching results from Thrift server. Request {%s}, Error {%s}",
        request.toString(), e.getMessage());
```
Logging the request may still be useful without affecting memory.
```java
// Move to next row in current batch
currentBatchIndex++;
globalRowIndex++;
```
So, this is where it becomes 0 for the first time. So, the client would call next() before consuming the row, right?
Correct. next() moves the cursor to the next valid row, starting from -1.
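The cursor convention under discussion can be sketched as follows (a simplified, hypothetical class, not the driver's actual implementation):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Sketch of the JDBC-style cursor convention: the row index starts at -1
 *  and the client must call next() before reading each row. */
public class CursorSketch {
    private final Iterator<List<Object>> rows;
    private List<Object> currentRow;
    private long globalRowIndex = -1; // -1 means "before the first row"

    public CursorSketch(List<List<Object>> data) {
        this.rows = data.iterator();
    }

    /** Advances the cursor; the index becomes 0 on the first successful call. */
    public boolean next() {
        if (!rows.hasNext()) {
            return false;
        }
        currentRow = rows.next();
        globalRowIndex++;
        return true;
    }

    public long getCurrentRow() {
        return globalRowIndex;
    }

    public Object getValue(int col) {
        return currentRow.get(col);
    }

    public static void main(String[] args) {
        CursorSketch c =
            new CursorSketch(Arrays.asList(Arrays.<Object>asList("a"), Arrays.<Object>asList("b")));
        while (c.next()) { // the client calls next() before each read
            System.out.println(c.getCurrentRow() + ": " + c.getValue(0));
        }
    }
}
```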
```java
checkIfClosed();
if (executionResult instanceof LazyThriftResult) {
  return !executionResult.hasNext();
}
```
Do we need to check instanceof? Can we not simply do !hasNext() for all implementations?
> can we not simply do !hasNext() for all implementations?

I was a bit hesitant to do this for all execution results, so I kept the original behaviour as is for the others.
```java
LOGGER.info("Processing result of format {} from Thrift server", resultFormat);
switch (resultFormat) {
  case COLUMN_BASED_SET:
    return getResultSet(convertColumnarToRowBased(resultsResp, parentStatement, session));
```
Can we not remove these older function implementations?
This function `getResultSet(List<List<Object>> rows)` is currently used elsewhere too, so keeping it for now.
```java
@Override
public long getRowCount() {
  // Return the number of rows in the current batch
  return currentBatch != null ? currentBatch.size() : 0;
```
Isn't this incorrect? Other implementations return the total row count (from metadata, etc.).
Yes, you are right. This differs from other execution results in that it does not return the whole row count. I have documented this limitation at the method's only usage (in DatabricksResultSet) for the LazyThriftResult case. From this execution result's perspective, it only maintains a certain number of rows at a time, and this method returns the row count maintained at that point. I am a bit hesitant to throw an exception here.
If the limitation turns out to be a pain point for customers (I don't expect customers to use isAfterLast() that often), the fix will be to send all fetch requests just to get the row count.
```java
@Override
public long getChunkCount() {
  // For thrift columnar results, we don't have chunks in the same sense as Arrow
  return 0;
```
Should we instead throw an error, since this is an internal interface?
This is used in telemetry. I have kept it as 0 because InlineJsonResult does the same. I will discuss with @samikshya-db whether we should throw an error here, and create a separate PR if so.
```java
    "Loaded batch with {} rows, total fetched: {}", currentBatch.size(), totalRowsFetched);
}

private void fetchNextBatch() throws DatabricksSQLException {
```
Let's add Javadoc across all functions.
Done.
```java
/**
 * {@inheritDoc}
 *
 * <p><b>Limitation:</b> For lazy-loaded result sets ({@link LazyThriftResult}), particularly
```
Just curious: does Simba also have the same limitation, i.e. comparing the current row with a lazily fetched result?
```java
boolean hasRowLimit = maxRows > 0;
if (hasRowLimit && globalRowIndex + 1 >= maxRows) {
  hasReachedEnd = true;
  return false;
}
```
We're already checking this in hasNext(). We can remove this block, I guess.
A couple of reasons it is written this way:
- This function updates the state of `hasReachedEnd`.
- `hasNext()` and `next()` are mostly independent, and there is no strict enforcement on how client code uses them together. So, for safety/early exits, you may find some duplicate checks.
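A minimal sketch of that pattern, with the duplicated limit check extracted into one private helper (illustrative names and a simplified state model, not the driver's actual code):

```java
/** Sketch: the maxRows guard extracted into one helper, while next() still
 *  repeats the check because it also latches the hasReachedEnd flag. */
public class MaxRowsGuard {
    private final long maxRows;        // 0 (or negative) means no limit
    private long globalRowIndex = -1;  // index of the last row handed to the client
    private boolean hasReachedEnd = false;

    public MaxRowsGuard(long maxRows) {
        this.maxRows = maxRows;
    }

    /** Single place for the limit condition used by both next() and hasNext(). */
    private boolean hasReachedMaxRowsLimit() {
        return maxRows > 0 && globalRowIndex + 1 >= maxRows;
    }

    /** hasNext() only reports; it does not mutate state. */
    public boolean hasNext(boolean moreRowsAvailable) {
        return !hasReachedEnd && !hasReachedMaxRowsLimit() && moreRowsAvailable;
    }

    /** next() repeats the check for safety and updates hasReachedEnd. */
    public boolean next(boolean moreRowsAvailable) {
        if (hasReachedMaxRowsLimit() || !moreRowsAvailable) {
            hasReachedEnd = true;
            return false;
        }
        globalRowIndex++;
        return true;
    }

    public static void main(String[] args) {
        MaxRowsGuard g = new MaxRowsGuard(2); // limit of 2 rows
        System.out.println(g.next(true));  // row 0
        System.out.println(g.next(true));  // row 1
        System.out.println(g.next(true));  // limit reached
    }
}
```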
```java
public boolean isLast() throws SQLException {
  checkIfClosed();
  if (executionResult instanceof LazyThriftResult) {
    return !executionResult.hasNext();
```
isLast() should return true only when we are on the last row, not whenever there are no more rows.
```diff
-    return !executionResult.hasNext();
+    return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
```
> true only when we are on the last row

Isn't `!executionResult.hasNext()` doing the same thing? Or are you referring to cases like an empty result set, etc.?
Nvm, accepted the suggestion.
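The accepted semantics can be captured in one predicate (a sketch, with the two cursor facts passed in as plain arguments for clarity):

```java
/** Sketch of the accepted isLast() semantics: true only when the cursor sits
 *  on a valid row (index >= 0) and no rows remain. An empty result set, where
 *  the cursor is still at -1, correctly yields false. */
public class IsLastSketch {
    public static boolean isLast(long currentRow, boolean hasNext) {
        return currentRow >= 0 && !hasNext;
    }

    public static void main(String[] args) {
        System.out.println(isLast(-1, false)); // empty result set
        System.out.println(isLast(4, false));  // positioned on the last row
    }
}
```

The `currentRow >= 0` term is exactly what distinguishes "on the last row" from "never had a row": with only `!hasNext()`, an empty result set would wrongly report `isLast() == true`.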
Pull Request Overview
This PR implements lazy/incremental fetching for columnar results when using Databricks JDBC in Thrift mode without Arrow support. The change modifies the behavior from buffering entire result sets in memory to maintaining only a limited number of rows at a time, reducing peak heap memory usage and preventing OutOfMemory errors.
- Replaces eager loading with lazy fetching for Thrift columnar results using `LazyThriftResult`
- Implements incremental data fetching controlled by the `RowsFetchedPerBlock` parameter
- Reduces memory spikes by fetching data on demand as the client reads rows
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `LazyThriftResult.java` | New class implementing a lazy-loading result set with incremental batch fetching |
| `ExecutionResultFactory.java` | Updated to use `LazyThriftResult` instead of converting all columnar data upfront |
| `DatabricksResultSet.java` | Added documentation and special handling for lazy result sets in cursor position methods |
| `DatabricksThriftAccessor.java` | Improved error logging by removing verbose `toString()` calls and focusing on relevant data |
| `LazyThriftResultTest.java` | Comprehensive test suite covering lazy fetching scenarios, error handling, and edge cases |
| `ExecutionResultFactoryTest.java` | Updated test assertion to expect `LazyThriftResult` instead of `InlineJsonResult` |
```java
this.currentResponse = initialResponse;
this.statement = statement;
this.session = session;
this.maxRows = statement != null ? statement.getMaxRows() : DEFAULT_RESULT_ROW_LIMIT;
```
**Copilot AI** · Sep 5, 2025
The fallback to `DEFAULT_RESULT_ROW_LIMIT` when `statement` is null may cause unexpected behavior. If `statement` is null, `maxRows` should likely be 0 (unlimited) rather than a default limit, as this could artificially restrict results when no limit was intended.
```diff
-    this.maxRows = statement != null ? statement.getMaxRows() : DEFAULT_RESULT_ROW_LIMIT;
+    this.maxRows = statement != null ? statement.getMaxRows() : 0;
```
`DEFAULT_RESULT_ROW_LIMIT` is set to 0.
```java
// Check if we've reached the maxRows limit
boolean hasRowLimit = maxRows > 0;
if (hasRowLimit && globalRowIndex + 1 >= maxRows) {
```
**Copilot AI** · Sep 5, 2025
The condition `hasRowLimit && globalRowIndex + 1 >= maxRows` is duplicated in both the `next()` and `hasNext()` methods. Consider extracting this logic into a private method like `hasReachedMaxRowsLimit()` to improve maintainability and reduce duplication.
```java
// Check maxRows limit
boolean hasRowLimit = maxRows > 0;
if (hasRowLimit && globalRowIndex + 1 >= maxRows) {
```
**Copilot AI** · Sep 5, 2025
The condition `hasRowLimit && globalRowIndex + 1 >= maxRows` is duplicated in both the `next()` and `hasNext()` methods. Consider extracting this logic into a private method like `hasReachedMaxRowsLimit()` to improve maintainability and reduce duplication.
```java
// Second column - with null
TColumn col2 = new TColumn();
TStringColumn stringCol2 = new TStringColumn();
stringCol2.setValues(Arrays.asList("placeholder")); // Actual value doesn't matter
```
**Copilot AI** · Sep 5, 2025
The comment 'Actual value doesn't matter' is misleading since the placeholder value could affect test behavior if the null handling logic changes. Consider using a more explicit value like 'NULL_PLACEHOLDER' or remove the comment entirely.
```diff
-    stringCol2.setValues(Arrays.asList("placeholder")); // Actual value doesn't matter
+    stringCol2.setValues(Arrays.asList("NULL_PLACEHOLDER")); // Placeholder value for null
```
## Description

This PR also contains the changes from PR #966. It introduces `ColumnarRowView` to provide direct access to columnar data without materialising entire result sets into row objects. This reduces memory allocations by allowing individual cell access via `getValue(row, col)` instead of creating `List<List<Object>>` structures.

Key changes:

- New `ColumnarRowView` class with direct columnar access methods.
- Updated `LazyThriftResult` to use the columnar view instead of materialized rows.
- Added a utility method in `DatabricksThriftUtil` for creating columnar views.
- Comprehensive test coverage for all column types and null handling.

This optimization maintains API compatibility while significantly reducing memory overhead for large result sets.

Following the changes introduced in PR #966, the improvements below were observed during a test that executes a SQL query retrieving 5 million rows.

Current heap usage over time:

<img width="720" height="429" alt="image" src="https://github.com/user-attachments/assets/ac87eae3-63aa-4529-9995-6392af6d4a3b" />

Improved heap usage over time:

<img width="720" height="288" alt="image" src="https://github.com/user-attachments/assets/befff1f2-a54f-4609-aa81-a6311a329312" />

- The first image shows multiple significant CPU usage spikes reaching around 15%, while the second shows consistently flat CPU usage at approximately 3%.
- The erratic CPU behavior in the "before" state has been completely smoothed out; CPU usage is now consistent and predictable rather than volatile.
- Memory usage dropped from a peak of 8440 MB down to just 745 MB, roughly a 91% decrease.
- The large memory spike and sustained high usage in the first image have been completely resolved; memory usage is now consistently low and stable throughout the monitoring period.
- There is no negative effect on execution time; in fact, it appears to improve, likely because the previous state incurred more frequent GC pauses.

## Testing

- Unit tests
- e2e tests
- FakeService tests

## Additional Notes to the Reviewer
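The columnar-access idea described above can be sketched as follows (a hypothetical shape, not the driver's actual `ColumnarRowView` class): the server's column vectors are kept as-is, and individual cells are read on demand rather than copied into per-row lists.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of a columnar view: one list per column, cells read on demand via
 *  getValue(row, col), so no List<List<Object>> of rows is ever materialised. */
public class ColumnarViewSketch {
    private final List<List<Object>> columns; // one inner list per column

    public ColumnarViewSketch(List<List<Object>> columns) {
        this.columns = columns;
    }

    /** Direct cell access; no per-row object is allocated. */
    public Object getValue(int row, int col) {
        return columns.get(col).get(row);
    }

    public int getRowCount() {
        return columns.isEmpty() ? 0 : columns.get(0).size();
    }

    public static void main(String[] args) {
        ColumnarViewSketch v = new ColumnarViewSketch(Arrays.asList(
            Arrays.<Object>asList("a", "b"),   // column 0 (strings)
            Arrays.<Object>asList(1, 2)));     // column 1 (ints)
        System.out.println(v.getValue(1, 0)); // row 1, column 0
    }
}
```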
Description
When JDBC is used with Arrow disabled and operating in Thrift mode (the default mode, or when useThriftClient=1), the Hive ThriftServer returns results in a columnar binary format (non-Arrow). Currently, before constructing the result set, JDBC fetches all the required data at once and buffers the entire result set in memory.
This PR modifies this behavior for the Arrow-disabled, Thrift-mode case, so that the result set maintains only a limited number of rows in memory at a time, controlled by the connection parameter RowsFetchedPerBlock. As the client reads rows, the result set issues additional fetch requests incrementally.
Implications: The JVM no longer buffers the entire result set upfront, which previously caused a sharp spike in heap memory usage before garbage collection could reclaim space. This spike often led to OutOfMemory (OOM) errors. With this change, peak heap memory usage is reduced, increasing more gradually, which allows the JVM’s memory management to work more effectively.
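The incremental fetch loop described above can be sketched like this (illustrative names only; `BatchSource` stands in for the Thrift FetchResults round trip, and `rowsPerBlock` mirrors the `RowsFetchedPerBlock` connection parameter):

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of incremental batch fetching: when the current batch is exhausted,
 *  one bounded fetch is issued instead of buffering the whole result set. */
public class LazyFetchSketch {
    public interface BatchSource {
        /** Returns up to rowsPerBlock rows; an empty list means exhausted. */
        List<Object> fetchNext(int rowsPerBlock);
    }

    private final BatchSource source;
    private final int rowsPerBlock;
    private List<Object> currentBatch = new ArrayList<>();
    private int currentBatchIndex = -1;
    private long totalRowsFetched = 0;
    private boolean hasReachedEnd = false;

    public LazyFetchSketch(BatchSource source, int rowsPerBlock) {
        this.source = source;
        this.rowsPerBlock = rowsPerBlock;
    }

    public boolean next() {
        if (hasReachedEnd) return false;
        currentBatchIndex++;
        if (currentBatchIndex >= currentBatch.size()) {
            // Current batch exhausted: one bounded fetch, not the whole result set.
            currentBatch = source.fetchNext(rowsPerBlock);
            currentBatchIndex = 0;
            totalRowsFetched += currentBatch.size();
            if (currentBatch.isEmpty()) {
                hasReachedEnd = true;
                return false;
            }
        }
        return true;
    }

    public Object current() {
        return currentBatch.get(currentBatchIndex);
    }

    public long getTotalRowsFetched() {
        return totalRowsFetched;
    }

    public static void main(String[] args) {
        int[] served = {0}; // fake server state: 5 rows total
        BatchSource src = n -> {
            List<Object> b = new ArrayList<>();
            while (b.size() < n && served[0] < 5) b.add(served[0]++);
            return b;
        };
        LazyFetchSketch r = new LazyFetchSketch(src, 2); // 2 rows per block
        while (r.next()) {
            System.out.println(r.current());
        }
    }
}
```

At any moment, at most `rowsPerBlock` rows live on the heap, which is why the heap grows gradually instead of spiking.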
The following improvements were observed when executing a SQL query over 1 million rows and iterating through the result set without printing the rows.
Existing behaviour (a sudden spike with a higher peak):

Gradual heap increase within memory-management bounds, with a lower peak:

Testing
Additional Notes to the Reviewer
Note that this does not lower the maximum heap usage by a lot (instead, it makes the memory increase gradual). I will make further improvements to how we process the data.