Skip to content

Commit

Permalink
Fix potential deadlock when PageBufferClient exceeds memory limit
Browse files Browse the repository at this point in the history
Prior to this change, if the PageBufferClient caused a task to exceed
its memory limit inside of addPages, the exception would bubble to the
root of the exchange client callback executor without ever marking the
task as failed. If no other operator also saw the memory limit
exceeded condition, then the task would become stuck in a deadlocked
state.
  • Loading branch information
pettyjamesm authored and highker committed Sep 25, 2020
1 parent 35f267d commit 746824c
Showing 1 changed file with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,26 +309,26 @@ public void onSuccess(PagesResponse result)
// This is to fast release the pages on the buffer side.
resultClient.acknowledgeResultsAsync(result.getNextToken());
}

// add pages:
// addPages must be called regardless of whether pages is an empty list because
// clientCallback can keep stats of requests and responses. For example, it may
// keep track of how often a client returns empty response and adjust request
// frequency or buffer size.
if (clientCallback.addPages(PageBufferClient.this, pages)) {
pagesReceived.addAndGet(pages.size());
rowsReceived.addAndGet(pages.stream().mapToLong(SerializedPage::getPositionCount).sum());
}
else {
pagesRejected.addAndGet(pages.size());
rowsRejected.addAndGet(pages.stream().mapToLong(SerializedPage::getPositionCount).sum());
}
}
catch (PrestoException e) {
handleFailure(e, resultFuture);
return;
}

// add pages:
// addPages must be called regardless of whether pages is an empty list because
// clientCallback can keep stats of requests and responses. For example, it may
// keep track of how often a client returns empty response and adjust request
// frequency or buffer size.
if (clientCallback.addPages(PageBufferClient.this, pages)) {
pagesReceived.addAndGet(pages.size());
rowsReceived.addAndGet(pages.stream().mapToLong(SerializedPage::getPositionCount).sum());
}
else {
pagesRejected.addAndGet(pages.size());
rowsRejected.addAndGet(pages.stream().mapToLong(SerializedPage::getPositionCount).sum());
}

synchronized (PageBufferClient.this) {
// client is complete, acknowledge it by sending it a delete in the next request
if (result.isClientComplete()) {
Expand Down

0 comments on commit 746824c

Please sign in to comment.