[PECOBLR-1131] Fix incorrect refetching of expired CloudFetch links when using Thrift protocol. #1066
base: main
The Thrift server cannot fetch CloudFetch links by chunk index. Change the code to fetch CloudFetch links from a start row offset instead.
- Fix chunk index value in chunk creation, do not start from zero.
- Fix assertions for the fetched links.
Add integration tests to check refetch of links works correctly.
Fix validation and error handling. Check for exact startRowOffset match.
Add unit tests for thrift result fetch error paths.
Revert getNumRows visibility to protected.
Throw exception when chunk is missing for a chunkIndex.
.setExternalLink(chunkInfo.getFileLink())
.setChunkIndex(chunkIndex)
.setExpiration(Long.toString(chunkInfo.getExpiryTime()));
.setExternalLink(chunkInfo.getFileLink())
@jayantsing-db Setting additional fields in ExternalLink. Is this an issue?
I don't see any issue.
boolean fetchMetadata)
throws DatabricksHttpException {
String statementId = StatementId.loggableStatementId(operationHandle);
verifySuccessStatus(responseStatus, context, statementId);
@jayantsing-db Checking the validity of a previously received response is not the responsibility of this function. Moved it to outside this function at the point where the response is received.
T chunk = chunkIndexToChunksMap.get(chunkIndex);
if (chunk == null) {
  // Should never happen.
  throw new IllegalStateException("Chunk not found in map for index " + chunkIndex + ". "
@jayantsing-db Is this the correct exception to be thrown here?
jayantsing-db
left a comment
Added some comments/questions.
resultSet = response.getDirectResults().getResultSet();
resultSet.setResultSetMetadata(response.getDirectResults().getResultSetMetadata());
} else {
verifySuccessStatus(
I think we already verify the response at line 215 with checkResponseForErrors(response). That should suffice.
Should this be moved to after pollTillOperationFinished and before the if-else block?
verifySuccessStatus takes a response that is already checked by checkResponseForErrors(response). Can this check be removed entirely?
resultSet =
    getResultSetResp(response.getStatus(), operationHandle, "getStatementResult", -1, true);

verifySuccessStatus(
I think this should be just after line 404 before we try to access the operation state in response. Previously the check occurred in the function getResultSetResp only as a side-effect of reusing the function. Since we are now more deliberate with this check, it makes more sense to put it after line 404.
Fixed.
String context,
int maxRowsPerBlock,
boolean fetchMetadata)
private TFetchResultsResp executeRequest(TFetchResultsReq request)
nit: can this be renamed to executeFetchRequest?
Fixed.
throw new DatabricksSQLException(error, DatabricksDriverErrorCode.INVALID_STATE);
}

// Subsequent fetches fetch from the next set of rows.
Why do we not need the offset for subsequent fetches? After the first request, do you expect the server-side iterator to reset to the desired offset? I am not sure whether the reset actually happens and will check on this.
The server resets and this feature is tested in the ThriftCloudFetchIntegrationTests. Maybe we should confirm with the server team once?
Good point! I was also checking the server code on this one, and it does appear to be stateful. The implementation is indeed correct.
- Rename executeRequest to executeFetchRequest.
- Move the response status check to after the call.
samikshya-db
left a comment
Thanks for the awesome fix, @tejassp-db! Also congrats on your first PR 🎉
LGTM overall, added some minor comments
T chunk = chunkIndexToChunksMap.get(chunkIndex);
if (chunk == null) {
  // Should never happen.
  throw new IllegalStateException(
let's throw DatabricksValidationException here - as we push telemetry with these internal exceptions
DatabricksValidationException is linked to the INPUT_VALIDATION_ERROR error code. Can you suggest an alternative exception?
We could create a new exception stating DatabricksInvalidStateException but I guess just passing INVALID_STATE to DatabricksException lgtm
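The option settled on in this thread, a driver exception that carries an invalid-state error code rather than a bare IllegalStateException, could look roughly like the sketch below. DriverException and ErrorCode are hypothetical stand-ins so the example compiles on its own; they are not the driver's actual classes, and only the INVALID_STATE and INPUT_VALIDATION_ERROR names come from the discussion.

```java
import java.util.Map;

class ChunkLookupSketch {
  /** Hypothetical error codes; only the two names are taken from the review thread. */
  enum ErrorCode {
    INPUT_VALIDATION_ERROR,
    INVALID_STATE
  }

  /** Hypothetical driver exception that keeps an error code, e.g. for telemetry. */
  static final class DriverException extends RuntimeException {
    final ErrorCode code;

    DriverException(String message, ErrorCode code) {
      super(message);
      this.code = code;
    }
  }

  /** Looks up a chunk and reports a missing entry as an invalid internal state. */
  static <T> T getChunk(Map<Long, T> chunkIndexToChunksMap, long chunkIndex) {
    T chunk = chunkIndexToChunksMap.get(chunkIndex);
    if (chunk == null) {
      // Should never happen: the caller asked for an index the map was never given.
      throw new DriverException(
          String.format("Chunk not found in map for index %d", chunkIndex),
          ErrorCode.INVALID_STATE);
    }
    return chunk;
  }
}
```

Keeping the code on the exception lets a caller or a telemetry hook tell an internal inconsistency apart from bad user input without parsing the message.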
"public Collection<ExternalLink> getResultChunks(statementId = {"
+ statementId
+ "}, chunkIndex = {"
+ chunkIndex
nit: use String.format across this file
Fixed.
+ externalLinks.get(0).getRowOffset()
+ " context="
+ context;
throw new DatabricksSQLException(error, DatabricksDriverErrorCode.INVALID_STATE);
nit : can we change this to DatabricksValidationSQLException
DatabricksValidationSQLException has error code INPUT_VALIDATION_ERROR, but this is an unexpected/invalid state. Does any other error code fit this use case?
INVALID STATE lgtm
import org.junit.jupiter.api.Test;

/** Integration test to test CloudFetch link refetching using Thrift client. */
public class ThriftCloudFetchIntegrationTests {
Do we run these tests on the repo automatically? If not, can you add that too? Or you can add fake service tests.
Need to figure this out. I have also added fake service tests.
Ok, in which case - this can be removed.
Can you also fix the failing tests on the PR?
/**
 * Returns the chunk provider for testing purposes.
 *
 * @return the chunk provider wrapped in Optional
Optional comment is not aligned with the return type.
Fixed.
return response;
}

private TFetchResultsReq createFetchResultsReqWithDefaults(TOperationHandle operationHandle) {
Thanks for abstracting it in a single method
AtomicInteger index = new AtomicInteger(0);
do {
fetchResultsResp = thriftAccessor.getResultSetResp(getOperationHandle(statementId), context);
AtomicLong index = new AtomicLong(chunkIndex);
Not related to your PR, but why do we have an AtomicLong inside a method context? Isn't this just overhead?
The value is used inside a lambda, and Java lambdas can only capture local variables that are final or effectively final, so a mutable counter has to live in a holder such as AtomicLong.
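To make the answer concrete, here is a minimal sketch of the capture rule. The class and method names are made up for illustration, and the strings stand in for real links; only the pattern (an AtomicLong seeded with the starting chunk index and incremented inside forEach) mirrors the snippet under review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class LambdaCounterSketch {
  /** Tags each link with a chunk index, counting up from startChunkIndex. */
  static List<String> indexLinks(List<String> links, long startChunkIndex) {
    List<String> indexed = new ArrayList<>();
    // A plain `long index` could not be incremented inside the lambda below:
    // captured locals must be final or effectively final. The AtomicLong
    // reference itself never changes, only the value it holds.
    AtomicLong index = new AtomicLong(startChunkIndex);
    links.forEach(link -> indexed.add(index.getAndIncrement() + ":" + link));
    return indexed;
  }

  public static void main(String[] args) {
    System.out.println(indexLinks(List.of("link-a", "link-b"), 5));
  }
}
```

A single-element array or an indexed for loop would work as well; the atomic type is used here purely as a mutable holder, not for thread safety.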
- Fix method doc.
- Use String.format to format a debug line.
Add fake integration tests to validate cloud fetch links re-fetch from a chunk index and start row offset works.
while (fetchResultsResp.hasMoreRows) {
fetchResultsResp = thriftAccessor.getResultSetResp(getOperationHandle(statementId));
fetchResultsResp
    .getResults()
    .getResultLinks()
    .forEach(
        resultLink ->
            externalLinks.add(createExternalLink(resultLink, index.getAndIncrement())));
} while (fetchResultsResp.hasMoreRows);
if (chunkIndex < 0 || externalLinks.size() <= chunkIndex) {
  String error = String.format("Out of bounds error for chunkIndex. Context: %s", context);
  LOGGER.error(error);
  throw new DatabricksSQLException(error, DatabricksDriverErrorCode.INVALID_STATE);
}
@tejassp-db, I fixed the link download service so that it now correctly triggers the link refresh (deadlock issue) on expiry in the Thrift case. However, this API still has a while loop. This will again take fifteen minutes for a sufficiently large extract, and we would be forced to do a refresh again. Instead of fetching all links, can we have an API that sends just one fetch request starting from a chunk index, like SEA? The link download service will automatically handle calling that API with an appropriate chunk index.
Fixed this. @jayantsing-db Can you please check if any assumptions break with this change?
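The shape requested in this thread, one fetch request per call with the caller deciding when to ask again, can be sketched as follows. Everything here is a simplified stand-in: links are represented by their chunk indices, LINKS_PER_REQUEST is an assumed batch size, and getResultChunks only borrows its name from the method under review.

```java
import java.util.ArrayList;
import java.util.List;

class SingleBatchFetchSketch {
  // Assumed number of links one fetch request returns; the real value is server-defined.
  static final int LINKS_PER_REQUEST = 2;

  /** One request: returns at most LINKS_PER_REQUEST links starting at chunkIndex. */
  static List<Long> getResultChunks(long totalChunks, long chunkIndex) {
    if (chunkIndex < 0 || chunkIndex >= totalChunks) {
      throw new IllegalStateException("Out of bounds chunkIndex " + chunkIndex);
    }
    List<Long> batch = new ArrayList<>();
    long end = Math.min(totalChunks, chunkIndex + LINKS_PER_REQUEST);
    for (long i = chunkIndex; i < end; i++) {
      batch.add(i);
    }
    // No loop over hasMoreRows here: the caller asks again when it needs more.
    return batch;
  }

  /** Caller side: advance by however many links each request returned. */
  static int countRequests(long totalChunks) {
    int requests = 0;
    long next = 0;
    while (next < totalChunks) {
      next += getResultChunks(totalChunks, next).size();
      requests++;
    }
    return requests;
  }
}
```

Because each call returns quickly, links fetched early are less likely to expire while later ones are still being listed, which is the concern raised about the draining loop.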
Do not fetch all links starting at index in getResultChunks in thrift link fetch. Mimic behaviour of sdk client.
Description
The Thrift protocol has an orientation field with the values FETCH_NEXT, FETCH_PRIOR, or FETCH_FIRST. This field is always set to FETCH_NEXT, which results in incorrect refetching of expired links. To fetch from a particular chunk index, the Thrift protocol requires the start row offset to be set. Both the chunk index and the start row offset are available from the expired links, so this change uses the start row offset to fetch the links over the Thrift protocol.
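As a rough illustration of the approach described above, the sketch below refetches links from a start row offset, rejects a response whose first link does not start at exactly that offset, and numbers the returned links from the requested chunk index rather than from zero. ServerLink, ClientLink, and the in-memory fetch method are hypothetical stand-ins for the Thrift request and response types, and the filtering in fetch is an assumption about server behaviour made only so the example runs.

```java
import java.util.ArrayList;
import java.util.List;

class StartRowOffsetRefetchSketch {
  /** Hypothetical link as the server reports it: a row offset but no chunk index. */
  static final class ServerLink {
    final long startRowOffset;
    final String url;

    ServerLink(long startRowOffset, String url) {
      this.startRowOffset = startRowOffset;
      this.url = url;
    }
  }

  /** Hypothetical client-side link, tagged with the chunk index the driver tracks. */
  static final class ClientLink {
    final long chunkIndex;
    final long startRowOffset;
    final String url;

    ClientLink(long chunkIndex, long startRowOffset, String url) {
      this.chunkIndex = chunkIndex;
      this.startRowOffset = startRowOffset;
      this.url = url;
    }
  }

  /** Stand-in for one fetch call: links whose first row is at or after the offset. */
  static List<ServerLink> fetch(List<ServerLink> all, long startRowOffset) {
    List<ServerLink> result = new ArrayList<>();
    for (ServerLink link : all) {
      if (link.startRowOffset >= startRowOffset) {
        result.add(link);
      }
    }
    return result;
  }

  /** Refetches links for an expired chunk identified by (chunkIndex, startRowOffset). */
  static List<ClientLink> refetch(List<ServerLink> all, long chunkIndex, long startRowOffset) {
    List<ServerLink> fetched = fetch(all, startRowOffset);
    // Require an exact match so a shifted response is never mapped to the wrong chunk.
    if (fetched.isEmpty() || fetched.get(0).startRowOffset != startRowOffset) {
      throw new IllegalStateException(
          String.format("Expected first link at row offset %d", startRowOffset));
    }
    List<ClientLink> result = new ArrayList<>();
    long index = chunkIndex; // numbering continues from the requested chunk, not zero
    for (ServerLink link : fetched) {
      result.add(new ClientLink(index++, link.startRowOffset, link.url));
    }
    return result;
  }
}
```

For example, with links at row offsets 0, 100, and 200, refetching chunk 1 at offset 100 yields chunks 1 and 2, while asking for offset 150 fails the exact-match check.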
Testing
This fix is tested with an integration test that validates that the correct links are fetched when fetching from a pair of chunk index and start row offset. There are also unit tests to validate correct client behaviour when unexpected responses are received from the server.
Additional Notes to the Reviewer
I also made some changes to the validation of the results. Commented within the PR.