Skip to content

Commit

Permalink
move spoolAsyncLog to common utils for reuse
Browse files Browse the repository at this point in the history
Summary: as title

Reviewed By: stuclar

Differential Revision: D48845981

fbshipit-source-id: a6b62f12c7ee9d162b6d63066d5a47ad957ca5cc
  • Loading branch information
Lenar Fatikhov authored and facebook-github-bot committed Sep 1, 2023
1 parent 7f54c55 commit 30ca009
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 51 deletions.
4 changes: 2 additions & 2 deletions mcrouter/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ libmcroutercore_a_SOURCES = \
McrouterLogger.h \
McrouterManager.cpp\
McrouterManager.h\
McInvalidationUtils.h \
McInvalidationUtils.cpp \
McSpoolUtils.h \
McSpoolUtils.cpp \
lib/invalidation/McInvalidationKvPairs.h \
lib/invalidation/McInvalidationKvPairs.cpp \
lib/invalidation/McInvalidationDefs.h \
Expand Down
51 changes: 50 additions & 1 deletion mcrouter/McInvalidationUtils.cpp → mcrouter/McSpoolUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* LICENSE file in the root directory of this source tree.
*/

#include "mcrouter/McInvalidationUtils.h"
#include "mcrouter/McSpoolUtils.h"

namespace facebook {
namespace memcache {
Expand Down Expand Up @@ -66,6 +66,55 @@ FOLLY_NOINLINE bool spoolAxonProxy(
return axonCtx->writeProxyFn(bucketId, std::move(kvPairs));
}

FOLLY_NOINLINE bool spoolAsynclog(
ProxyBase* proxy,
const memcache::McDeleteRequest& req,
const std::shared_ptr<const AccessPoint>& host,
bool keepRoutingPrefix,
folly::StringPiece asynclogName) {
if (asynclogName.empty()) {
return false;
}
folly::StringPiece key = keepRoutingPrefix ? req.key_ref()->fullKey()
: req.key_ref()->keyWithoutRoute();
folly::fibers::Baton b;
auto res = false;
auto attr = *req.attributes_ref();
const auto asyncWriteStartUs = nowUs();
auto asyncWriter = proxy->router().asyncWriter();
if (asyncWriter && host) {
res = asyncWriter->run([&b, &attr, &host, proxy, key, asynclogName]() {
if (proxy->asyncLog().writeDelete(*host, key, asynclogName, attr)) {
proxy->stats().increment(asynclog_spool_success_rate_stat);
}
b.post();
});
}

if (!host) {
MC_LOG_FAILURE(
proxy->router().opts(),
memcache::failure::Category::kBrokenLogic,
"Failed to enqueue asynclog request (key {}, pool {}) due to missing host info",
key,
asynclogName);
} else if (!res) {
MC_LOG_FAILURE(
proxy->router().opts(),
memcache::failure::Category::kOutOfResources,
"Could not enqueue asynclog request (key {}, pool {})",
key,
asynclogName);
} else {
// Don't reply to the user until we safely logged the request to disk
b.wait();
const auto asyncWriteDurationUs = nowUs() - asyncWriteStartUs;
proxy->stats().asyncLogDurationUs().insertSample(asyncWriteDurationUs);
proxy->stats().increment(asynclog_requests_rate_stat);
}
return true;
}

} // namespace mcrouter
} // namespace memcache
} // namespace facebook
9 changes: 7 additions & 2 deletions mcrouter/McInvalidationUtils.h → mcrouter/McSpoolUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
#include <folly/Conv.h>
#include <folly/Range.h>

#include "mcrouter/ProxyBase.h"

#include "mcrouter/AsyncWriter.h"
#include "mcrouter/McrouterFiberContext.h"
#include "mcrouter/McrouterLogFailure.h"
Expand Down Expand Up @@ -47,6 +45,13 @@ FOLLY_NOINLINE bool spoolAxonProxy(
const std::shared_ptr<AxonContext>& axonCtx,
uint64_t bucketId);

FOLLY_NOINLINE bool spoolAsynclog(
ProxyBase* proxy,
const memcache::McDeleteRequest& req,
const std::shared_ptr<const AccessPoint>& host,
bool keepRoutingPrefix,
folly::StringPiece asynclogName);

} // namespace mcrouter
} // namespace memcache
} // namespace facebook
54 changes: 8 additions & 46 deletions mcrouter/routes/DestinationRoute.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include "mcrouter/AsyncLog.h"
#include "mcrouter/AsyncWriter.h"
#include "mcrouter/CarbonRouterInstanceBase.h"
#include "mcrouter/McInvalidationUtils.h"
#include "mcrouter/McReqUtil.h"
#include "mcrouter/McSpoolUtils.h"
#include "mcrouter/McrouterFiberContext.h"
#include "mcrouter/McrouterLogFailure.h"
#include "mcrouter/ProxyDestination.h"
Expand Down Expand Up @@ -377,51 +377,8 @@ class DestinationRoute {
return bucketId;
}

template <class Request>
FOLLY_NOINLINE bool spoolAsynclog(const Request& req) const {
auto asynclogName = fiber_local<RouterInfo>::getAsynclogName();
if (asynclogName.empty()) {
return false;
}

folly::StringPiece key = keepRoutingPrefix_
? req.key_ref()->fullKey()
: req.key_ref()->keyWithoutRoute();

auto proxy = &fiber_local<RouterInfo>::getSharedCtx()->proxy();
auto& ap = *destination_->accessPoint();
folly::fibers::Baton b;
auto res = false;
auto attr = *req.attributes_ref();
const auto asyncWriteStartUs = nowUs();
if (auto asyncWriter = proxy->router().asyncWriter()) {
res = asyncWriter->run([&b, &ap, &attr, proxy, key, asynclogName]() {
if (proxy->asyncLog().writeDelete(ap, key, asynclogName, attr)) {
proxy->stats().increment(asynclog_spool_success_rate_stat);
}
b.post();
});
}
if (!res) {
MC_LOG_FAILURE(
proxy->router().opts(),
memcache::failure::Category::kOutOfResources,
"Could not enqueue asynclog request (key {}, pool {})",
key,
asynclogName);
} else {
// Don't reply to the user until we safely logged the request to disk
b.wait();
const auto asyncWriteDurationUs = nowUs() - asyncWriteStartUs;
proxy->stats().asyncLogDurationUs().insertSample(asyncWriteDurationUs);
proxy->stats().increment(asynclog_requests_rate_stat);
}
return true;
}

template <class Request>
FOLLY_NOINLINE bool spool(
const Request& req,
const McDeleteRequest& req,
const std::shared_ptr<AxonContext>& axonCtx,
const std::optional<uint64_t>& bucketId) const {
// return true if axonlog is enabled and appending to axon client succeed.
Expand All @@ -438,7 +395,12 @@ class DestinationRoute {
bool asyncLogRes = false;
if (!axonCtx || !axonCtx->fallbackAsynclog || !axonLogRes) {
// return true if asyclog is enabled
asyncLogRes = spoolAsynclog(req);
asyncLogRes = spoolAsynclog(
&fiber_local<RouterInfo>::getSharedCtx()->proxy(),
req,
destination_->accessPoint(),
keepRoutingPrefix_,
fiber_local<RouterInfo>::getAsynclogName());
}
return asyncLogRes || axonLogRes;
}
Expand Down

0 comments on commit 30ca009

Please sign in to comment.