Skip to content

Commit

Permalink
[#10594] YSQL: Add sub-query AfterTrigger block for constraint checki…
Browse files Browse the repository at this point in the history
…ng in CopyFrom operation

Summary:
Previously, when inserting rows via a COPY operation, we added checking intents that check for
errors during the copy block, but the check event was only invoked after all rows had been copied.
As a result, invalid rows could be inserted into the table even though they fail the constraint check.

To resolve the issue, we add a sub-query block, AfterTriggerBeginQuery() and AfterTriggerEndQuery(),
around each batch of data being copied to the table. AfterTriggerEndQuery() invokes the constraint
checking for the current batch. If the constraint check fails, that batch fails and its rows are
not inserted into the table.

In addition, per @dmitry's and @mihnea's suggestion, we disable batching if the table contains a non-FK trigger. This allows the whole copy to be rolled back if a trigger fires and causes copying to stop.

However, it's possible that the prior batches are valid and get inserted to the table. Consider the
following example:

Input file
<valid row 1>
<valid row 2>
<valid row 3>
<invalid row>
<invalid row>

If ROWS_PER_TRANSACTION=2, the constraint checking will successfully process and insert batch #1,
and fail batch #2. As such, the first two rows will be inserted to the table.

Test Plan:
ybd --java-test org.yb.pgsql.TestBatchCopyFrom

Run a copy test manually (with the example shown in the GitHub ticket). Both the successful and
the failing cases should behave as expected.

Reviewers: smishra, jason, alex, mihnea, ena, dmitry

Reviewed By: dmitry

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D15649
  • Loading branch information
lnguyen-yugabyte committed Mar 23, 2022
1 parent d2c6be0 commit 7c428da
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 19 deletions.
159 changes: 155 additions & 4 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestBatchCopyFrom.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ public void testStatementLevelTriggersWithBatchCopyFailure() throws Exception {
"COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %s)",
tableName, absFilePath, batchSize),
INVALID_COPY_INPUT_ERROR_MSG);
assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, totalValidLines);
// The copy will happen in one batch, hence none of the rows will be copied
assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, 0);
assertOneRow(statement, "SELECT COUNT(*) FROM " + dummyTableName, 0);

// test before-statement trigger
Expand All @@ -262,9 +263,9 @@ public void testStatementLevelTriggersWithBatchCopyFailure() throws Exception {
"COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %s)",
tableName, absFilePath, batchSize),
INVALID_COPY_INPUT_ERROR_MSG);
assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, totalValidLines * 2);
// note, before statement-level trigger will execute even if COPY FROM fails
assertOneRow(statement, "SELECT COUNT(*) FROM " + dummyTableName, 1);
// The copy will happen in one batch, hence none of the rows will be copied
assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, 0);
assertOneRow(statement, "SELECT COUNT(*) FROM " + dummyTableName, 0);
}
}

Expand Down Expand Up @@ -583,4 +584,154 @@ public void testBatchedCopyForPartitionedTables() throws Exception {
}
}
}

/**
 * Verifies that a batched COPY into a table with a foreign key succeeds when the
 * foreign key check passes, and that every line of the input file ends up in the table.
 */
@Test
public void testBatchedCopyValidForeignKeyCheck() throws Exception {
  final String inputPath = getAbsFilePath("fk-copyfrom.txt");
  final String parentTable = "reftable_ok";
  final String childTable = "maintable_ok";

  final int rowCount = 100;
  // One batch spans the whole input file.
  final int rowsPerTxn = rowCount;

  createFileInTmpDir(inputPath, rowCount);

  try (Statement stmt = connection.createStatement()) {
    // The referenced table holds the keys s * 4 for s in [0, rowCount - 1].
    // NOTE(review): the generated file is presumably keyed the same way in column "a";
    // confirm against createFileInTmpDir.
    final String createParentSql =
        String.format("CREATE TABLE %s (a INT PRIMARY KEY)", parentTable);
    stmt.execute(createParentSql);

    final String fillParentSql = String.format(
        "INSERT INTO %s (a) SELECT s * 4 FROM GENERATE_SERIES (0, %d) AS s",
        parentTable, rowCount - 1);
    stmt.execute(fillParentSql);

    final String createChildSql =
        String.format("CREATE TABLE %s (a INT REFERENCES %s, b INT, c INT, d INT)",
                      childTable, parentTable);
    stmt.execute(createChildSql);

    final String copySql =
        String.format("COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %d)",
                      childTable, inputPath, rowsPerTxn);
    stmt.execute(copySql);

    // Every line of the file must now be present.
    assertOneRow(stmt, "SELECT COUNT(*) FROM " + childTable, rowCount);
  }
}

/**
 * Verifies that a batched COPY whose single batch violates the foreign key constraint
 * fails with the foreign key error and leaves the target table empty.
 */
@Test
public void testBatchedCopyFailedOnForeignKeyCheck() throws Exception {
  final String inputPath = getAbsFilePath("fk-copyfrom-all-failure.txt");
  final String parentTable = "reftable_failed";
  final String childTable = "maintable_failed";
  final String fkSuffix = "a_fkey";

  final int rowCount = 100;
  // One batch spans the whole input file, so a single violation rejects everything.
  final int rowsPerTxn = rowCount;

  createFileInTmpDir(inputPath, rowCount);

  final String expectedError =
      String.format("insert or update on table \"%s\" violates foreign key constraint \"%s_%s\"",
                    childTable, childTable, fkSuffix);

  try (Statement stmt = connection.createStatement()) {
    final String createParentSql =
        String.format("CREATE TABLE %s (a INT PRIMARY KEY)", parentTable);
    stmt.execute(createParentSql);

    // The series starts at 1, so the key a = 0 is deliberately absent from the parent.
    final String fillParentSql = String.format(
        "INSERT INTO %s (a) SELECT s * 4 FROM GENERATE_SERIES (1, %d) AS s",
        parentTable, rowCount - 1);
    stmt.execute(fillParentSql);

    final String createChildSql =
        String.format("CREATE TABLE %s (a INT REFERENCES %s, b INT, c INT, d INT)",
                      childTable, parentTable);
    stmt.execute(createChildSql);

    // COPY is expected to fail because the parent has no row with a = 0.
    final String copySql =
        String.format("COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %d)",
                      childTable, inputPath, rowsPerTxn);
    runInvalidQuery(stmt, copySql, expectedError);

    // Nothing may have been copied.
    assertOneRow(stmt, "SELECT COUNT(*) FROM " + childTable, 0);
  }
}

/**
 * Verifies that with one row per transaction, a COPY that hits a foreign key violation
 * part-way through reports the error but keeps the rows committed by earlier batches.
 */
@Test
public void testBatchedCopyPartialFailureOnForeignKeyCheck() throws Exception {
  final String inputPath = getAbsFilePath("fk-copyfrom-partial-failure.txt");
  final String parentTable = "reftable_partial";
  final String childTable = "maintable_partial";
  final String fkSuffix = "a_fkey";

  final int rowCount = 100;
  // Each row is committed in its own transaction.
  final int rowsPerTxn = 1;

  createFileInTmpDir(inputPath, rowCount);

  final String expectedError =
      String.format("insert or update on table \"%s\" violates foreign key constraint \"%s_%s\"",
                    childTable, childTable, fkSuffix);

  try (Statement stmt = connection.createStatement()) {
    // Only the first half of the keys is loaded into the parent table.
    final String createParentSql =
        String.format("CREATE TABLE %s (a INT PRIMARY KEY)", parentTable);
    stmt.execute(createParentSql);

    final String fillParentSql = String.format(
        "INSERT INTO %s (a) SELECT s * 4 FROM GENERATE_SERIES (0, %d) AS s",
        parentTable, rowCount/2 - 1);
    stmt.execute(fillParentSql);

    final String createChildSql =
        String.format("CREATE TABLE %s (a INT REFERENCES %s, b INT, c INT, d INT)",
                      childTable, parentTable);
    stmt.execute(createChildSql);

    // COPY is expected to fail once it reaches keys missing from the parent table.
    final String copySql =
        String.format("COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %d)",
                      childTable, inputPath, rowsPerTxn);
    runInvalidQuery(stmt, copySql, expectedError);

    // The rows whose keys exist in the parent (rowCount / 2 of them) must have survived.
    final int expectedRows = rowCount / 2;
    assertOneRow(stmt, "SELECT COUNT(*) FROM " + childTable, expectedRows);
  }
}

/**
 * Verifies that a COPY into a table with a user-defined row trigger is rolled back entirely
 * when the trigger raises an exception, even though ROWS_PER_TRANSACTION was requested.
 */
@Test
public void testBatchedCopyManualTrigger() throws Exception {
  final String inputPath = getAbsFilePath("manual-trigger.txt");
  final String targetTable = "manual_trigger_table";

  final int rowCount = 100;
  // Requested batch size; it is expected to be ignored because the table carries
  // a manually created trigger.
  final int rowsPerTxn = 5;
  final String expectedError = "Primary key too large";

  createFileInTmpDir(inputPath, rowCount);

  try (Statement stmt = connection.createStatement()) {
    final String createTableSql =
        String.format("CREATE TABLE %s (a INT PRIMARY KEY, b INT, c INT, d INT)", targetTable);
    stmt.execute(createTableSql);

    // Trigger function: raise as soon as a row with a > rowCount / 2 shows up.
    final String createFunctionSql =
        String.format(
            "CREATE FUNCTION log_a() RETURNS TRIGGER AS $$ " +
            "BEGIN " +
            "IF (NEW.a > %d) THEN RAISE EXCEPTION '%s'; " +
            "END IF; " +
            "RETURN NEW; " +
            "END; " +
            "$$ LANGUAGE plpgsql;", rowCount / 2, expectedError);
    stmt.execute(createFunctionSql);

    final String createTriggerSql =
        String.format(
            "CREATE TRIGGER mytrigger BEFORE INSERT OR UPDATE ON %s " +
            "FOR EACH ROW EXECUTE PROCEDURE log_a()", targetTable);
    stmt.execute(createTriggerSql);

    // COPY is expected to fail with the exception raised by the trigger.
    final String copySql =
        String.format("COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %d)",
                      targetTable, inputPath, rowsPerTxn);
    runInvalidQuery(stmt, copySql, expectedError);

    // All changes must have been rolled back.
    assertOneRow(stmt, "SELECT COUNT(*) FROM " + targetTable, 0);
  }
}
}
41 changes: 27 additions & 14 deletions src/postgres/src/backend/commands/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2394,7 +2394,6 @@ CopyFrom(CopyState cstate)
int nBufferedTuples = 0;
int prev_leaf_part_index = -1;
bool useNonTxnInsert;
bool isBatchTxnCopy;

/*
* If the batch size is not explicitly set in the query by the user,
Expand All @@ -2404,7 +2403,6 @@ CopyFrom(CopyState cstate)
{
cstate->batch_size = yb_default_copy_from_rows_per_transaction;
}
isBatchTxnCopy = cstate->batch_size > 0;

#define MAX_BUFFERED_TUPLES 1000
HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
Expand Down Expand Up @@ -2636,7 +2634,7 @@ CopyFrom(CopyState cstate)
ExecSetupChildParentMapForLeaf(proute);
}

if (isBatchTxnCopy)
if (cstate->batch_size > 0)
{
/*
* Batched copy is not supported
Expand Down Expand Up @@ -2665,11 +2663,16 @@ CopyFrom(CopyState cstate)
errhint("Either run this COPY outside of a transaction block or set "
"rows_per_transaction option to `0` to disable batching and "
"remove this warning.")));

else
{
else if (HasNonRITrigger(cstate->rel->trigdesc))
ereport(WARNING,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Batched COPY is not supported on table with non RI trigger. "
"Defaulting to using one transaction for the entire copy."),
errhint("Set rows_per_transaction option to `0` to disable batching "
"and remove this warning.")));
else
batch_size = cstate->batch_size;
}

cstate->batch_size = batch_size;
}

Expand Down Expand Up @@ -2750,7 +2753,7 @@ CopyFrom(CopyState cstate)

bool has_more_tuples = true;
while (has_more_tuples)
{
{
/*
* When batch size is not provided from the query option,
* default behavior is to read each line from the file
Expand Down Expand Up @@ -3073,15 +3076,25 @@ CopyFrom(CopyState cstate)
if (IsYBRelation(cstate->rel))
ResetPerTupleExprContext(estate);
}
/*
* Commit transaction per batch.
* When CopyFrom method is called, we are already inside a transaction block
* and relevant transaction state properties have been previously set.
*/
if (isBatchTxnCopy)

if (cstate->batch_size > 0)
{
/*
* Handle queued AFTER triggers before committing. If there are errors,
* do not commit the current batch.
*/
AfterTriggerEndQuery(estate);

/*
* Commit transaction per batch.
* When CopyFrom method is called, we are already inside a transaction block
* and relevant transaction state properties have been previously set.
*/
YBCCommitTransaction();
YBInitializeTransaction();

/* Start a new AFTER trigger */
AfterTriggerBeginQuery();
}
}

Expand Down
14 changes: 14 additions & 0 deletions src/postgres/src/backend/utils/adt/ri_triggers.c
Original file line number Diff line number Diff line change
Expand Up @@ -3315,3 +3315,17 @@ YbAddTriggerFKReferenceIntent(Trigger *trigger, Relation fk_rel, HeapTuple new_r
pfree(descr);
}
}

/*
 * HasNonRITrigger
 *
 * Returns true when the given trigger description holds at least one trigger
 * for which RI_FKey_trigger_type() reports RI_TRIGGER_NONE, i.e. a trigger
 * that is not one of the referential integrity triggers.  A NULL description
 * is treated as containing no triggers.
 */
bool
HasNonRITrigger(const TriggerDesc* trigDesc)
{
	int			remaining;

	if (!trigDesc)
		return false;

	/* Walk the trigger array from its last entry down to the first. */
	remaining = trigDesc->numtriggers;
	while (remaining > 0)
	{
		remaining--;
		if (RI_FKey_trigger_type(trigDesc->triggers[remaining].tgfoid) == RI_TRIGGER_NONE)
			return true;
	}

	return false;
}
3 changes: 3 additions & 0 deletions src/postgres/src/include/commands/trigger.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,7 @@ extern void YbAddTriggerFKReferenceIntent(Trigger *trigger, Relation fk_rel, Hea

extern int RI_FKey_trigger_type(Oid tgfoid);

/* Return true if the trigger description has non FK trigger. */
extern bool HasNonRITrigger(const TriggerDesc* trigDesc);

#endif /* TRIGGER_H */
20 changes: 20 additions & 0 deletions src/postgres/src/test/regress/expected/yb_pg_triggers.out
Original file line number Diff line number Diff line change
Expand Up @@ -2874,6 +2874,26 @@ drop table self_ref;
drop function dump_insert();
drop function dump_update();
drop function dump_delete();
--
-- Rows per transaction should be disabled if table contains non Referential Integrity triggers.
--
CREATE TABLE tbl(k INT PRIMARY KEY);
CREATE TABLE shadow_tbl(k INT PRIMARY KEY);
CREATE OR REPLACE FUNCTION trigger_func() RETURNS trigger AS $$
BEGIN
INSERT INTO shadow_tbl VALUES(NEW.k);
RETURN NEW;
END; $$
LANGUAGE 'plpgsql';
CREATE TRIGGER tbl_insert_trigger AFTER INSERT
ON tbl FOR EACH ROW EXECUTE PROCEDURE trigger_func();
-- A warning should be shown disabling the ROWS_PER_TRANSACTION value.
COPY tbl FROM STDIN WITH (ROWS_PER_TRANSACTION 2);
WARNING: Batched COPY is not supported on table with non RI trigger. Defaulting to using one transaction for the entire copy.
HINT: Set rows_per_transaction option to `0` to disable batching and remove this warning.
DROP TRIGGER tbl_insert_trigger ON tbl;
DROP TABLE shadow_tbl;
DROP TABLE tbl;
-- Leave around some objects for other tests
create table trigger_parted (a int primary key) partition by list (a);
create function trigger_parted_trigfunc() returns trigger language plpgsql as
Expand Down
31 changes: 30 additions & 1 deletion src/postgres/src/test/regress/sql/yb_pg_triggers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2176,6 +2176,36 @@ drop function dump_insert();
drop function dump_update();
drop function dump_delete();

--
-- Rows per transaction should be disabled if table contains non Referential Integrity triggers.
--
CREATE TABLE tbl(k INT PRIMARY KEY);
CREATE TABLE shadow_tbl(k INT PRIMARY KEY);

CREATE OR REPLACE FUNCTION trigger_func() RETURNS trigger AS $$
BEGIN
INSERT INTO shadow_tbl VALUES(NEW.k);
RETURN NEW;
END; $$
LANGUAGE 'plpgsql';

CREATE TRIGGER tbl_insert_trigger AFTER INSERT
ON tbl FOR EACH ROW EXECUTE PROCEDURE trigger_func();

-- A warning should be shown disabling the ROWS_PER_TRANSACTION value.
COPY tbl FROM STDIN WITH (ROWS_PER_TRANSACTION 2);
1
2
3
4
5
6
\.

DROP TRIGGER tbl_insert_trigger ON tbl;
DROP TABLE shadow_tbl;
DROP TABLE tbl;

-- Leave around some objects for other tests
create table trigger_parted (a int primary key) partition by list (a);
create function trigger_parted_trigfunc() returns trigger language plpgsql as
Expand All @@ -2190,4 +2220,3 @@ create table trigger_parted_p2 partition of trigger_parted for values in (2)
create table trigger_parted_p2_2 partition of trigger_parted_p2 for values in (2);
alter table only trigger_parted_p2 disable trigger aft_row;
alter table trigger_parted_p2_2 enable always trigger aft_row;

0 comments on commit 7c428da

Please sign in to comment.