Skip to content

Commit

Permalink
Fix crash when TilePromise is fulfilled after connection is destroyed
Browse files Browse the repository at this point in the history
Summary: There is a heap-use-after-free when the `fn` in `HandlerCallbackBase::fulfillTilePromise` runs because it is possible that the connection has already been destroyed by the time the EventBase executes `fn`. This is because the destruction of the connection is guarded by "inflight requests" which gets decremented after the response is sent to the client from the EventBase. The response gets added to the EventBase through a custom ReplyQueue which doesn't limit max number of items processed in the event loop but the tile fulfillment gets added to the EventBase using the general queue which does limit max number of items processed in the event loop. As a result, the response task gets completed before the tile fulfillment task and allows the connection to be destroyed before the tile is fulfilled. To fix, we can enqueue the tile fulfillment using the same custom queue to ensure that the tile is always fulfilled before the response is sent.

Reviewed By: yfeldblum

Differential Revision: D64521715

fbshipit-source-id: 8dceedae53a9a88f409931601fd2e093dc74b521
  • Loading branch information
Akrama Baig Mirza authored and facebook-github-bot committed Oct 18, 2024
1 parent d43da36 commit a81fe64
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 21 deletions.
29 changes: 10 additions & 19 deletions thrift/lib/cpp2/async/AsyncProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -963,25 +963,16 @@ bool HandlerCallbackBase::fulfillTilePromise(std::unique_ptr<Tile> ptr) {
return false;
}

auto fn = [connCtx = reqCtx_->getConnectionContext(),
interactionId = reqCtx_->getInteractionId(),
interaction = std::move(interaction_),
ptr = std::move(ptr),
tm = getThreadManager_deprecated(),
eb = eb_,
executor = executor_,
isRPEnabled = isResourcePoolEnabled()]() mutable {
TilePtr tile{ptr.release(), eb};
DCHECK(dynamic_cast<TilePromise*>(interaction.get()));
if (isRPEnabled) {
static_cast<TilePromise&>(*interaction).fulfill(*tile, executor, *eb);
} else {
static_cast<TilePromise&>(*interaction).fulfill(*tile, tm, *eb);
}
connCtx->tryReplaceTile(interactionId, std::move(tile));
};

eb_->runImmediatelyOrRunInEventBaseThread(std::move(fn));
putMessageInReplyQueue(
std::in_place_type_t<TilePromiseReplyInfo>(),
reqCtx_->getConnectionContext(),
reqCtx_->getInteractionId(),
std::move(interaction_),
std::move(ptr),
getThreadManager_deprecated(),
eb_,
executor_,
isResourcePoolEnabled());
return true;
}

Expand Down
49 changes: 47 additions & 2 deletions thrift/lib/cpp2/async/ReplyInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <thrift/lib/cpp2/async/Interaction.h>
#include <thrift/lib/cpp2/async/ResponseChannel.h>
#include <thrift/lib/cpp2/async/Sink.h>
#include <thrift/lib/cpp2/server/Cpp2ConnContext.h>

namespace apache::thrift {

Expand Down Expand Up @@ -86,8 +87,52 @@ class SinkConsumerReplyInfo {
folly::Optional<uint32_t> crc32c_;
};

using ReplyInfo =
std::variant<QueueReplyInfo, StreamReplyInfo, SinkConsumerReplyInfo>;
class TilePromiseReplyInfo {
public:
TilePromiseReplyInfo(
Cpp2ConnContext* connCtx,
int64_t interactionId,
TilePtr interaction,
std::unique_ptr<Tile> ptr,
concurrency::ThreadManager* tm,
folly::EventBase* eb,
folly::Executor::KeepAlive<> executor,
bool isRPEnabled)
: connCtx_(connCtx),
interactionId_(interactionId),
interaction_(std::move(interaction)),
ptr_(std::move(ptr)),
tm_(tm),
eb_(eb),
executor_(std::move(executor)),
isRPEnabled_(isRPEnabled) {}

void operator()() noexcept {
TilePtr tile{ptr_.release(), eb_};
DCHECK(dynamic_cast<TilePromise*>(interaction_.get()));
if (isRPEnabled_) {
static_cast<TilePromise&>(*interaction_).fulfill(*tile, executor_, *eb_);
} else {
static_cast<TilePromise&>(*interaction_).fulfill(*tile, tm_, *eb_);
}
connCtx_->tryReplaceTile(interactionId_, std::move(tile));
}

Cpp2ConnContext* connCtx_;
int64_t interactionId_;
TilePtr interaction_;
std::unique_ptr<Tile> ptr_;
concurrency::ThreadManager* tm_;
folly::EventBase* eb_;
folly::Executor::KeepAlive<> executor_;
bool isRPEnabled_;
};

using ReplyInfo = std::variant<
QueueReplyInfo,
StreamReplyInfo,
SinkConsumerReplyInfo,
TilePromiseReplyInfo>;

/**
* Used in EventBaseAtomicNotificationQueue to process each dequeued item
Expand Down

0 comments on commit a81fe64

Please sign in to comment.