From 1a3a344cab5c8eaf2fbb866e90e4b0ea2fee1f3b Mon Sep 17 00:00:00 2001 From: Nathan Li Date: Thu, 10 Mar 2022 12:53:25 -0500 Subject: [PATCH] [#11628] YSQL: Implementing Async Flush for COPY command. Summary: Currently, as part of any statement, YSQL does some processing and buffers writes. The write buffer is flushed once either of the below conditions is hit - (1) the write buffer is full (i.e., hits the ysql_session_max_batch_size limit) (2) a read op is required On a flush, YSQL directs the writes to the required tablet servers in different RPCs (all issued in parallel). Only once responses to all RPCs are received does the YSQL backend make further progress. This waiting behaviour affects the performance of bulk loading using COPY FROM because YSQL spends a lot of time waiting for responses. It would be ideal to use that wait time for reading further tuples from the input source and performing the necessary processing. In this diff, we are adding some asynchrony to the flush to allow YSQL's COPY FROM to read more tuples after sending a set of RPCs to tablet servers (without waiting for the responses). This is done by storing the flush future and not waiting for its result immediately. Only when YSQL refills its write buffer will it wait for the earlier flush's result, just before performing the next flush call. Note that the right choice of ysql_session_max_batch_size is required to help us mask almost all of the wait time. The optimal batch size is one in which both of the following tasks (which will run simultaneously after this diff) take almost the same time - (1) YSQL fetching and buffering ysql_session_max_batch_size rows (2) Sending RPCs for the previous ysql_session_max_batch_size rows and the arrival of responses from the tserver Note also that there might not be any value of ysql_session_max_batch_size for which both tasks complete at roughly the same time. This could be due to the inherently different speeds of disk reading and tablet servers' performance. 
Test Plan: Tested manually locally and on portal clusters. Experiments show that there is generally a 20-25% increase in speed when using async flush versus using regular flushing. Reviewers: kannan, smishra, pjain Reviewed By: pjain Subscribers: mtakahara, zyu, lnguyen, yql Differential Revision: https://phabricator.dev.yugabyte.com/D15757 --- .../java/org/yb/pgsql/TestAsyncFlush.java | 57 ++++++++++++ requirements_frozen.txt | 2 +- .../src/backend/bootstrap/bootstrap.c | 2 +- src/postgres/src/backend/catalog/indexing.c | 18 +++- .../catalog/yb_catalog/yb_catalog_version.c | 4 +- src/postgres/src/backend/commands/copy.c | 7 +- src/postgres/src/backend/commands/createas.c | 5 +- src/postgres/src/backend/commands/matview.c | 5 +- src/postgres/src/backend/commands/tablecmds.c | 5 +- src/postgres/src/backend/commands/ybccmds.c | 20 ++-- .../src/backend/executor/ybcModifyTable.c | 59 ++++++++---- src/postgres/src/backend/utils/misc/guc.c | 21 +++++ src/postgres/src/include/commands/copy.h | 2 + .../src/include/executor/ybcModifyTable.h | 6 +- src/yb/common/ybc_util.h | 5 + src/yb/yql/pggate/pg_dml_write.cc | 5 +- src/yb/yql/pggate/pg_dml_write.h | 2 +- src/yb/yql/pggate/pg_doc_op.cc | 16 ++-- src/yb/yql/pggate/pg_doc_op.h | 7 +- src/yb/yql/pggate/pg_session.cc | 92 +++++++++++++------ src/yb/yql/pggate/pg_session.h | 24 +++-- src/yb/yql/pggate/pggate.cc | 18 ++-- src/yb/yql/pggate/pggate.h | 3 +- src/yb/yql/pggate/ybc_pggate.cc | 6 +- src/yb/yql/pggate/ybc_pggate.h | 3 +- 25 files changed, 297 insertions(+), 97 deletions(-) create mode 100644 java/yb-pgsql/src/test/java/org/yb/pgsql/TestAsyncFlush.java diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestAsyncFlush.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestAsyncFlush.java new file mode 100644 index 000000000000..f4339c0d3660 --- /dev/null +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestAsyncFlush.java @@ -0,0 +1,57 @@ +package org.yb.pgsql; + +import java.io.BufferedWriter; +import java.io.File; 
+import java.io.FileWriter; +import java.io.IOException; +import java.sql.Statement; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yb.client.TestUtils; +import org.yb.util.YBTestRunnerNonTsanOnly; + +@RunWith(value = YBTestRunnerNonTsanOnly.class) +public class TestAsyncFlush extends BasePgSQLTest { + private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFlush.class); + + private void createDataFile(String absFilePath, int totalLines) throws IOException { + File file = new File(absFilePath); + file.createNewFile(); + + BufferedWriter writer = new BufferedWriter(new FileWriter(file)); + writer.write("a,b,c\n"); + + for (int i = 0; i < totalLines; i++) { + writer.write(i+","+i+","+i+"\n"); + } + writer.close(); + } + + @Test + public void testCopyWithAsyncFlush() throws Exception { + String absFilePath = TestUtils.getBaseTmpDir() + "/copy-async-flush.txt"; + String tableName = "copyAsyncFlush"; + int totalLines = 100000; + + createDataFile(absFilePath, totalLines); + + try (Statement statement = connection.createStatement()) { + statement.execute(String.format("CREATE TABLE %s (a int PRIMARY KEY, b int, c int)", + tableName)); + statement.execute(String.format( + "COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER)", + tableName, absFilePath)); + + // Verify row count. + assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, totalLines); + + // Verify specific rows are present. 
+ assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=0", 0, 0, 0); + assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=50000", 50000, 50000, 50000); + assertOneRow(statement, "SELECT * FROM " + tableName + " WHERE a=99999", 99999, 99999, 99999); + } + } +} diff --git a/requirements_frozen.txt b/requirements_frozen.txt index 8373c99dfee0..d9a5872dbf24 100644 --- a/requirements_frozen.txt +++ b/requirements_frozen.txt @@ -37,7 +37,7 @@ shutilwhich==1.1.0 six==1.16.0 sys-detection==1.1.1 toml==0.10.2 -typed-ast==1.5.1 +typed-ast==1.4.3 typing-extensions==3.10.0.2 typing-utils==0.1.0 urllib3==1.26.7 diff --git a/src/postgres/src/backend/bootstrap/bootstrap.c b/src/postgres/src/backend/bootstrap/bootstrap.c index d684c5e4d34e..2efa04bdea11 100644 --- a/src/postgres/src/backend/bootstrap/bootstrap.c +++ b/src/postgres/src/backend/bootstrap/bootstrap.c @@ -824,7 +824,7 @@ InsertOneTuple(Oid objectid) HeapTupleSetOid(tuple, objectid); if (IsYugaByteEnabled()) - YBCExecuteInsert(boot_reldesc, tupDesc, tuple); + YBCExecuteInsert(boot_reldesc, tupDesc, tuple, false /* use_async_flush */); else simple_heap_insert(boot_reldesc, tuple); diff --git a/src/postgres/src/backend/catalog/indexing.c b/src/postgres/src/backend/catalog/indexing.c index 8e3eaf1b94e6..222d5b626653 100644 --- a/src/postgres/src/backend/catalog/indexing.c +++ b/src/postgres/src/backend/catalog/indexing.c @@ -300,14 +300,19 @@ YBCatalogTupleInsert(Relation heapRel, HeapTuple tup, bool yb_shared_insert) */ if (dboid == YBCGetDatabaseOid(heapRel)) continue; /* Will be done after the loop. 
*/ - YBCExecuteInsertForDb(dboid, heapRel, RelationGetDescr(heapRel), tup); + YBCExecuteInsertForDb(dboid, + heapRel, + RelationGetDescr(heapRel), + tup, + false /* use_async_flush */); } YB_FOR_EACH_DB_END; } oid = YBCExecuteInsertForDb(YBCGetDatabaseOid(heapRel), heapRel, RelationGetDescr(heapRel), - tup); + tup, + false /* use_async_flush */); /* Update the local cache automatically */ YBSetSysCacheTuple(heapRel, tup); } @@ -359,14 +364,19 @@ CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup, */ if (dboid == YBCGetDatabaseOid(heapRel)) continue; /* Will be done after the loop. */ - YBCExecuteInsertForDb(dboid, heapRel, RelationGetDescr(heapRel), tup); + YBCExecuteInsertForDb(dboid, + heapRel, + RelationGetDescr(heapRel), + tup, + false /* use_async_flush */); } YB_FOR_EACH_DB_END; } oid = YBCExecuteInsertForDb(YBCGetDatabaseOid(heapRel), heapRel, RelationGetDescr(heapRel), - tup); + tup, + false /* use_async_flush */); /* Update the local cache automatically */ YBSetSysCacheTuple(heapRel, tup); } diff --git a/src/postgres/src/backend/catalog/yb_catalog/yb_catalog_version.c b/src/postgres/src/backend/catalog/yb_catalog/yb_catalog_version.c index dc996c582dc2..dab24866b001 100644 --- a/src/postgres/src/backend/catalog/yb_catalog/yb_catalog_version.c +++ b/src/postgres/src/backend/catalog/yb_catalog/yb_catalog_version.c @@ -166,7 +166,9 @@ bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change) ereport(LOG, (errmsg("%s: incrementing master catalog version (%sbreaking)", __func__, is_breaking_change ? "" : "non"))); - HandleYBStatus(YBCPgDmlExecWriteOp(update_stmt, &rows_affected_count)); + HandleYBStatus(YBCPgDmlExecWriteOp(update_stmt, + &rows_affected_count, + false /* use_async_flush */)); Assert(rows_affected_count == 1); /* Cleanup. 
*/ diff --git a/src/postgres/src/backend/commands/copy.c b/src/postgres/src/backend/commands/copy.c index ed6c98b488c3..685fc3fa4347 100644 --- a/src/postgres/src/backend/commands/copy.c +++ b/src/postgres/src/backend/commands/copy.c @@ -55,7 +55,7 @@ #include "pg_yb_utils.h" #include "executor/ybcModifyTable.h" - +bool yb_use_async_flush = true; #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7')) #define OCTVALUE(c) ((c) - '0') @@ -3005,7 +3005,10 @@ CopyFrom(CopyState cstate) } else { - YBCExecuteInsert(resultRelInfo->ri_RelationDesc, tupDesc, tuple); + YBCExecuteInsert(resultRelInfo->ri_RelationDesc, + tupDesc, + tuple, + yb_use_async_flush); } } else if (resultRelInfo->ri_FdwRoutine != NULL) diff --git a/src/postgres/src/backend/commands/createas.c b/src/postgres/src/backend/commands/createas.c index a0526b2de25a..dddea6eb6858 100644 --- a/src/postgres/src/backend/commands/createas.c +++ b/src/postgres/src/backend/commands/createas.c @@ -614,7 +614,10 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) if (IsYBRelation(myState->rel)) { - YBCExecuteInsert(myState->rel, RelationGetDescr(myState->rel), tuple); + YBCExecuteInsert(myState->rel, + RelationGetDescr(myState->rel), + tuple, + false /* use_async_flush */); } else { diff --git a/src/postgres/src/backend/commands/matview.c b/src/postgres/src/backend/commands/matview.c index 69ffea37d487..891e44a39c56 100644 --- a/src/postgres/src/backend/commands/matview.c +++ b/src/postgres/src/backend/commands/matview.c @@ -504,7 +504,10 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) tuple = ExecMaterializeSlot(slot); if (IsYBRelation(myState->transientrel)) { - YBCExecuteInsert(myState->transientrel, RelationGetDescr(myState->transientrel), tuple); + YBCExecuteInsert(myState->transientrel, + RelationGetDescr(myState->transientrel), + tuple, + false /* use_async_flush */); } else { diff --git a/src/postgres/src/backend/commands/tablecmds.c b/src/postgres/src/backend/commands/tablecmds.c index 
f475576c1173..edb9f562d459 100644 --- a/src/postgres/src/backend/commands/tablecmds.c +++ b/src/postgres/src/backend/commands/tablecmds.c @@ -7446,7 +7446,10 @@ YBCopyTableRowsUnchecked(Relation oldrel, Relation newrel, AttrNumber* attmap) ExecStoreHeapTuple(tuple, newslot, false); /* Write the tuple out to the new relation */ - YBCExecuteInsert(newrel, newslot->tts_tupleDescriptor, tuple); + YBCExecuteInsert(newrel, + newslot->tts_tupleDescriptor, + tuple, + false /* use_async_flush */); MemoryContextReset(per_tup_cxt); diff --git a/src/postgres/src/backend/commands/ybccmds.c b/src/postgres/src/backend/commands/ybccmds.c index 0be0cc95d900..50274a0a97cd 100644 --- a/src/postgres/src/backend/commands/ybccmds.c +++ b/src/postgres/src/backend/commands/ybccmds.c @@ -658,7 +658,9 @@ YBCDropTable(Oid relationId) { HandleYBStatusIgnoreNotFound(YBCPgDmlBindTable(handle), ¬_found); int rows_affected_count = 0; - HandleYBStatusIgnoreNotFound(YBCPgDmlExecWriteOp(handle, &rows_affected_count), + HandleYBStatusIgnoreNotFound(YBCPgDmlExecWriteOp(handle, + &rows_affected_count, + false /* use_async_flush */), ¬_found); } } @@ -705,11 +707,13 @@ YBCTruncateTable(Relation rel) { /* Create table-level tombstone for colocated tables / tables in tablegroups */ HandleYBStatus(YBCPgNewTruncateColocated(databaseId, relationId, - false, + false /* is_single_row_txn */, &handle)); HandleYBStatus(YBCPgDmlBindTable(handle)); int rows_affected_count = 0; - HandleYBStatus(YBCPgDmlExecWriteOp(handle, &rows_affected_count)); + HandleYBStatus(YBCPgDmlExecWriteOp(handle, + &rows_affected_count, + false /* use_async_flush */)); } else { @@ -748,11 +752,13 @@ YBCTruncateTable(Relation rel) { /* Create index-level tombstone for colocated indexes / indexes in tablegroups */ HandleYBStatus(YBCPgNewTruncateColocated(databaseId, indexId, - false, + false /* is_single_row_txn */, &handle)); HandleYBStatus(YBCPgDmlBindTable(handle)); int rows_affected_count = 0; - 
HandleYBStatus(YBCPgDmlExecWriteOp(handle, &rows_affected_count)); + HandleYBStatus(YBCPgDmlExecWriteOp(handle, + &rows_affected_count, + false /* use_async_flush */)); } else { @@ -1240,7 +1246,9 @@ YBCDropIndex(Oid relationId) if (valid_handle) { HandleYBStatusIgnoreNotFound(YBCPgDmlBindTable(handle), ¬_found); int rows_affected_count = 0; - HandleYBStatusIgnoreNotFound(YBCPgDmlExecWriteOp(handle, &rows_affected_count), + HandleYBStatusIgnoreNotFound(YBCPgDmlExecWriteOp(handle, + &rows_affected_count, + false /* use_async_flush */), ¬_found); } } diff --git a/src/postgres/src/backend/executor/ybcModifyTable.c b/src/postgres/src/backend/executor/ybcModifyTable.c index 1c8571ec3422..b9f2d42a0805 100644 --- a/src/postgres/src/backend/executor/ybcModifyTable.c +++ b/src/postgres/src/backend/executor/ybcModifyTable.c @@ -172,14 +172,15 @@ static void YBCBindTupleId(YBCPgStatement pg_stmt, Datum tuple_id) { */ static void YBCExecWriteStmt(YBCPgStatement ybc_stmt, Relation rel, - int *rows_affected_count) + int *rows_affected_count, + bool use_async_flush) { HandleYBStatus(YBCPgSetCatalogCacheVersion(ybc_stmt, yb_catalog_cache_version)); bool is_syscatalog_version_inc = YbMarkStatementIfCatalogVersionIncrement(ybc_stmt, rel); /* Execute the insert. 
*/ - HandleYBStatus(YBCPgDmlExecWriteOp(ybc_stmt, rows_affected_count)); + HandleYBStatus(YBCPgDmlExecWriteOp(ybc_stmt, rows_affected_count, use_async_flush)); /* * Optimization to increment the catalog version for the local cache as @@ -205,7 +206,8 @@ static Oid YBCExecuteInsertInternal(Oid dboid, Relation rel, TupleDesc tupleDesc, HeapTuple tuple, - bool is_single_row_txn) + bool is_single_row_txn, + bool use_async_flush) { Oid relid = RelationGetRelid(rel); AttrNumber minattr = YBGetFirstLowInvalidAttributeNumber(rel); @@ -285,7 +287,7 @@ static Oid YBCExecuteInsertInternal(Oid dboid, } /* Execute the insert */ - YBCExecWriteStmt(insert_stmt, rel, NULL /* rows_affected_count */); + YBCExecWriteStmt(insert_stmt, rel, NULL /* rows_affected_count */, use_async_flush); /* Cleanup. */ YBCPgDeleteStatement(insert_stmt); @@ -298,25 +300,29 @@ static Oid YBCExecuteInsertInternal(Oid dboid, Oid YBCExecuteInsert(Relation rel, TupleDesc tupleDesc, - HeapTuple tuple) + HeapTuple tuple, + bool use_async_flush) { return YBCExecuteInsertForDb(YBCGetDatabaseOid(rel), rel, tupleDesc, - tuple); + tuple, + use_async_flush); } Oid YBCExecuteInsertForDb(Oid dboid, Relation rel, TupleDesc tupleDesc, - HeapTuple tuple) + HeapTuple tuple, + bool use_async_flush) { bool non_transactional = !IsSystemRelation(rel) && yb_disable_transactional_writes; return YBCExecuteInsertInternal(dboid, rel, tupleDesc, tuple, - non_transactional); + non_transactional, + use_async_flush); } Oid YBCExecuteNonTxnInsert(Relation rel, @@ -338,7 +344,8 @@ Oid YBCExecuteNonTxnInsertForDb(Oid dboid, rel, tupleDesc, tuple, - true /* is_single_row_txn */); + true /* is_single_row_txn */, + false /* use_async_flush */); } Oid YBCHeapInsert(TupleTableSlot *slot, @@ -379,7 +386,8 @@ Oid YBCHeapInsertForDb(Oid dboid, return YBCExecuteInsertForDb(dboid, resultRelationDesc, slot->tts_tupleDescriptor, - tuple); + tuple, + false /* use_async_flush */); } } @@ -509,7 +517,8 @@ void YBCExecuteInsertIndexForDb(Oid dboid, /* 
Execute the insert and clean up. */ YBCExecWriteStmt(insert_stmt, index, - NULL /* rows_affected_count */); + NULL /* rows_affected_count */, + false /* use_async_flush */); /* Cleanup. */ YBCPgDeleteStatement(insert_stmt); @@ -582,7 +591,7 @@ bool YBCExecuteDelete(Relation rel, TupleTableSlot *slot, EState *estate, * a row across partitions, pass &rows_affected_count even if this * is not a single row transaction. */ - YBCExecWriteStmt(delete_stmt, rel, &rows_affected_count); + YBCExecWriteStmt(delete_stmt, rel, &rows_affected_count, false /* use_async_flush */); /* Cleanup. */ YBCPgDeleteStatement(delete_stmt); return rows_affected_count > 0; @@ -606,7 +615,8 @@ bool YBCExecuteDelete(Relation rel, TupleTableSlot *slot, EState *estate, YBCExecWriteStmt(delete_stmt, rel, - isSingleRow ? &rows_affected_count : NULL); + isSingleRow ? &rows_affected_count : NULL, + false /* use_async_flush */); /* * Fetch values of the columns required to evaluate returning clause @@ -700,7 +710,10 @@ void YBCExecuteDeleteIndex(Relation index, HandleYBStatus(YBCPgDeleteStmtSetIsPersistNeeded(delete_stmt, true)); - YBCExecWriteStmt(delete_stmt, index, NULL /* rows_affected_count */); + YBCExecWriteStmt(delete_stmt, + index, + NULL /* rows_affected_count */, + false /* use_async_flush */); YBCPgDeleteStatement(delete_stmt); } @@ -865,7 +878,8 @@ bool YBCExecuteUpdate(Relation rel, /* If update batching is allowed, then ignore rows_affected_count. */ YBCExecWriteStmt(update_stmt, rel, - can_batch_update ? NULL : &rows_affected_count); + can_batch_update ? 
NULL : &rows_affected_count, + false /* use_async_flush */); /* * Fetch values of the columns required to evaluate returning clause @@ -947,7 +961,10 @@ Oid YBCExecuteUpdateReplace(Relation rel, YBCExecuteDelete(rel, slot, estate, mtstate, false /* changingPart */); - Oid tupleoid = YBCExecuteInsert(rel, RelationGetDescr(rel), tuple); + Oid tupleoid = YBCExecuteInsert(rel, + RelationGetDescr(rel), + tuple, + false /* use_async_flush */); return tupleoid; } @@ -986,7 +1003,10 @@ void YBCDeleteSysCatalogTuple(Relation rel, HeapTuple tuple) MarkCurrentCommandUsed(); CacheInvalidateHeapTuple(rel, tuple, NULL); - YBCExecWriteStmt(delete_stmt, rel, NULL /* rows_affected_count */); + YBCExecWriteStmt(delete_stmt, + rel, + NULL /* rows_affected_count */, + false /* use_async_flush */); /* Cleanup. */ YBCPgDeleteStatement(delete_stmt); @@ -1054,7 +1074,10 @@ void YBCUpdateSysCatalogTupleForDb(Oid dboid, Relation rel, HeapTuple oldtuple, CacheInvalidateHeapTuple(rel, tuple, NULL); /* Execute the statement and clean up */ - YBCExecWriteStmt(update_stmt, rel, NULL /* rows_affected_count */); + YBCExecWriteStmt(update_stmt, + rel, + NULL /* rows_affected_count */, + false /* use_async_flush */); /* Cleanup. 
*/ YBCPgDeleteStatement(update_stmt);; diff --git a/src/postgres/src/backend/utils/misc/guc.c b/src/postgres/src/backend/utils/misc/guc.c index 711e26425a6a..99e67400438a 100644 --- a/src/postgres/src/backend/utils/misc/guc.c +++ b/src/postgres/src/backend/utils/misc/guc.c @@ -2055,6 +2055,16 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"yb_use_async_flush", PGC_USERSET, CLIENT_CONN_STATEMENT, + gettext_noop("Perform async flushes if applicable."), + NULL + }, + &yb_use_async_flush, + true, + NULL, NULL, NULL + }, + { {"yb_enable_expression_pushdown", PGC_USERSET, QUERY_TUNING_METHOD, gettext_noop("Push supported expressions down to DocDB for evaluation."), @@ -3332,6 +3342,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"ysql_session_max_batch_size", PGC_USERSET, CLIENT_CONN_STATEMENT, + gettext_noop("Sets the maximum batch size for writes that YSQL can buffer before flushing to tablet servers."), + gettext_noop("If this is 0, YSQL will use the gflag ysql_session_max_batch_size. 
If non-zero, this session variable will supersede the value of the gflag."), + 0 + }, + &ysql_session_max_batch_size, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + { {"yb_follower_read_staleness_ms", PGC_USERSET, CLIENT_CONN_STATEMENT, gettext_noop("Sets the staleness (in ms) to be used for performing follower reads."), diff --git a/src/postgres/src/include/commands/copy.h b/src/postgres/src/include/commands/copy.h index 40db32c4e828..556de59b22c9 100644 --- a/src/postgres/src/include/commands/copy.h +++ b/src/postgres/src/include/commands/copy.h @@ -21,6 +21,8 @@ #define DEFAULT_BATCH_ROWS_PER_TRANSACTION 1000 +extern bool yb_use_async_flush; + /* CopyStateData is private in commands/copy.c */ typedef struct CopyStateData *CopyState; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); diff --git a/src/postgres/src/include/executor/ybcModifyTable.h b/src/postgres/src/include/executor/ybcModifyTable.h index 771224ea5da2..dd543f0e3879 100644 --- a/src/postgres/src/include/executor/ybcModifyTable.h +++ b/src/postgres/src/include/executor/ybcModifyTable.h @@ -63,11 +63,13 @@ extern Oid YBCHeapInsertForDb(Oid dboid, */ extern Oid YBCExecuteInsert(Relation rel, TupleDesc tupleDesc, - HeapTuple tuple); + HeapTuple tuple, + bool use_async_flush); extern Oid YBCExecuteInsertForDb(Oid dboid, Relation rel, TupleDesc tupleDesc, - HeapTuple tuple); + HeapTuple tuple, + bool use_async_flush); /* * Execute the insert outside of a transaction. diff --git a/src/yb/common/ybc_util.h b/src/yb/common/ybc_util.h index 5e888f5fd9e0..7b235d69ae0a 100644 --- a/src/yb/common/ybc_util.h +++ b/src/yb/common/ybc_util.h @@ -54,6 +54,11 @@ extern bool yb_force_global_transaction; */ extern bool suppress_nonpg_logs; +/* + * Guc variable to control the max session batch size before flushing. + */ +extern int ysql_session_max_batch_size; + /* * Guc variable to enable binary restore from a binary backup of YSQL tables. 
When doing binary * restore, we copy the docdb SST files of those tables from the source database and reuse them diff --git a/src/yb/yql/pggate/pg_dml_write.cc b/src/yb/yql/pggate/pg_dml_write.cc index b61bb70860fd..8be62f78d9c9 100644 --- a/src/yb/yql/pggate/pg_dml_write.cc +++ b/src/yb/yql/pggate/pg_dml_write.cc @@ -108,7 +108,7 @@ Status PgDmlWrite::DeleteEmptyPrimaryBinds() { return Status::OK(); } -Status PgDmlWrite::Exec(bool force_non_bufferable) { +Status PgDmlWrite::Exec(bool force_non_bufferable, bool use_async_flush) { // Delete allocated binds that are not associated with a value. // YBClient interface enforce us to allocate binds for primary key columns in their indexing @@ -136,7 +136,8 @@ Status PgDmlWrite::Exec(bool force_non_bufferable) { // Execute the statement. If the request has been sent, get the result and handle any rows // returned. - if (VERIFY_RESULT(doc_op_->Execute(force_non_bufferable)) == RequestSent::kTrue) { + if (VERIFY_RESULT( + doc_op_->Execute(force_non_bufferable, use_async_flush)) == RequestSent::kTrue) { RETURN_NOT_OK(doc_op_->GetResult(&rowsets_)); // Save the number of rows affected by the op. diff --git a/src/yb/yql/pggate/pg_dml_write.h b/src/yb/yql/pggate/pg_dml_write.h index 508f40a0c223..0ec45e8ec014 100644 --- a/src/yb/yql/pggate/pg_dml_write.h +++ b/src/yb/yql/pggate/pg_dml_write.h @@ -36,7 +36,7 @@ class PgDmlWrite : public PgDml { void PrepareColumns(); // force_non_bufferable flag indicates this operation should not be buffered. 
- CHECKED_STATUS Exec(bool force_non_bufferable = false); + CHECKED_STATUS Exec(bool force_non_bufferable = false, bool use_async_flush = false); void SetIsSystemCatalogChange() { write_req_->set_is_ysql_catalog_change(true); diff --git a/src/yb/yql/pggate/pg_doc_op.cc b/src/yb/yql/pggate/pg_doc_op.cc index aba75ca84080..65fc96f4ea35 100644 --- a/src/yb/yql/pggate/pg_doc_op.cc +++ b/src/yb/yql/pggate/pg_doc_op.cc @@ -168,7 +168,7 @@ const PgExecParameters& PgDocOp::ExecParameters() const { return exec_params_; } -Result PgDocOp::Execute(bool force_non_bufferable) { +Result PgDocOp::Execute(bool force_non_bufferable, bool use_async_flush) { // As of 09/25/2018, DocDB doesn't cache or keep any execution state for a statement, so we // have to call query execution every time. // - Normal SQL convention: Exec, Fetch, Fetch, ... @@ -176,7 +176,7 @@ Result PgDocOp::Execute(bool force_non_bufferable) { // This refers to the sequence of operations between this layer and the underlying tablet // server / DocDB layer, not to the sequence of operations between the PostgreSQL layer and this // layer. - exec_status_ = SendRequest(force_non_bufferable); + exec_status_ = SendRequest(force_non_bufferable, use_async_flush); RETURN_NOT_OK(exec_status_); return RequestSent(response_.Valid()); } @@ -188,7 +188,7 @@ Status PgDocOp::GetResult(list *rowsets) { if (!end_of_data_) { // Send request now in case prefetching was suppressed. if (suppress_next_result_prefetching_ && !response_.Valid()) { - exec_status_ = SendRequest(true /* force_non_bufferable */); + exec_status_ = SendRequest(true /* force_non_bufferable */, false /* use_async_flush */); RETURN_NOT_OK(exec_status_); } @@ -201,7 +201,7 @@ Status PgDocOp::GetResult(list *rowsets) { rowsets->splice(rowsets->end(), rows); // Prefetch next portion of data if needed. 
if (!(end_of_data_ || suppress_next_result_prefetching_)) { - exec_status_ = SendRequest(true /* force_non_bufferable */); + exec_status_ = SendRequest(true /* force_non_bufferable */, false /* use_async_flush */); RETURN_NOT_OK(exec_status_); } } @@ -262,14 +262,14 @@ void PgDocOp::MoveInactiveOpsOutside() { active_op_count_ = left_iter; } -Status PgDocOp::SendRequest(bool force_non_bufferable) { +Status PgDocOp::SendRequest(bool force_non_bufferable, bool use_async_flush) { DCHECK(exec_status_.ok()); DCHECK(!response_.Valid()); - exec_status_ = SendRequestImpl(force_non_bufferable); + exec_status_ = SendRequestImpl(force_non_bufferable, use_async_flush); return exec_status_; } -Status PgDocOp::SendRequestImpl(bool force_non_bufferable) { +Status PgDocOp::SendRequestImpl(bool force_non_bufferable, bool use_async_flush) { // Populate collected information into protobuf requests before sending to DocDB. RETURN_NOT_OK(CreateRequests()); @@ -284,7 +284,7 @@ Status PgDocOp::SendRequestImpl(bool force_non_bufferable) { size_t send_count = std::min(parallelism_level_, active_op_count_); response_ = VERIFY_RESULT(pg_session_->RunAsync( pgsql_ops_.data(), send_count, *table_, relation_id_, &GetReadTime(), - force_non_bufferable)); + force_non_bufferable, use_async_flush)); return Status::OK(); } diff --git a/src/yb/yql/pggate/pg_doc_op.h b/src/yb/yql/pggate/pg_doc_op.h index 6e54602ef8fb..bd8afe7af30d 100644 --- a/src/yb/yql/pggate/pg_doc_op.h +++ b/src/yb/yql/pggate/pg_doc_op.h @@ -225,7 +225,8 @@ class PgDocOp : public std::enable_shared_from_this { const PgExecParameters& ExecParameters() const; // Execute the op. Return true if the request has been sent and is awaiting the result. - virtual Result Execute(bool force_non_bufferable = false); + virtual Result Execute(bool force_non_bufferable = false, + bool use_async_flush = false); // Instruct this doc_op to abandon execution and querying data by setting end_of_data_ to 'true'. 
// - This op will not send request to tablet server. @@ -299,9 +300,9 @@ class PgDocOp : public std::enable_shared_from_this { void SetReadTime(); private: - CHECKED_STATUS SendRequest(bool force_non_bufferable); + CHECKED_STATUS SendRequest(bool force_non_bufferable, bool use_async_flush); - virtual CHECKED_STATUS SendRequestImpl(bool force_non_bufferable); + virtual CHECKED_STATUS SendRequestImpl(bool force_non_bufferable, bool use_async_flush); Result> ProcessResponse(const Status& exec_status); diff --git a/src/yb/yql/pggate/pg_session.cc b/src/yb/yql/pggate/pg_session.cc index fbe4e3e7fb20..5b7b429e41c4 100644 --- a/src/yb/yql/pggate/pg_session.cc +++ b/src/yb/yql/pggate/pg_session.cc @@ -70,6 +70,10 @@ DECLARE_int32(TEST_user_ddl_operation_timeout_sec); DEFINE_bool(ysql_log_failed_docdb_requests, false, "Log failed docdb requests."); +// If this is set in the user's session to a positive value, it will supersede the gflag +// ysql_session_max_batch_size. +int ysql_session_max_batch_size = 0; + namespace yb { namespace pggate { @@ -96,6 +100,9 @@ namespace { static constexpr const size_t kPgSequenceLastValueColIdx = 2; static constexpr const size_t kPgSequenceIsCalledColIdx = 3; +bool has_prev_future = false; +PerformFuture prevFlushFuture; + docdb::PrimitiveValue NullValue(SortingType sorting) { using SortingType = SortingType; @@ -152,6 +159,13 @@ bool IsTableUsedByOperation(const PgsqlOp& op, const string& table_id) { } } +/* Use the gflag value if the session variable is unset. */ +uint64_t GetSessionMaxBatchSize() { + return ysql_session_max_batch_size <= 0 + ? 
FLAGS_ysql_session_max_batch_size + : (uint64_t) ysql_session_max_batch_size; +} + struct PgForeignKeyReferenceLightweight { PgOid table_id; Slice ybctid; @@ -217,7 +231,8 @@ PgSession::RunHelper::RunHelper(const PgObjectId& relation_id, } Status PgSession::RunHelper::Apply( - const Schema& schema, const PgsqlOpPtr& op, uint64_t* read_time, bool force_non_bufferable) { + const Schema& schema, const PgsqlOpPtr& op, uint64_t* read_time, bool force_non_bufferable, + UseAsyncFlush use_async_flush) { auto& buffered_keys = pg_session_.buffered_keys_; // Try buffering this operation if it is a write operation, buffering is enabled and no // operations have been already applied to current session (yb session does not exist). @@ -231,7 +246,7 @@ Status PgSession::RunHelper::Apply( // Flush is required in this case. RowIdentifier row_id(schema, wop); if (PREDICT_FALSE(!buffered_keys.insert(row_id).second)) { - RETURN_NOT_OK(pg_session_.FlushBufferedOperations()); + RETURN_NOT_OK(pg_session_.FlushBufferedOperations(use_async_flush)); buffered_keys.insert(row_id); } if (PREDICT_FALSE(yb_debug_log_docdb_requests)) { @@ -239,9 +254,9 @@ Status PgSession::RunHelper::Apply( } buffer_.Add(op, relation_id_); // Flush buffers in case limit of operations in single RPC exceeded. - return PREDICT_TRUE(buffered_keys.size() < FLAGS_ysql_session_max_batch_size) + return PREDICT_TRUE(buffered_keys.size() < GetSessionMaxBatchSize()) ? 
Status::OK() - : pg_session_.FlushBufferedOperations(); + : pg_session_.FlushBufferedOperations(use_async_flush); } bool read_only = op->is_read(); // Flush all buffered operations (if any) before performing non-bufferable operation @@ -258,17 +273,17 @@ Status PgSession::RunHelper::Apply( full_flush_required = IsTableUsedByOperation(*op, i->table_id()); } if (full_flush_required) { - RETURN_NOT_OK(pg_session_.FlushBufferedOperations()); + RETURN_NOT_OK(pg_session_.FlushBufferedOperations(use_async_flush)); } else { RETURN_NOT_OK(pg_session_.FlushBufferedOperationsImpl( - [this](auto ops, auto transactional) -> Status { + [this](auto ops, auto transactional, auto use_async_flush) -> Status { if (transactional == transactional_) { // Save buffered operations for further applying before non-buffered operation. operations_.Swap(&ops); return Status::OK(); } - return pg_session_.FlushOperations(std::move(ops), transactional); - })); + return pg_session_.FlushOperations(std::move(ops), transactional, use_async_flush); + }, use_async_flush)); read_only = read_only && operations_.empty(); } } @@ -725,7 +740,7 @@ Status PgSession::StartOperationsBuffering() { Status PgSession::StopOperationsBuffering() { SCHECK(buffering_enabled_, IllegalState, "Buffering hasn't been started"); buffering_enabled_ = false; - return FlushBufferedOperations(); + return FlushBufferedOperations(UseAsyncFlush::kFalse); } void PgSession::ResetOperationsBuffering() { @@ -733,10 +748,10 @@ void PgSession::ResetOperationsBuffering() { buffering_enabled_ = false; } -Status PgSession::FlushBufferedOperations() { - return FlushBufferedOperationsImpl([this](auto ops, auto txn) { - return this->FlushOperations(std::move(ops), txn); - }); +Status PgSession::FlushBufferedOperations(UseAsyncFlush use_async_flush) { + return FlushBufferedOperationsImpl([this](auto ops, auto txn, auto use_async_flush) { + return this->FlushOperations(std::move(ops), txn, use_async_flush); + }, use_async_flush); } void 
PgSession::DropBufferedOperations() { @@ -751,24 +766,34 @@ PgIsolationLevel PgSession::GetIsolationLevel() { return pg_txn_manager_->GetPgIsolationLevel(); } -Status PgSession::FlushBufferedOperationsImpl(const Flusher& flusher) { +Status PgSession::FlushBufferedOperationsImpl(const Flusher& flusher, + UseAsyncFlush use_async_flush) { auto ops = std::move(buffered_ops_); auto txn_ops = std::move(buffered_txn_ops_); buffered_keys_.clear(); buffered_ops_.Clear(); buffered_txn_ops_.Clear(); if (!ops.empty()) { - RETURN_NOT_OK(flusher(std::move(ops), IsTransactionalSession::kFalse)); + RETURN_NOT_OK(flusher(std::move(ops), IsTransactionalSession::kFalse, use_async_flush)); } if (!txn_ops.empty()) { SCHECK(!YBCIsInitDbModeEnvVarSet(), IllegalState, "No transactional operations are expected in the initdb mode"); - RETURN_NOT_OK(flusher(std::move(txn_ops), IsTransactionalSession::kTrue)); + RETURN_NOT_OK(flusher(std::move(txn_ops), IsTransactionalSession::kTrue, use_async_flush)); } return Status::OK(); } +Status PgSession::ProcessPreviousFlush() { + Status flush_status; + if (has_prev_future) { + flush_status = prevFlushFuture.Get(); + has_prev_future = false; + } + return flush_status; +} + Result PgSession::ShouldHandleTransactionally(const PgTableDesc& table, const PgsqlOp& op) { if (!table.schema().table_properties().is_transactional() || !op.need_transaction() || @@ -800,7 +825,8 @@ Result PgSession::IsInitDbDone() { return pg_client_.IsInitDbDone(); } -Status PgSession::FlushOperations(BufferableOperations ops, IsTransactionalSession transactional) { +Status PgSession::FlushOperations(BufferableOperations ops, IsTransactionalSession transactional, + UseAsyncFlush use_async_flush) { DCHECK(ops.size() > 0 && ops.size() <= FLAGS_ysql_session_max_batch_size); if (PREDICT_FALSE(yb_debug_log_docdb_requests)) { @@ -819,12 +845,23 @@ Status PgSession::FlushOperations(BufferableOperations ops, IsTransactionalSessi in_txn_limit_ = clock_->Now(); } - std::promise promise; - 
Perform(&ops.operations, [&promise](const PerformResult& result) { - promise.set_value(result); + auto promise = std::make_shared>(); + Perform(&ops.operations, [promise](const PerformResult& result) { + promise->set_value(result); }); - PerformFuture future(promise.get_future(), this, &ops.relations); - return future.Get(); + PerformFuture future(promise->get_future(), this, &ops.relations); + Status flush_status; + if (use_async_flush) { + if (!has_prev_future) { + has_prev_future = true; + } else { + flush_status = prevFlushFuture.Get(); + } + prevFlushFuture = std::move(future); + } else { + flush_status = future.Get(); + } + return flush_status; } void PgSession::Perform(PgsqlOps* operations, const PerformCallback& callback) { @@ -881,17 +918,18 @@ Result PgSession::ForeignKeyReferenceExists(PgOid table_id, return false; } std::vector ybctids; - const auto reserved_size = std::min(FLAGS_ysql_session_max_batch_size, + const auto reserved_size = std::min(GetSessionMaxBatchSize(), fk_reference_intent_.size() + 1); ybctids.reserve(reserved_size); ybctids.push_back(ybctid); - // TODO(dmitry): In case number of keys for same table > FLAGS_ysql_session_max_batch_size + // TODO(dmitry): In case number of keys for same table > session max batch size // two strategy are possible: // 1. select keys belonging to same tablet to reduce number of simultaneous RPC // 2. 
select keys belonging to different tablets to distribute reads among different nodes const auto intent_match = [table_id](const auto& key) { return key.table_id == table_id; }; for (auto it = fk_reference_intent_.begin(); - it != fk_reference_intent_.end() && ybctids.size() < FLAGS_ysql_session_max_batch_size; + it != fk_reference_intent_.end() && + ybctids.size() < GetSessionMaxBatchSize(); ++it) { if (intent_match(*it)) { ybctids.push_back(it->ybctid); @@ -987,7 +1025,7 @@ Status PgSession::SetActiveSubTransaction(SubTransactionId id) { // ensuring that previous operations use previous SubTransactionMetadata. If we do not flush here, // already queued operations may incorrectly use this newly modified SubTransactionMetadata when // they are eventually sent to DocDB. - RETURN_NOT_OK(FlushBufferedOperations()); + RETURN_NOT_OK(FlushBufferedOperations(UseAsyncFlush::kFalse)); tserver::PgPerformOptionsPB* options_ptr = nullptr; tserver::PgPerformOptionsPB options; if (pg_txn_manager_->GetIsolationLevel() == IsolationLevel::NON_TRANSACTIONAL) { @@ -1008,7 +1046,7 @@ Status PgSession::RollbackSubTransaction(SubTransactionId id) { // eventually send this metadata. // See comment in SetActiveSubTransaction -- we must flush buffered operations before updating any // SubTransactionMetadata. 
- RETURN_NOT_OK(FlushBufferedOperations()); + RETURN_NOT_OK(FlushBufferedOperations(UseAsyncFlush::kFalse)); return pg_client_.RollbackSubTransaction(id); } diff --git a/src/yb/yql/pggate/pg_session.h b/src/yb/yql/pggate/pg_session.h index 8c32bcddba41..27b3c8d5b0e6 100644 --- a/src/yb/yql/pggate/pg_session.h +++ b/src/yb/yql/pggate/pg_session.h @@ -98,6 +98,7 @@ class RowIdentifier { }; YB_STRONGLY_TYPED_BOOL(IsTransactionalSession); +YB_STRONGLY_TYPED_BOOL(UseAsyncFlush); YB_STRONGLY_TYPED_BOOL(IsReadOnlyOperation); YB_STRONGLY_TYPED_BOOL(IsCatalogOperation); @@ -211,7 +212,9 @@ class PgSession : public RefCountedThreadSafe { void ResetOperationsBuffering(); // Flush all pending buffered operations. Buffering mode remain unchanged. - CHECKED_STATUS FlushBufferedOperations(); + CHECKED_STATUS FlushBufferedOperations(UseAsyncFlush use_async_flush); + // Process previous flush for async flush. + CHECKED_STATUS ProcessPreviousFlush(); // Drop all pending buffered operations. Buffering mode remain unchanged. void DropBufferedOperations(); @@ -323,10 +326,13 @@ class PgSession : public RefCountedThreadSafe { CHECKED_STATUS RollbackSubTransaction(SubTransactionId id); private: - using Flusher = std::function; + using Flusher = std::function; - CHECKED_STATUS FlushBufferedOperationsImpl(const Flusher& flusher); - CHECKED_STATUS FlushOperations(BufferableOperations ops, IsTransactionalSession transactional); + CHECKED_STATUS FlushBufferedOperationsImpl(const Flusher& flusher, + UseAsyncFlush use_async_flush); + CHECKED_STATUS FlushOperations(BufferableOperations ops, IsTransactionalSession transactional, + UseAsyncFlush use_async_flush); // Run multiple operations. 
template @@ -335,13 +341,16 @@ class PgSession : public RefCountedThreadSafe { const PgTableDesc& table, const PgObjectId& relation_id, uint64_t* read_time, - bool force_non_bufferable) { + bool force_non_bufferable, + bool use_async_flush) { SCHECK_GT(ops_count, 0ULL, IllegalState, "Operation list must not be empty"); const IsTransactionalSession transactional(VERIFY_RESULT( ShouldHandleTransactionally(table, **op))); + UseAsyncFlush use_async_flush_(use_async_flush); RunHelper runner(relation_id, this, transactional); for (auto end = op + ops_count; op != end; ++op) { - RETURN_NOT_OK(runner.Apply(table.schema(), *op, read_time, force_non_bufferable)); + RETURN_NOT_OK(runner.Apply(table.schema(), *op, read_time, force_non_bufferable, + use_async_flush_)); } return runner.Flush(); } @@ -354,7 +363,8 @@ class PgSession : public RefCountedThreadSafe { RunHelper( const PgObjectId& relation_id, PgSession* pg_session, IsTransactionalSession transactional); CHECKED_STATUS Apply( - const Schema& schema, const PgsqlOpPtr& op, uint64_t* read_time, bool force_non_bufferable); + const Schema& schema, const PgsqlOpPtr& op, uint64_t* read_time, bool force_non_bufferable, + UseAsyncFlush use_async_flush); Result Flush(); private: diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index 6ac8818884f1..e5021826d6ad 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -1052,10 +1052,12 @@ void PgApiImpl::ResetOperationsBuffering() { } Status PgApiImpl::FlushBufferedOperations() { - return pg_session_->FlushBufferedOperations(); + return pg_session_->FlushBufferedOperations(UseAsyncFlush::kFalse); } -Status PgApiImpl::DmlExecWriteOp(PgStatement *handle, int32_t *rows_affected_count) { +Status PgApiImpl::DmlExecWriteOp(PgStatement *handle, + int32_t *rows_affected_count, + bool use_async_flush) { switch (handle->stmt_op()) { case StmtOp::STMT_INSERT: case StmtOp::STMT_UPDATE: @@ -1063,7 +1065,8 @@ Status 
PgApiImpl::DmlExecWriteOp(PgStatement *handle, int32_t *rows_affected_cou case StmtOp::STMT_TRUNCATE: { auto dml_write = down_cast(handle); - RETURN_NOT_OK(dml_write->Exec(rows_affected_count != nullptr /* force_non_bufferable */)); + RETURN_NOT_OK(dml_write->Exec(rows_affected_count != nullptr /* force_non_bufferable */, + use_async_flush)); if (rows_affected_count) { *rows_affected_count = dml_write->GetRowsAffectedCount(); } @@ -1440,7 +1443,8 @@ Status PgApiImpl::RestartReadPoint() { Status PgApiImpl::CommitTransaction() { pg_session_->InvalidateForeignKeyReferenceCache(); - RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); + RETURN_NOT_OK(pg_session_->FlushBufferedOperations(UseAsyncFlush::kFalse)); + RETURN_NOT_OK(pg_session_->ProcessPreviousFlush()); return pg_txn_manager_->CommitTransaction(); } @@ -1468,13 +1472,13 @@ Status PgApiImpl::SetTransactionDeferrable(bool deferrable) { Status PgApiImpl::EnterSeparateDdlTxnMode() { // Flush all buffered operations as ddl txn use its own transaction session. - RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); + RETURN_NOT_OK(pg_session_->FlushBufferedOperations(UseAsyncFlush::kFalse)); return pg_txn_manager_->EnterSeparateDdlTxnMode(); } Status PgApiImpl::ExitSeparateDdlTxnMode() { // Flush all buffered operations as ddl txn use its own transaction session. - RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); + RETURN_NOT_OK(pg_session_->FlushBufferedOperations(UseAsyncFlush::kFalse)); RETURN_NOT_OK(pg_txn_manager_->ExitSeparateDdlTxnMode(Commit::kTrue)); // Next reads from catalog tables have to see changes made by the DDL transaction. 
ResetCatalogReadTime(); @@ -1487,7 +1491,7 @@ void PgApiImpl::ClearSeparateDdlTxnMode() { } Status PgApiImpl::SetActiveSubTransaction(SubTransactionId id) { - RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); + RETURN_NOT_OK(pg_session_->FlushBufferedOperations(UseAsyncFlush::kFalse)); return pg_session_->SetActiveSubTransaction(id); } diff --git a/src/yb/yql/pggate/pggate.h b/src/yb/yql/pggate/pggate.h index a4963aef301e..676b2687e6df 100644 --- a/src/yb/yql/pggate/pggate.h +++ b/src/yb/yql/pggate/pggate.h @@ -391,7 +391,8 @@ class PgApiImpl { PgSysColumns *syscols, bool *has_data); // Utility method that checks stmt type and calls exec insert, update, or delete internally. - CHECKED_STATUS DmlExecWriteOp(PgStatement *handle, int32_t *rows_affected_count); + CHECKED_STATUS DmlExecWriteOp(PgStatement *handle, int32_t *rows_affected_count, + bool use_async_flush); // This function adds a primary column to be used in the construction of the tuple id (ybctid). CHECKED_STATUS DmlAddYBTupleIdColumn(PgStatement *handle, int attr_num, uint64_t datum, diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index 62cf215b17fc..8a58182863ab 100644 --- a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -658,8 +658,10 @@ YBCStatus YBCPgFlushBufferedOperations() { return ToYBCStatus(pgapi->FlushBufferedOperations()); } -YBCStatus YBCPgDmlExecWriteOp(YBCPgStatement handle, int32_t *rows_affected_count) { - return ToYBCStatus(pgapi->DmlExecWriteOp(handle, rows_affected_count)); +YBCStatus YBCPgDmlExecWriteOp(YBCPgStatement handle, + int32_t *rows_affected_count, + bool use_async_flush) { + return ToYBCStatus(pgapi->DmlExecWriteOp(handle, rows_affected_count, use_async_flush)); } YBCStatus YBCPgBuildYBTupleId(const YBCPgYBTupleIdDescriptor *source, uint64_t *ybctid) { diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index f66d089e4273..2ba52855cf40 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ 
b/src/yb/yql/pggate/ybc_pggate.h @@ -361,7 +361,8 @@ YBCStatus YBCPgDmlFetch(YBCPgStatement handle, int32_t natts, uint64_t *values, YBCPgSysColumns *syscols, bool *has_data); // Utility method that checks stmt type and calls either exec insert, update, or delete internally. -YBCStatus YBCPgDmlExecWriteOp(YBCPgStatement handle, int32_t *rows_affected_count); +YBCStatus YBCPgDmlExecWriteOp(YBCPgStatement handle, int32_t *rows_affected_count, + bool use_async_flush); // This function returns the tuple id (ybctid) of a Postgres tuple. YBCStatus YBCPgBuildYBTupleId(const YBCPgYBTupleIdDescriptor* data, uint64_t *ybctid);