[#11628] YSQL: Implementing Async Flush
Summary:
Currently, while executing a statement, YSQL does some processing and buffers
writes. The write buffer is flushed when either of the following conditions is met:

(1) the write buffer is full (i.e., hits ysql_session_max_batch_size limit)
(2) a read op is required

On a flush, YSQL sends the writes to the required tablet servers in separate RPCs
(all issued in parallel). The YSQL backend makes further progress only after it
has received the responses to all of those RPCs. This wait hurts the performance
of bulk loading with COPY FROM, because YSQL spends much of its time waiting for
responses. Ideally, that wait time would be used to read further tuples from
the input source and perform the necessary processing.

This diff adds asynchrony to the flush so that YSQL's COPY FROM can read more
tuples after sending a set of RPCs to the tablet servers, without waiting for
the responses.

This is done by storing the flush future and not waiting for its result
immediately. YSQL waits for the earlier flush's result only once it has refilled
its write buffer, just before performing the next flush call. Note that the
right choice of ysql_session_max_batch_size is needed to mask almost all of the
wait time. The optimal batch size is one for which the following two tasks
(which run concurrently after this diff) take almost the same time:

(1) YSQL fetching and buffering ysql_session_max_batch_size rows
(2) Sending RPCs for the previous ysql_session_max_batch_size rows and arrival
of responses from the tserver
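
The mechanism described above can be sketched in a few lines of standalone C++. This is a minimal illustration, not the code in this diff: `Batch`, `PendingResult`, `Sender`, and `AsyncWriteBuffer` are hypothetical names, and a plain callable stands in for the flush future. It keeps at most one flush outstanding, as the summary describes, whereas the diff itself keeps a queue of in-flight batches bounded by ysql_max_in_flight_ops.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a batch of buffered write operations.
using Batch = std::vector<int>;

// Hypothetical stand-in for a flush future: calling it blocks until the
// responses for one sent batch have arrived and reports whether they succeeded.
using PendingResult = std::function<bool()>;

// Hypothetical sender: dispatches the RPCs for a batch and returns immediately.
using Sender = std::function<PendingResult(Batch)>;

class AsyncWriteBuffer {
 public:
  AsyncWriteBuffer(Sender sender, std::size_t max_batch_size)
      : sender_(std::move(sender)), max_batch_size_(max_batch_size) {
    assert(max_batch_size_ > 0);
  }

  // Buffers one row. When the buffer is full, sends it without waiting for
  // the responses. Returns false if an earlier batch turned out to have failed.
  bool Add(int row) {
    buffer_.push_back(row);
    return buffer_.size() >= max_batch_size_ ? SendBuffer() : true;
  }

  // Sends whatever is buffered and waits for every outstanding response.
  bool Flush() { return SendBuffer() && WaitForPrevious(); }

 private:
  bool SendBuffer() {
    if (buffer_.empty()) return true;
    // Wait for the earlier flush only now, just before the next send.
    if (!WaitForPrevious()) return false;
    Batch batch;
    batch.swap(buffer_);
    pending_ = sender_(std::move(batch));
    return true;
  }

  bool WaitForPrevious() {
    if (!pending_) return true;
    PendingResult wait = std::move(pending_);
    pending_ = nullptr;  // Leave the slot empty for the next send.
    return wait();
  }

  Sender sender_;
  std::size_t max_batch_size_;
  Batch buffer_;
  PendingResult pending_;
};
```

With a batch size of 2, the second `Add` sends the first batch and returns at once; the wait for that batch happens only when the fourth `Add` is about to send the second batch, so rows 3 and 4 are read while the first batch is still in flight.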

Note also that there might not be any value of ysql_session_max_batch_size for
which both tasks complete at roughly the same time. This could be due to the
inherently different speeds of disk reading and tablet servers' performance.
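
As a rough model of this trade-off (an assumption for illustration, not a measurement from this diff), let $T_f(B)$ be the time YSQL needs to fetch and buffer $B$ rows and $T_r(B)$ the time to send the RPCs for $B$ rows and receive the responses, where $B$ is ysql_session_max_batch_size. Assuming the two tasks overlap perfectly and ignoring the first and last batch, the steady-state time per batch is:

```latex
\begin{align*}
T_{\text{sync}}(B)  &= T_f(B) + T_r(B) \\
T_{\text{async}}(B) &\approx \max\bigl(T_f(B),\, T_r(B)\bigr) \\
\frac{T_{\text{sync}}(B)}{T_{\text{async}}(B)}
  &\approx 1 + \frac{\min\bigl(T_f(B),\, T_r(B)\bigr)}{\max\bigl(T_f(B),\, T_r(B)\bigr)} \le 2
\end{align*}
```

Under this model the ratio approaches 2 only when $T_f(B) \approx T_r(B)$, which is why the batch size that equalizes the two tasks is the optimal one. If no value of $B$ equalizes them, the slower task bounds the throughput.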

Test Plan:
Built locally and tested by creating indexes and performing COPY FROM. Previous experiments on portal clusters show that async flush generally gives a 30% increase in speed over regular (synchronous) flushing.

Also ran the Jenkins tests, since this is a general enhancement that is used everywhere.

Reviewers: dmitry, pjain

Reviewed By: dmitry, pjain

Subscribers: jason, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D16005
nathanhjli committed Apr 11, 2022
1 parent 4f5e5c1 commit ad4a991
Showing 8 changed files with 184 additions and 30 deletions.
21 changes: 21 additions & 0 deletions src/postgres/src/backend/utils/misc/guc.c
@@ -3332,6 +3332,27 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},

{
{"ysql_session_max_batch_size", PGC_USERSET, CLIENT_CONN_STATEMENT,
gettext_noop("Sets the maximum batch size for writes that YSQL can buffer before flushing to tablet servers."),
gettext_noop("If this is 0, YSQL will use the gflag ysql_session_max_batch_size. If non-zero, this session variable will supersede the value of the gflag."),
0
},
&ysql_session_max_batch_size,
0, 0, INT_MAX,
NULL, NULL, NULL
},

{
{"ysql_max_in_flight_ops", PGC_USERSET, CLIENT_CONN_STATEMENT,
gettext_noop("Maximum number of in-flight operations allowed from YSQL to tablet servers"),
NULL,
},
&ysql_max_in_flight_ops,
10000, 1, INT_MAX,
NULL, NULL, NULL
},

{
{"yb_follower_read_staleness_ms", PGC_USERSET, CLIENT_CONN_STATEMENT,
gettext_noop("Sets the staleness (in ms) to be used for performing follower reads."),
6 changes: 6 additions & 0 deletions src/yb/common/ybc_util.cc
@@ -49,6 +49,12 @@ bool suppress_nonpg_logs = false;

bool yb_binary_restore = false;

// If this is set in the user's session to a positive value, it will supersede the gflag
// ysql_session_max_batch_size.
int ysql_session_max_batch_size = 0;

int ysql_max_in_flight_ops = 0;

namespace yb {

namespace {
10 changes: 10 additions & 0 deletions src/yb/common/ybc_util.h
@@ -54,6 +54,16 @@ extern bool yb_force_global_transaction;
*/
extern bool suppress_nonpg_logs;

/*
* Guc variable to control the max session batch size before flushing.
*/
extern int ysql_session_max_batch_size;

/*
* Guc variable to control the max number of in-flight operations from YSQL to tablet server.
*/
extern int ysql_max_in_flight_ops;

/*
* Guc variable to enable binary restore from a binary backup of YSQL tables. When doing binary
* restore, we copy the docdb SST files of those tables from the source database and reuse them
149 changes: 124 additions & 25 deletions src/yb/yql/pggate/pg_operation_buffer.cc
@@ -19,6 +19,8 @@
#include <utility>
#include <vector>

#include <boost/circular_buffer.hpp>

#include "yb/common/constants.h"
#include "yb/common/pgsql_protocol.pb.h"
#include "yb/common/ql_expr.h"
@@ -37,7 +39,6 @@

#include "yb/yql/pggate/pg_op.h"
#include "yb/yql/pggate/pg_tabledesc.h"
#include "yb/yql/pggate/pggate_flags.h"

namespace yb {
namespace pggate {
@@ -141,6 +142,32 @@ inline bool IsTableUsedByRequest(const LWPgsqlReadRequestPB& request, const Slic
(request.has_index_request() && IsTableUsedByRequest(request.index_request(), table_id));
}

using RowKeys = std::unordered_set<RowIdentifier, boost::hash<RowIdentifier>>;

struct InFlightOperation {
RowKeys keys;
PerformFuture future;

explicit InFlightOperation(PerformFuture future_) : future(std::move(future_)) {
}
};

using InFlightOps = boost::circular_buffer_space_optimized<InFlightOperation,
std::allocator<InFlightOperation>>;

void EnsureCapacity(InFlightOps* in_flight_ops, BufferingSettings buffering_settings) {
size_t capacity = in_flight_ops->capacity();
size_t num_buffers_needed =
(buffering_settings.max_in_flight_operations / buffering_settings.max_batch_size) + 1;
// Change the capacity of the buffer if needed. This will only be different when
// buffering_settings_ is changed in StartOperationsBuffering(), or right after construction
// of the buffer. As such, we don't have to worry about set_capacity() dropping any
// InFlightOperations, since there are guaranteed to be none.
if (capacity < num_buffers_needed) {
in_flight_ops->set_capacity(num_buffers_needed);
}
}

} // namespace

void BufferableOperations::Add(PgsqlOpPtr op, const PgObjectId& relation) {
@@ -173,8 +200,9 @@ size_t BufferableOperations::size() const {

class PgOperationBuffer::Impl {
public:
explicit Impl(const Flusher& flusher)
: flusher_(flusher) {
Impl(const Flusher& flusher, const BufferingSettings& buffering_settings)
: flusher_(flusher),
buffering_settings_(buffering_settings) {
}

CHECKED_STATUS Add(const PgTableDesc& table, PgsqlWriteOpPtr op, bool transactional) {
@@ -187,19 +215,28 @@ class PgOperationBuffer::Impl {
if (PREDICT_FALSE(!keys_.insert(row_id).second)) {
RETURN_NOT_OK(Flush());
keys_.insert(row_id);
} else {
// Prevent conflicts on in-flight operations which use current row_id.
for (auto i = in_flight_ops_.begin(); i != in_flight_ops_.end(); ++i) {
if (i->keys.find(row_id) != i->keys.end()) {
RETURN_NOT_OK(EnsureCompleted(++i));
break;
}
}
}
auto& target = (transactional ? txn_ops_ : ops_);
if (target.empty()) {
target.Reserve(FLAGS_ysql_session_max_batch_size);
target.Reserve(buffering_settings_.max_batch_size);
}
target.Add(std::move(op), table.id());
return keys_.size() >= FLAGS_ysql_session_max_batch_size ? Flush() : Status::OK();
return keys_.size() >= buffering_settings_.max_batch_size
? SendBuffer()
: Status::OK();
}

CHECKED_STATUS Flush() {
return DoFlush(make_lw_function([this](BufferableOperations ops, bool txn) {
return ResultToStatus(VERIFY_RESULT(flusher_(std::move(ops), txn)).Get());
}));
RETURN_NOT_OK(SendBuffer());
return EnsureCompleted(in_flight_ops_.end());
}

Result<BufferableOperations> FlushTake(
@@ -208,48 +245,107 @@ class PgOperationBuffer::Impl {
if (IsFullFlushRequired(table, op)) {
RETURN_NOT_OK(Flush());
} else {
RETURN_NOT_OK(DoFlush(make_lw_function(
[this, transactional, &result](BufferableOperations ops, bool txn) -> Status {
RETURN_NOT_OK(SendBuffer(make_lw_function(
[transactional, &result](BufferableOperations ops, bool txn) {
if (txn == transactional) {
ops.Swap(&result);
return Status::OK();
return true;
}
return ResultToStatus(VERIFY_RESULT(flusher_(std::move(ops), txn)).Get());
return false;
})));
RETURN_NOT_OK(EnsureCompleted(in_flight_ops_.end()));
}
return result;
}

size_t Size() const {
return keys_.size();
return keys_.size() + InFlightOpsCount();
}

void Clear() {
VLOG_IF(1, !keys_.empty()) << "Dropping " << keys_.size() << " pending operations";
ops_.Clear();
txn_ops_.Clear();
keys_.clear();
in_flight_ops_.clear();
}

private:
using SyncFlusher = LWFunction<Status(BufferableOperations, bool)>;
size_t InFlightOpsCount() const {
size_t ops_count = 0;
for (const auto& op : in_flight_ops_) {
ops_count += op.keys.size();
}
return ops_count;
}

CHECKED_STATUS EnsureCompleted(const InFlightOps::iterator& end) {
// Extract the InFlightOperations we are processing and erase them from in_flight_ops_.
std::vector<InFlightOperation> extracted_ops;
extracted_ops.reserve(end - in_flight_ops_.begin());
for (auto i = in_flight_ops_.begin(); i != end; ++i) {
extracted_ops.push_back(std::move(*i));
}
in_flight_ops_.erase(in_flight_ops_.begin(), end);
for (auto i = extracted_ops.begin(); i != extracted_ops.end(); ++i) {
RETURN_NOT_OK(i->future.Get());
}
return Status::OK();
}

using SendInterceptor = LWFunction<bool(BufferableOperations, bool)>;

CHECKED_STATUS SendBuffer() {
return SendBufferImpl(nullptr /* interceptor */);
}

CHECKED_STATUS SendBuffer(const SendInterceptor& interceptor) {
return SendBufferImpl(&interceptor);
}

CHECKED_STATUS DoFlush(const SyncFlusher& flusher) {
CHECKED_STATUS SendBufferImpl(const SendInterceptor* interceptor) {
if (keys_.empty()) {
return Status::OK();
}
BufferableOperations ops;
BufferableOperations txn_ops;
RowKeys keys;
ops_.Swap(&ops);
txn_ops_.Swap(&txn_ops);
keys_.clear();

if (!ops.empty()) {
RETURN_NOT_OK(flusher(std::move(ops), false /* transactional */));
}
if (!txn_ops.empty()) {
RETURN_NOT_OK(flusher(std::move(txn_ops), true /* transactional */));
keys_.swap(keys);

const auto ops_count = keys.size();
bool ops_sent = VERIFY_RESULT(SendOperations(
interceptor, std::move(txn_ops), true /* transactional */, ops_count));
ops_sent = VERIFY_RESULT(SendOperations(
interceptor, std::move(ops),
false /* transactional */, ops_sent ? 0 : ops_count)) || ops_sent;
if (ops_sent) {
in_flight_ops_.back().keys = std::move(keys);
}
return Status::OK();
}

Result<bool> SendOperations(const SendInterceptor* interceptor,
BufferableOperations ops,
bool transactional,
size_t ops_count) {
if (!ops.empty() && !(interceptor && (*interceptor)(std::move(ops), transactional))) {
EnsureCapacity(&in_flight_ops_, buffering_settings_);
size_t kMaxInFlightOperations = buffering_settings_.max_in_flight_operations;
for (int64_t space_required = (InFlightOpsCount() + ops_count) - kMaxInFlightOperations;
space_required > 0;) {
auto it = in_flight_ops_.begin();
space_required -= it->keys.size();
RETURN_NOT_OK(EnsureCompleted(++it));
}
in_flight_ops_.push_back(
InFlightOperation(VERIFY_RESULT(flusher_(std::move(ops), transactional))));
return true;
}
return false;
}

bool IsFullFlushRequired(const PgTableDesc& table, const PgsqlOp& op) const {
return op.is_read()
? IsSameTableUsedByBufferedOperations(down_cast<const PgsqlReadOp&>(op).read_request())
@@ -268,13 +364,16 @@ class PgOperationBuffer::Impl {
}

const Flusher flusher_;
const BufferingSettings& buffering_settings_;
BufferableOperations ops_;
BufferableOperations txn_ops_;
std::unordered_set<RowIdentifier, boost::hash<RowIdentifier>> keys_;
RowKeys keys_;
InFlightOps in_flight_ops_;
};

PgOperationBuffer::PgOperationBuffer(const Flusher& flusher)
: impl_(new Impl(flusher)) {
PgOperationBuffer::PgOperationBuffer(const Flusher& flusher,
const BufferingSettings& buffering_settings)
: impl_(new Impl(flusher, buffering_settings)) {
}

PgOperationBuffer::~PgOperationBuffer() = default;
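
The in-flight bookkeeping in this file can be illustrated with a small standalone sketch. It assumes the slot formula from EnsureCapacity() and the eviction loop from SendOperations() above; the function names `NumBuffersNeeded` and `BatchesToComplete` are hypothetical, and each batch is reduced to its operation count. Unlike the loop in the diff, the sketch also stops when no in-flight batches remain.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Number of batch slots the circular buffer needs, mirroring the formula in
// EnsureCapacity(): one slot per full batch that fits under the limit, plus one.
std::size_t NumBuffersNeeded(std::size_t max_in_flight_ops, std::size_t max_batch_size) {
  assert(max_batch_size > 0);
  return max_in_flight_ops / max_batch_size + 1;
}

// Given the operation counts of the batches already in flight (oldest first),
// returns how many of the oldest batches must complete before a new batch of
// new_ops operations fits under max_in_flight_ops.
std::size_t BatchesToComplete(const std::deque<std::size_t>& in_flight,
                              std::size_t new_ops,
                              std::size_t max_in_flight_ops) {
  std::size_t total = new_ops;
  for (std::size_t ops : in_flight) {
    total += ops;
  }
  std::size_t completed = 0;
  // Retire the oldest batches one at a time until the new batch fits.
  while (total > max_in_flight_ops && completed < in_flight.size()) {
    total -= in_flight[completed++];
  }
  return completed;
}
```

With the defaults that appear in this diff (ysql_max_in_flight_ops = 10000 and a batch size of 512), `NumBuffersNeeded` yields 20 slots. With 19 full batches in flight, sending one more batch of 512 operations requires only the oldest batch to complete first.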
7 changes: 6 additions & 1 deletion src/yb/yql/pggate/pg_operation_buffer.h
@@ -29,6 +29,11 @@
namespace yb {
namespace pggate {

struct BufferingSettings {
size_t max_batch_size;
size_t max_in_flight_operations;
};

struct BufferableOperations {
PgsqlOps operations;
PgObjectIds relations;
@@ -45,7 +50,7 @@ class PgOperationBuffer {
public:
using Flusher = std::function<Result<PerformFuture>(BufferableOperations, bool)>;

explicit PgOperationBuffer(const Flusher& flusher);
PgOperationBuffer(const Flusher& flusher, const BufferingSettings& buffering_settings);
~PgOperationBuffer();
CHECKED_STATUS Add(const PgTableDesc& table, PgsqlWriteOpPtr op, bool transactional);
CHECKED_STATUS Flush();
19 changes: 15 additions & 4 deletions src/yb/yql/pggate/pg_session.cc
@@ -163,6 +163,14 @@ bool Empty(const PgOperationBuffer& buffer) {
return !buffer.Size();
}

void Update(BufferingSettings* buffering_settings) {
/* Use the gflag value if the session variable is unset for batch size. */
buffering_settings->max_batch_size = ysql_session_max_batch_size <= 0
? FLAGS_ysql_session_max_batch_size
: static_cast<uint64_t>(ysql_session_max_batch_size);
buffering_settings->max_in_flight_operations = static_cast<uint64_t>(ysql_max_in_flight_ops);
}

} // namespace

//--------------------------------------------------------------------------------------------------
@@ -294,9 +302,11 @@ PgSession::PgSession(
pg_txn_manager_(std::move(pg_txn_manager)),
clock_(std::move(clock)),
buffer_(std::bind(
&PgSession::FlushOperations, this, std::placeholders::_1, std::placeholders::_2)),
&PgSession::FlushOperations, this, std::placeholders::_1, std::placeholders::_2),
buffering_settings_),
tserver_shared_object_(tserver_shared_object),
pg_callbacks_(pg_callbacks) {
Update(&buffering_settings_);
}

PgSession::~PgSession() = default;
@@ -453,6 +463,7 @@ Status PgSession::StartOperationsBuffering() {
<< buffer_.Size()
<< " buffered operations found";
}
Update(&buffering_settings_);
buffering_enabled_ = true;
return Status::OK();
}
@@ -568,16 +579,16 @@ Result<bool> PgSession::ForeignKeyReferenceExists(PgOid table_id,

std::vector<TableYbctid> ybctids;
ybctids.reserve(std::min<size_t>(
fk_reference_intent_.size(), FLAGS_ysql_session_max_batch_size));
fk_reference_intent_.size(), buffering_settings_.max_batch_size));

// If the reader fails to get the result, we fail the whole operation (and transaction).
// Hence it's ok to extract (erase) the keys from intent before calling reader.
auto node = fk_reference_intent_.extract(it);
ybctids.push_back({table_id, std::move(node.value().ybctid)});

// Read up to FLAGS_ysql_session_max_batch_size keys.
// Read up to session max batch size keys.
for (auto it = fk_reference_intent_.begin();
it != fk_reference_intent_.end() && ybctids.size() < FLAGS_ysql_session_max_batch_size; ) {
it != fk_reference_intent_.end() && ybctids.size() < buffering_settings_.max_batch_size; ) {
node = fk_reference_intent_.extract(it++);
auto& key_ref = node.value();
ybctids.push_back({key_ref.table_id, std::move(key_ref.ybctid)});
1 change: 1 addition & 0 deletions src/yb/yql/pggate/pg_session.h
@@ -309,6 +309,7 @@ class PgSession : public RefCountedThreadSafe<PgSession> {

// Should write operations be buffered?
bool buffering_enabled_ = false;
BufferingSettings buffering_settings_;
PgOperationBuffer buffer_;

HybridTime in_txn_limit_;
1 change: 1 addition & 0 deletions src/yb/yql/pggate/pggate_flags.cc
@@ -54,6 +54,7 @@ DEFINE_double(ysql_backward_prefetch_scale_factor, 0.0625 /* 1/16th */,
"Scale factor to reduce ysql_prefetch_limit for backward scan");

DEFINE_uint64(ysql_session_max_batch_size, 512,
"Use session variable ysql_session_max_batch_size instead. "
"Maximum batch size for buffered writes between PostgreSQL server and YugaByte DocDB "
"services");
