Skip to content

Commit

Permalink
Revert "[#11628] YSQL: Implementing Async Flush for COPY command."
Browse files Browse the repository at this point in the history
Summary:
This reverts commit 1a3a344.

Reverting current implementation of async flush changes so that we can refactor, fix potential bugs, and improve implementation details.

Test Plan:
Jenkins: urgent

Built and run a COPY locally to verify it worked.

Reviewers: pjain

Reviewed By: pjain

Subscribers: dmitry, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D15975
  • Loading branch information
nathanhjli committed Mar 15, 2022
1 parent 018c0b2 commit 7ca347b
Show file tree
Hide file tree
Showing 25 changed files with 97 additions and 297 deletions.
57 changes: 0 additions & 57 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestAsyncFlush.java

This file was deleted.

2 changes: 1 addition & 1 deletion requirements_frozen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ shutilwhich==1.1.0
six==1.16.0
sys-detection==1.1.1
toml==0.10.2
typed-ast==1.4.3
typed-ast==1.5.1
typing-extensions==3.10.0.2
typing-utils==0.1.0
urllib3==1.26.7
Expand Down
2 changes: 1 addition & 1 deletion src/postgres/src/backend/bootstrap/bootstrap.c
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ InsertOneTuple(Oid objectid)
HeapTupleSetOid(tuple, objectid);

if (IsYugaByteEnabled())
YBCExecuteInsert(boot_reldesc, tupDesc, tuple, false /* use_async_flush */);
YBCExecuteInsert(boot_reldesc, tupDesc, tuple);
else
simple_heap_insert(boot_reldesc, tuple);

Expand Down
18 changes: 4 additions & 14 deletions src/postgres/src/backend/catalog/indexing.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,19 +300,14 @@ YBCatalogTupleInsert(Relation heapRel, HeapTuple tup, bool yb_shared_insert)
*/
if (dboid == YBCGetDatabaseOid(heapRel))
continue; /* Will be done after the loop. */
YBCExecuteInsertForDb(dboid,
heapRel,
RelationGetDescr(heapRel),
tup,
false /* use_async_flush */);
YBCExecuteInsertForDb(dboid, heapRel, RelationGetDescr(heapRel), tup);
}
YB_FOR_EACH_DB_END;
}
oid = YBCExecuteInsertForDb(YBCGetDatabaseOid(heapRel),
heapRel,
RelationGetDescr(heapRel),
tup,
false /* use_async_flush */);
tup);
/* Update the local cache automatically */
YBSetSysCacheTuple(heapRel, tup);
}
Expand Down Expand Up @@ -364,19 +359,14 @@ CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup,
*/
if (dboid == YBCGetDatabaseOid(heapRel))
continue; /* Will be done after the loop. */
YBCExecuteInsertForDb(dboid,
heapRel,
RelationGetDescr(heapRel),
tup,
false /* use_async_flush */);
YBCExecuteInsertForDb(dboid, heapRel, RelationGetDescr(heapRel), tup);
}
YB_FOR_EACH_DB_END;
}
oid = YBCExecuteInsertForDb(YBCGetDatabaseOid(heapRel),
heapRel,
RelationGetDescr(heapRel),
tup,
false /* use_async_flush */);
tup);
/* Update the local cache automatically */
YBSetSysCacheTuple(heapRel, tup);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,7 @@ bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change)
ereport(LOG,
(errmsg("%s: incrementing master catalog version (%sbreaking)",
__func__, is_breaking_change ? "" : "non")));
HandleYBStatus(YBCPgDmlExecWriteOp(update_stmt,
&rows_affected_count,
false /* use_async_flush */));
HandleYBStatus(YBCPgDmlExecWriteOp(update_stmt, &rows_affected_count));
Assert(rows_affected_count == 1);

/* Cleanup. */
Expand Down
7 changes: 2 additions & 5 deletions src/postgres/src/backend/commands/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
#include "pg_yb_utils.h"
#include "executor/ybcModifyTable.h"

bool yb_use_async_flush = true;


#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')
Expand Down Expand Up @@ -3005,10 +3005,7 @@ CopyFrom(CopyState cstate)
}
else
{
YBCExecuteInsert(resultRelInfo->ri_RelationDesc,
tupDesc,
tuple,
yb_use_async_flush);
YBCExecuteInsert(resultRelInfo->ri_RelationDesc, tupDesc, tuple);
}
}
else if (resultRelInfo->ri_FdwRoutine != NULL)
Expand Down
5 changes: 1 addition & 4 deletions src/postgres/src/backend/commands/createas.c
Original file line number Diff line number Diff line change
Expand Up @@ -614,10 +614,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)

if (IsYBRelation(myState->rel))
{
YBCExecuteInsert(myState->rel,
RelationGetDescr(myState->rel),
tuple,
false /* use_async_flush */);
YBCExecuteInsert(myState->rel, RelationGetDescr(myState->rel), tuple);
}
else
{
Expand Down
5 changes: 1 addition & 4 deletions src/postgres/src/backend/commands/matview.c
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
tuple = ExecMaterializeSlot(slot);
if (IsYBRelation(myState->transientrel))
{
YBCExecuteInsert(myState->transientrel,
RelationGetDescr(myState->transientrel),
tuple,
false /* use_async_flush */);
YBCExecuteInsert(myState->transientrel, RelationGetDescr(myState->transientrel), tuple);
}
else
{
Expand Down
5 changes: 1 addition & 4 deletions src/postgres/src/backend/commands/tablecmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -7411,10 +7411,7 @@ YBCopyTableRowsUnchecked(Relation oldrel, Relation newrel, AttrNumber* attmap)
ExecStoreHeapTuple(tuple, newslot, false);

/* Write the tuple out to the new relation */
YBCExecuteInsert(newrel,
newslot->tts_tupleDescriptor,
tuple,
false /* use_async_flush */);
YBCExecuteInsert(newrel, newslot->tts_tupleDescriptor, tuple);

MemoryContextReset(per_tup_cxt);

Expand Down
20 changes: 6 additions & 14 deletions src/postgres/src/backend/commands/ybccmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -666,9 +666,7 @@ YBCDropTable(Oid relationId)
{
HandleYBStatusIgnoreNotFound(YBCPgDmlBindTable(handle), &not_found);
int rows_affected_count = 0;
HandleYBStatusIgnoreNotFound(YBCPgDmlExecWriteOp(handle,
&rows_affected_count,
false /* use_async_flush */),
HandleYBStatusIgnoreNotFound(YBCPgDmlExecWriteOp(handle, &rows_affected_count),
&not_found);
}
}
Expand Down Expand Up @@ -716,13 +714,11 @@ YBCTruncateTable(Relation rel) {
/* Create table-level tombstone for colocated tables / tables in tablegroups */
HandleYBStatus(YBCPgNewTruncateColocated(databaseId,
relationId,
false /* is_single_row_txn */,
false,
&handle));
HandleYBStatus(YBCPgDmlBindTable(handle));
int rows_affected_count = 0;
HandleYBStatus(YBCPgDmlExecWriteOp(handle,
&rows_affected_count,
false /* use_async_flush */));
HandleYBStatus(YBCPgDmlExecWriteOp(handle, &rows_affected_count));
}
else
{
Expand Down Expand Up @@ -761,13 +757,11 @@ YBCTruncateTable(Relation rel) {
/* Create index-level tombstone for colocated indexes / indexes in tablegroups */
HandleYBStatus(YBCPgNewTruncateColocated(databaseId,
indexId,
false /* is_single_row_txn */,
false,
&handle));
HandleYBStatus(YBCPgDmlBindTable(handle));
int rows_affected_count = 0;
HandleYBStatus(YBCPgDmlExecWriteOp(handle,
&rows_affected_count,
false /* use_async_flush */));
HandleYBStatus(YBCPgDmlExecWriteOp(handle, &rows_affected_count));
}
else
{
Expand Down Expand Up @@ -1258,9 +1252,7 @@ YBCDropIndex(Oid relationId)
if (valid_handle) {
HandleYBStatusIgnoreNotFound(YBCPgDmlBindTable(handle), &not_found);
int rows_affected_count = 0;
HandleYBStatusIgnoreNotFound(YBCPgDmlExecWriteOp(handle,
&rows_affected_count,
false /* use_async_flush */),
HandleYBStatusIgnoreNotFound(YBCPgDmlExecWriteOp(handle, &rows_affected_count),
&not_found);
}
}
Expand Down
Loading

0 comments on commit 7ca347b

Please sign in to comment.