Skip to content

Commit

Permalink
[native] Refactor PrestoExchangeSource to have separate response hand…
Browse files Browse the repository at this point in the history
…le methods
tanjialiang committed Apr 1, 2024
1 parent 993945b commit c5bcd37
Showing 2 changed files with 79 additions and 52 deletions.
117 changes: 65 additions & 52 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp
Original file line number Diff line number Diff line change
@@ -115,6 +115,12 @@ PrestoExchangeSource::PrestoExchangeSource(
});
}

void PrestoExchangeSource::close() {
closed_.store(true);
checkSetRequestPromise();
abortResults();
}

bool PrestoExchangeSource::shouldRequestLocked() {
if (atEnd_) {
return false;
@@ -391,40 +397,39 @@ void PrestoExchangeSource::processDataError(
}
}

bool PrestoExchangeSource::checkSetRequestPromise() {
VeloxPromise<Response> promise;
{
std::lock_guard<std::mutex> l(queue_->mutex());
promise = std::move(promise_);
}
if (promise.valid() && !promise.isFulfilled()) {
promise.setValue(Response{0, false});
return true;
}

return false;
}

void PrestoExchangeSource::acknowledgeResults(int64_t ackSequence) {
auto ackPath = fmt::format("{}/{}/acknowledge", basePath_, ackSequence);
VLOG(1) << "Sending ack " << ackPath;
auto self = getSelfPtr();

http::RequestBuilder()
.method(proxygen::HTTPMethod::GET)
.url(ackPath)
.send(httpClient_.get())
.via(driverExecutor_)
.thenValue([self](std::unique_ptr<http::HttpResponse> response) {
VLOG(1) << "Ack " << response->headers()->getStatusCode();
})
.thenError(
folly::tag_t<std::exception>{}, [self](const std::exception& e) {
// Acks are optional. No need to fail the query.
VLOG(1) << "Ack failed: " << e.what();
.thenTry(
[this, self = getSelfPtr()](
folly::Try<std::unique_ptr<http::HttpResponse>> responseTry) {
// self needs to be held for keeping 'this' source alive during
// processing
handleAckResponse(std::move(responseTry));
});
}

void PrestoExchangeSource::handleAckResponse(
folly::Try<std::unique_ptr<http::HttpResponse>> responseTry) {
if (!responseTry.hasException()) {
try {
auto& response = responseTry.value();
VLOG(1) << "Ack " << response->headers()->getStatusCode();
} catch (const std::exception& e) {
// Acks are optional. No need to fail the query.
VLOG(1) << "Ack failed: " << e.what();
}
} else {
// Acks are optional. No need to fail the query.
VLOG(1) << "Ack failed: " << responseTry.exception().what();
}
}

void PrestoExchangeSource::abortResults() {
if (abortResultsIssued_.exchange(true)) {
return;
@@ -439,44 +444,52 @@ void PrestoExchangeSource::abortResults() {
}

void PrestoExchangeSource::doAbortResults(int64_t delayMs) {
auto queue = queue_;
auto self = getSelfPtr();
http::RequestBuilder()
.method(proxygen::HTTPMethod::DELETE)
.url(basePath_)
.send(httpClient_.get(), "", delayMs)
.via(driverExecutor_)
.thenTry([queue, self](
.thenTry([this, self = getSelfPtr()](
folly::Try<std::unique_ptr<http::HttpResponse>> response) {
std::optional<std::string> error;
if (response.hasException()) {
error = response.exception().what();
} else {
auto statusCode = response.value()->headers()->getStatusCode();
if (statusCode != http::kHttpOk &&
statusCode != http::kHttpNoContent) {
error = std::to_string(statusCode);
}
}
if (!error.has_value()) {
return;
}
if (self->abortRetryState_.isExhausted()) {
const std::string errMsg = fmt::format(
"Abort results failed: {}, path {}",
error.value(),
self->basePath_);
LOG(ERROR) << errMsg;
return onFinalFailure(errMsg, queue);
}
self->doAbortResults(self->abortRetryState_.nextDelayMs());
handleAbortResponse(std::move(response));
});
}

void PrestoExchangeSource::close() {
closed_.store(true);
checkSetRequestPromise();
abortResults();
void PrestoExchangeSource::handleAbortResponse(
folly::Try<std::unique_ptr<http::HttpResponse>> responseTry) {
std::optional<std::string> error;
if (responseTry.hasException()) {
error = responseTry.exception().what();
} else {
auto statusCode = responseTry.value()->headers()->getStatusCode();
if (statusCode != http::kHttpOk && statusCode != http::kHttpNoContent) {
error = std::to_string(statusCode);
}
}
if (!error.has_value()) {
return;
}
if (abortRetryState_.isExhausted()) {
const std::string errMsg = fmt::format(
"Abort results failed: {}, path {}", error.value(), basePath_);
LOG(ERROR) << errMsg;
return onFinalFailure(errMsg, queue_);
}
doAbortResults(abortRetryState_.nextDelayMs());
}

bool PrestoExchangeSource::checkSetRequestPromise() {
VeloxPromise<Response> promise;
{
std::lock_guard<std::mutex> l(queue_->mutex());
promise = std::move(promise_);
}
if (promise.valid() && !promise.isFulfilled()) {
promise.setValue(Response{0, false});
return true;
}

return false;
}

std::shared_ptr<PrestoExchangeSource> PrestoExchangeSource::getSelfPtr() {
14 changes: 14 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.h
Original file line number Diff line number Diff line change
@@ -226,12 +226,26 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {

void acknowledgeResults(int64_t ackSequence);

// Handles returned http response from acknowledge result request.
//
// NOTE: This method is normally called within callbacks. Caller should make
// sure 'this' lives during the entire duration of this method call.
void handleAckResponse(
folly::Try<std::unique_ptr<http::HttpResponse>> responseTry);

void abortResults();

/// Send abort results after specified delay. This function is called
/// multiple times by abortResults for retries.
void doAbortResults(int64_t delayMs);

// Handles returned http response from abort result request.
//
// NOTE: This method is normally called within callbacks. Caller should make
// sure 'this' lives during the entire duration of this method call.
void handleAbortResponse(
folly::Try<std::unique_ptr<http::HttpResponse>> responseTry);

/// Completes the future returned from 'request()' if it hasn't completed
/// already.
bool checkSetRequestPromise();

0 comments on commit c5bcd37

Please sign in to comment.