Skip to content

Commit

Permalink
[yugabyte#11628, yugabyte#11732] Implementing improved async flush fo…
Browse files Browse the repository at this point in the history
…r pg_session, indexes supported

Summary: Still working on flakiness (Aborted: backfill connection to DB failed), but getting this diff out for initial reviews and opinions. One thing to note is that the initial pipeline, where we passed async flush explicitly, seems to be less flaky, since we can control exactly when we want to use async flush.

Test Plan:
Built locally and tested by creating indexes and performing COPY FROM.

Also added java test:
./yb_build.sh --java-test org.yb.pgsql.TestAsyncFlush

Reviewers: pjain, dmitry

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D16005
  • Loading branch information
nathanhjli committed Mar 15, 2022
1 parent f05c0c6 commit 447199b
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 38 deletions.
60 changes: 31 additions & 29 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestAsyncFlush.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,18 @@ public void testCopyWithAsyncFlush() throws Exception {
try (Statement statement = connection.createStatement()) {
statement.execute(String.format("CREATE TABLE %s (a int PRIMARY KEY, b int, c int)",
tableName));
statement.execute(String.format(
"COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER)",
tableName, absFilePath));
statement.execute(String.format("COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER)",
tableName, absFilePath));

// Verify row count.
assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, totalLines);

// Verify specific rows are present.
assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=0", 0, 0, 0);
assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=50000", 50000, 50000, 50000);
assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=99999", 99999, 99999, 99999);
assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=50000",
50000, 50000, 50000);
assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=99999",
99999, 99999, 99999);
}
}

Expand All @@ -72,29 +73,30 @@ public void testIndexCopyWithAsyncFlush() throws Exception {
createDataFile(absFilePath, totalLines);

try (Statement statement = connection.createStatement()) {
statement.execute(String.format("CREATE TABLE %s (a int PRIMARY KEY, b int, c int)",
tableName));
statement.execute(String.format("CREATE INDEX index_1 ON %s(b) WHERE b < 30", tableName));
statement.execute(String.format("CREATE INDEX index_2 ON %s(b,c)", tableName));
statement.execute(String.format(
"COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER)",
tableName, absFilePath));

// Verify row count.
assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, totalLines);

// Verify specific rows are present.
assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=0", 0, 0, 0);
assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=50000", 50000, 50000, 50000);
assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=99999", 99999, 99999, 99999);

// Verify index_1.
assertOneRow(statement, String.format(
"WITH w AS (SELECT * FROM %s WHERE b < 30) SELECT COUNT(*) FROM w;", tableName), 30);

// Verify index_2.
assertOneRow(statement, String.format(
"WITH w AS (SELECT * FROM %s) SELECT COUNT(*) FROM w;", tableName), totalLines);
statement.execute(String.format("CREATE TABLE %s (a int PRIMARY KEY, b int, c int)",
tableName));
statement.execute(String.format("CREATE INDEX index_1 ON %s(b) WHERE b < 30", tableName));
statement.execute(String.format("CREATE INDEX index_2 ON %s(b,c)", tableName));
statement.execute(String.format("COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER)",
tableName, absFilePath));

// Verify row count.
assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, totalLines);

// Verify specific rows are present.
assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=0", 0, 0, 0);
assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=50000",
50000, 50000, 50000);
assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=99999",
99999, 99999, 99999);

// Verify index_1.
assertOneRow(statement, String.format(
"WITH w AS (SELECT * FROM %s WHERE b < 30) SELECT COUNT(*) FROM w;", tableName), 30);

// Verify index_2.
assertOneRow(statement, String.format(
"WITH w AS (SELECT * FROM %s) SELECT COUNT(*) FROM w;", tableName), totalLines);
}
}
}
}
11 changes: 11 additions & 0 deletions src/postgres/src/backend/utils/misc/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -3332,6 +3332,17 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},

{
{"ysql_session_max_batch_size", PGC_USERSET, CLIENT_CONN_STATEMENT,
gettext_noop("Sets the maximum batch size for writes that YSQL can buffer before flushing to tablet servers."),
gettext_noop("If this is 0, YSQL will use the gflag ysql_session_max_batch_size. If non-zero, this session variable will supersede the value of the gflag."),
0
},
&ysql_session_max_batch_size,
0, 0, INT_MAX,
NULL, NULL, NULL
},

{
{"yb_follower_read_staleness_ms", PGC_USERSET, CLIENT_CONN_STATEMENT,
gettext_noop("Sets the staleness (in ms) to be used for performing follower reads."),
Expand Down
5 changes: 5 additions & 0 deletions src/yb/common/ybc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ extern bool yb_force_global_transaction;
*/
extern bool suppress_nonpg_logs;

/*
* Guc variable to control the max session batch size before flushing.
*/
extern int ysql_session_max_batch_size;

/*
* Guc variable to enable binary restore from a binary backup of YSQL tables. When doing binary
* restore, we copy the docdb SST files of those tables from the source database and reuse them
Expand Down
23 changes: 17 additions & 6 deletions src/yb/yql/pggate/pg_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ DECLARE_int32(TEST_user_ddl_operation_timeout_sec);

DEFINE_bool(ysql_log_failed_docdb_requests, false, "Log failed docdb requests.");

// If this is set in the user's session to a positive value, it will supersede the gflag
// ysql_session_max_batch_size.
int ysql_session_max_batch_size = 0;

namespace yb {
namespace pggate {

Expand Down Expand Up @@ -148,6 +152,13 @@ bool IsTableUsedByOperation(const PgsqlOp& op, const string& table_id) {
}
}

/* Use the gflag value if the session variable is unset. */
uint64_t GetSessionMaxBatchSize() {
return ysql_session_max_batch_size <= 0
? FLAGS_ysql_session_max_batch_size
: (uint64_t) ysql_session_max_batch_size;
}

struct PgForeignKeyReferenceLightweight {
PgOid table_id;
Slice ybctid;
Expand Down Expand Up @@ -217,7 +228,7 @@ Status PgSession::RunHelper::Apply(
auto& buffered_keys = pg_session_.buffered_keys_;
bool can_buffer = operations_.empty() && pg_session_.buffering_enabled_ &&
!force_non_bufferable && op->is_write();
UseAsyncFlush use_async_flush_(FLAGS_ysql_use_async_flush && can_buffer);
UseAsyncFlush use_async_flush_(FLAGS_yb_use_async_flush && can_buffer);
// Try buffering this operation if it is a write operation, buffering is enabled and no
// operations have been already applied to current session (yb session does not exist).
if (can_buffer) {
Expand All @@ -237,7 +248,7 @@ Status PgSession::RunHelper::Apply(
}
buffer_.Add(op, relation_id_);
// Flush buffers in case limit of operations in single RPC exceeded.
return PREDICT_TRUE(buffered_keys.size() < FLAGS_ysql_session_max_batch_size)
return PREDICT_TRUE(buffered_keys.size() < GetSessionMaxBatchSize())
? Status::OK()
: pg_session_.FlushBufferedOperations(use_async_flush_);
}
Expand Down Expand Up @@ -658,7 +669,7 @@ Result<bool> PgSession::IsInitDbDone() {

Status PgSession::FlushOperations(BufferableOperations ops, IsTransactionalSession transactional,
UseAsyncFlush use_async_flush) {
DCHECK(ops.size() > 0 && ops.size() <= FLAGS_ysql_session_max_batch_size);
DCHECK(ops.size() > 0 && ops.size() <= GetSessionMaxBatchSize());

if (PREDICT_FALSE(yb_debug_log_docdb_requests)) {
LOG(INFO) << "Flushing buffered operations, using "
Expand Down Expand Up @@ -746,17 +757,17 @@ Result<bool> PgSession::ForeignKeyReferenceExists(PgOid table_id,
return false;
}
std::vector<Slice> ybctids;
const auto reserved_size = std::min<size_t>(FLAGS_ysql_session_max_batch_size,
const auto reserved_size = std::min<size_t>(GetSessionMaxBatchSize(),
fk_reference_intent_.size() + 1);
ybctids.reserve(reserved_size);
ybctids.push_back(ybctid);
// TODO(dmitry): In case number of keys for same table > FLAGS_ysql_session_max_batch_size
// TODO(dmitry): In case number of keys for same table > session max batch size
// two strategy are possible:
// 1. select keys belonging to same tablet to reduce number of simultaneous RPC
// 2. select keys belonging to different tablets to distribute reads among different nodes
const auto intent_match = [table_id](const auto& key) { return key.table_id == table_id; };
for (auto it = fk_reference_intent_.begin();
it != fk_reference_intent_.end() && ybctids.size() < FLAGS_ysql_session_max_batch_size;
it != fk_reference_intent_.end() && ybctids.size() < GetSessionMaxBatchSize();
++it) {
if (intent_match(*it)) {
ybctids.push_back(it->ybctid);
Expand Down
4 changes: 2 additions & 2 deletions src/yb/yql/pggate/pggate_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ DEFINE_uint64(ysql_session_max_batch_size, 512,
"Maximum batch size for buffered writes between PostgreSQL server and YugaByte DocDB "
"services");

DEFINE_bool(ysql_use_async_flush, true,
"Whether to perform async flush");
DEFINE_bool(yb_use_async_flush, true,
"Whether to perform async flush");

DEFINE_bool(ysql_non_txn_copy, false,
"Execute COPY inserts non-transactionally.");
Expand Down
2 changes: 1 addition & 1 deletion src/yb/yql/pggate/pggate_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ DECLARE_int32(ysql_request_limit);
DECLARE_uint64(ysql_prefetch_limit);
DECLARE_double(ysql_backward_prefetch_scale_factor);
DECLARE_uint64(ysql_session_max_batch_size);
DECLARE_bool(ysql_use_async_flush);
DECLARE_bool(yb_use_async_flush);
DECLARE_bool(ysql_non_txn_copy);
DECLARE_int32(ysql_max_read_restart_attempts);
DECLARE_bool(TEST_ysql_disable_transparent_cache_refresh_retry);
Expand Down

0 comments on commit 447199b

Please sign in to comment.