
Conversation

@tejassp-db
Collaborator

Description

The Thrift protocol has an orientation field with values FETCH_NEXT, FETCH_PRIOR, or FETCH_FIRST. This field was always set to FETCH_NEXT, resulting in incorrect refetches. To fetch from a particular chunk index, the Thrift protocol requires the start row offset to be set. The chunk index and start row offset are available from the expired links, so the fix uses the start row offset to refetch the links over the Thrift protocol.
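The fix can be sketched as follows. This is a simplified model, not the driver's real code: ExpiredLink, FetchRequest, and buildRefetchRequest are hypothetical stand-ins for the Thrift-generated types, illustrating that the refetch request carries the expired link's start row offset while the orientation stays FETCH_NEXT.

```java
public class RefetchSketch {
    // Hypothetical stand-in for an expired CloudFetch link.
    record ExpiredLink(long chunkIndex, long startRowOffset, long rowCount) {}

    // Hypothetical stand-in for the Thrift fetch-request fields relevant here.
    record FetchRequest(String orientation, long startRowOffset, int maxRows) {}

    // The server cannot be addressed by chunk index directly, so the request
    // carries the expired link's start row offset; orientation stays FETCH_NEXT.
    static FetchRequest buildRefetchRequest(ExpiredLink expired, int maxRows) {
        return new FetchRequest("FETCH_NEXT", expired.startRowOffset(), maxRows);
    }

    public static void main(String[] args) {
        ExpiredLink expired = new ExpiredLink(3, 3_000_000L, 1_000_000L);
        FetchRequest req = buildRefetchRequest(expired, 100_000);
        System.out.println(req.startRowOffset()); // prints 3000000
    }
}
```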

Testing

The fix is covered by an integration test that validates the correct links are fetched for a given chunk index and start row offset pair, and by unit tests that validate client behaviour when unexpected responses are received from the server.

Additional Notes to the Reviewer

I also made some changes to result validation; see my inline comments in the PR.

tejassp-db added 9 commits November 4, 2025 15:50
In Thrift server the CloudFetch links cannot be fetched by
chunk index. Changing code to fetch CloudFetch links from
a start row offset.
- Fix chunk index value in chunk creation, do not start from zero.
- Fix assertions for the fetched links.
Add integration tests to check refetch of links works correctly.
Fix validation and error handling. Check for exact startRowOffset
match.
Add unit tests for thrift result fetch error paths.
Revert getNumRows visibility to protected.
Throw exception when chunk is missing for a chunkIndex.
.setExternalLink(chunkInfo.getFileLink())
.setChunkIndex(chunkIndex)
.setExpiration(Long.toString(chunkInfo.getExpiryTime()));
tejassp-db (Author):

@jayantsing-db Setting additional fields in ExternalLink. Is this an issue?

Collaborator:

I don't see any issue.

boolean fetchMetadata)
throws DatabricksHttpException {
String statementId = StatementId.loggableStatementId(operationHandle);
verifySuccessStatus(responseStatus, context, statementId);
tejassp-db (Author):

@jayantsing-db Checking the validity of a previously received response is not the responsibility of this function, so I moved the check outside it, to the point where the response is received.

T chunk = chunkIndexToChunksMap.get(chunkIndex);
if (chunk == null) {
// Should never happen.
throw new IllegalStateException("Chunk not found in map for index " + chunkIndex + ". "
tejassp-db (Author):

@jayantsing-db Is this the correct exception to be thrown here?

@jayantsing-db left a review:

Added some comments/questions.

resultSet = response.getDirectResults().getResultSet();
resultSet.setResultSetMetadata(response.getDirectResults().getResultSetMetadata());
} else {
verifySuccessStatus(
Collaborator:

I think we already verify the response at line 215, checkResponseForErrors(response). That should suffice.

tejassp-db (Author):

Should this be moved to after pollTillOperationFinished and before the if-else block?

Collaborator:

verifySuccessStatus takes a response that is already tested by checkResponseForErrors(response). Can this check be removed entirely?

resultSet =
getResultSetResp(response.getStatus(), operationHandle, "getStatementResult", -1, true);

verifySuccessStatus(
Collaborator:

I think this should go just after line 404, before we try to access the operation state in the response. Previously the check happened inside getResultSetResp only as a side effect of reusing that function. Since we are now more deliberate about this check, it makes more sense to put it after line 404.

tejassp-db (Author):

Fixed.

String context,
int maxRowsPerBlock,
boolean fetchMetadata)
private TFetchResultsResp executeRequest(TFetchResultsReq request)
Collaborator:

nit: can this be renamed to executeFetchRequest?

tejassp-db (Author):

Fixed.

throw new DatabricksSQLException(error, DatabricksDriverErrorCode.INVALID_STATE);
}

// Subsequent fetches fetch from the next set of rows.
Collaborator:

Why do we not need an offset for subsequent fetches? After the first request, do you expect the server-side iterator to reset to the desired offset? I am not sure the reset actually happens and will check on this.

tejassp-db (Author):

The server resets, and this behaviour is tested in ThriftCloudFetchIntegrationTests. Maybe we should confirm with the server team?


tejassp-db and others added 2 commits November 10, 2025 14:12
- rename executeRequest to executeFetchRequest
- moved response status check to after the call
@samikshya-db left a review:

Thanks for the awesome fix, @tejassp-db! Also congrats on your first PR 🎉

LGTM overall, added some minor comments

T chunk = chunkIndexToChunksMap.get(chunkIndex);
if (chunk == null) {
// Should never happen.
throw new IllegalStateException(
Collaborator:

Let's throw DatabricksValidationException here, as we push telemetry with these internal exceptions.

tejassp-db (Author):

DatabricksValidationException is linked to the INPUT_VALIDATION_ERROR error code. Can you suggest an alternative exception?

Collaborator:

We could create a new DatabricksInvalidStateException, but I guess just passing INVALID_STATE to DatabricksException looks good to me.

"public Collection<ExternalLink> getResultChunks(statementId = {"
+ statementId
+ "}, chunkIndex = {"
+ chunkIndex
Collaborator:

nit: use String.format across this file
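As a minimal illustration of this suggestion, here is the concatenation style from the snippet above next to a String.format equivalent. The statement id and chunk index values are made up for the example.

```java
public class FormatDemo {
    public static void main(String[] args) {
        String statementId = "stmt-123"; // hypothetical values for illustration
        long chunkIndex = 4;

        // Concatenation style, as in the snippet above:
        String concat = "getResultChunks(statementId = {" + statementId
                + "}, chunkIndex = {" + chunkIndex + "})";

        // Equivalent String.format style the reviewer suggests:
        String formatted = String.format(
                "getResultChunks(statementId = {%s}, chunkIndex = {%d})",
                statementId, chunkIndex);

        System.out.println(formatted);
        System.out.println(concat.equals(formatted)); // prints true
    }
}
```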

tejassp-db (Author):

Fixed.

+ externalLinks.get(0).getRowOffset()
+ " context="
+ context;
throw new DatabricksSQLException(error, DatabricksDriverErrorCode.INVALID_STATE);
Collaborator:

nit: can we change this to DatabricksValidationSQLException?

tejassp-db (Author):

DatabricksValidationSQLException has error code INPUT_VALIDATION_ERROR, but this is an unexpected/invalid state. Does any other error code fit this use case?

Collaborator:

INVALID_STATE looks good to me.

import org.junit.jupiter.api.Test;

/** Integration test to test CloudFetch link refetching using Thrift client. */
public class ThriftCloudFetchIntegrationTests {
Collaborator:

Do we run these tests on the repo automatically? If not, can you add that too? Or you can add fake service tests.

tejassp-db (Author):

Need to figure this out. I have also added fake service tests.

Collaborator:

OK, in that case this can be removed.

throw new DatabricksSQLException(error, DatabricksDriverErrorCode.INVALID_STATE);
}

// Subsequent fetches fetch from the next set of rows.
@samikshya-db (Collaborator):

Can you also fix the failing tests on the PR?

/**
* Returns the chunk provider for testing purposes.
*
* @return the chunk provider wrapped in Optional
Collaborator:

The Optional in the doc comment is not aligned with the return type.

tejassp-db (Author):

Fixed.

return response;
}

private TFetchResultsReq createFetchResultsReqWithDefaults(TOperationHandle operationHandle) {
Collaborator:

Thanks for abstracting this into a single method.

- AtomicInteger index = new AtomicInteger(0);
+ AtomicLong index = new AtomicLong(chunkIndex);
  do {
    fetchResultsResp = thriftAccessor.getResultSetResp(getOperationHandle(statementId), context);
Collaborator:

Not related to your PR, but why do we have an AtomicLong inside a method context? Isn't this just overhead?

tejassp-db (Author):

Since the value is mutated inside a lambda, and Java lambdas can only capture local variables that are final or effectively final, a mutable holder like AtomicLong is needed.
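A minimal demonstration of this constraint: a plain long counter cannot be incremented inside a lambda, while an AtomicLong works because its reference is final even though its value is not.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class LambdaCaptureDemo {
    public static void main(String[] args) {
        long chunkIndex = 5;
        // A plain local counter would not compile here:
        //   long index = chunkIndex;
        //   ... link -> index++ ...  // error: local variables referenced from
        //   a lambda expression must be final or effectively final
        AtomicLong index = new AtomicLong(chunkIndex); // final reference, mutable value
        List.of("a", "b", "c").forEach(link ->
                System.out.println("chunk " + index.getAndIncrement() + " -> " + link));
        System.out.println(index.get()); // prints 8
    }
}
```

In single-threaded code a one-element long[] array would avoid the atomic-operation overhead the reviewer is asking about, at some cost in readability.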

tejassp-db and others added 3 commits November 25, 2025 10:21
- Fix method doc.
- Use String.format to format a debug line.
Add fake integration tests to validate cloud fetch links
re-fetch from a chunk index and start row offset works.
Comment on lines 351 to 359
do {
  fetchResultsResp = thriftAccessor.getResultSetResp(getOperationHandle(statementId));
  fetchResultsResp
      .getResults()
      .getResultLinks()
      .forEach(
          resultLink ->
              externalLinks.add(createExternalLink(resultLink, index.getAndIncrement())));
} while (fetchResultsResp.hasMoreRows);
if (chunkIndex < 0 || externalLinks.size() <= chunkIndex) {
String error = String.format("Out of bounds error for chunkIndex. Context: %s", context);
LOGGER.error(error);
throw new DatabricksSQLException(error, DatabricksDriverErrorCode.INVALID_STATE);
}
Collaborator:

@tejassp-db, I fixed the link download service to correctly trigger the link refresh (deadlock issue) on expiry in the Thrift case. However, this API still has a while loop, which will again take fifteen minutes for a sufficiently large extract and force another refresh. Instead of fetching all links, can we have an API that sends one fetch request starting from a chunk index, like SEA? The link download service will automatically call that API with the appropriate chunk index.
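The suggested API shape could look roughly like this. Everything here (FetchService, fetchFromOffset, refetch) is a hypothetical stand-in, not the driver's actual interface; the point is that the caller resolves the chunk index to its known start row offset and issues exactly one request instead of looping over hasMoreRows.

```java
import java.util.List;

public class SingleFetchSketch {
    // Hypothetical link carrying its chunk index and row offset.
    record Link(long chunkIndex, long rowOffset) {}

    // Hypothetical service: one request returning links from a row offset.
    interface FetchService {
        List<Link> fetchFromOffset(long startRowOffset);
    }

    // Single fetch anchored at the offset known from the expired link,
    // instead of iterating over all preceding links.
    static List<Link> refetch(FetchService service, long startRowOffset) {
        return service.fetchFromOffset(startRowOffset);
    }

    public static void main(String[] args) {
        // Fake service for illustration: derives the chunk index from the offset.
        FetchService fake = offset -> List.of(new Link(offset / 1_000_000, offset));
        System.out.println(refetch(fake, 4_000_000L).get(0).chunkIndex()); // prints 4
    }
}
```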

tejassp-db (Author):

Fixed this. @jayantsing-db Can you please check if any assumptions break with this change?
